1<?php declare(strict_types=1);
2
3/*
4 * This file is part of the Monolog package.
5 *
6 * (c) Jordi Boggiano <j.boggiano@seld.be>
7 *
8 * For the full copyright and license information, please view the LICENSE
9 * file that was distributed with this source code.
10 */
11
12namespace Monolog\Handler;
13
14use Monolog\Logger;
15use Monolog\Formatter\FormatterInterface;
16use Monolog\Formatter\JsonFormatter;
17use PhpAmqpLib\Message\AMQPMessage;
18use PhpAmqpLib\Channel\AMQPChannel;
19use AMQPExchange;
20
21/**
22 * @phpstan-import-type Record from \Monolog\Logger
23 */
24class AmqpHandler extends AbstractProcessingHandler
25{
26    /**
27     * @var AMQPExchange|AMQPChannel $exchange
28     */
29    protected $exchange;
30
31    /**
32     * @var string
33     */
34    protected $exchangeName;
35
36    /**
37     * @param AMQPExchange|AMQPChannel $exchange     AMQPExchange (php AMQP ext) or PHP AMQP lib channel, ready for use
38     * @param string|null              $exchangeName Optional exchange name, for AMQPChannel (PhpAmqpLib) only
39     */
40    public function __construct($exchange, ?string $exchangeName = null, $level = Logger::DEBUG, bool $bubble = true)
41    {
42        if ($exchange instanceof AMQPChannel) {
43            $this->exchangeName = (string) $exchangeName;
44        } elseif (!$exchange instanceof AMQPExchange) {
45            throw new \InvalidArgumentException('PhpAmqpLib\Channel\AMQPChannel or AMQPExchange instance required');
46        } elseif ($exchangeName) {
47            @trigger_error('The $exchangeName parameter can only be passed when using PhpAmqpLib, if using an AMQPExchange instance configure it beforehand', E_USER_DEPRECATED);
48        }
49        $this->exchange = $exchange;
50
51        parent::__construct($level, $bubble);
52    }
53
54    /**
55     * {@inheritDoc}
56     */
57    protected function write(array $record): void
58    {
59        $data = $record["formatted"];
60        $routingKey = $this->getRoutingKey($record);
61
62        if ($this->exchange instanceof AMQPExchange) {
63            $this->exchange->publish(
64                $data,
65                $routingKey,
66                0,
67                [
68                    'delivery_mode' => 2,
69                    'content_type' => 'application/json',
70                ]
71            );
72        } else {
73            $this->exchange->basic_publish(
74                $this->createAmqpMessage($data),
75                $this->exchangeName,
76                $routingKey
77            );
78        }
79    }
80
81    /**
82     * {@inheritDoc}
83     */
84    public function handleBatch(array $records): void
85    {
86        if ($this->exchange instanceof AMQPExchange) {
87            parent::handleBatch($records);
88
89            return;
90        }
91
92        foreach ($records as $record) {
93            if (!$this->isHandling($record)) {
94                continue;
95            }
96
97            /** @var Record $record */
98            $record = $this->processRecord($record);
99            $data = $this->getFormatter()->format($record);
100
101            $this->exchange->batch_basic_publish(
102                $this->createAmqpMessage($data),
103                $this->exchangeName,
104                $this->getRoutingKey($record)
105            );
106        }
107
108        $this->exchange->publish_batch();
109    }
110
111    /**
112     * Gets the routing key for the AMQP exchange
113     *
114     * @phpstan-param Record $record
115     */
116    protected function getRoutingKey(array $record): string
117    {
118        $routingKey = sprintf('%s.%s', $record['level_name'], $record['channel']);
119
120        return strtolower($routingKey);
121    }
122
123    private function createAmqpMessage(string $data): AMQPMessage
124    {
125        return new AMQPMessage(
126            $data,
127            [
128                'delivery_mode' => 2,
129                'content_type' => 'application/json',
130            ]
131        );
132    }
133
134    /**
135     * {@inheritDoc}
136     */
137    protected function getDefaultFormatter(): FormatterInterface
138    {
139        return new JsonFormatter(JsonFormatter::BATCH_MODE_JSON, false);
140    }
141}
142