<?php declare(strict_types=1);

/*
 * This file is part of the Monolog package.
 *
 * (c) Jordi Boggiano <j.boggiano@seld.be>
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */

namespace Monolog\Handler;

use Monolog\Logger;
use Monolog\Formatter\FormatterInterface;
use Monolog\Formatter\JsonFormatter;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Channel\AMQPChannel;
use AMQPExchange;

/**
 * Handler that publishes formatted log records to an AMQP exchange.
 *
 * Two transports are supported: an AMQPExchange instance (php AMQP ext) or an
 * AMQPChannel instance (PhpAmqpLib). The transport is picked at runtime from
 * the type of the object handed to the constructor.
 *
 * @phpstan-import-type Record from \Monolog\Logger
 */
class AmqpHandler extends AbstractProcessingHandler
{
    /**
     * Object the records are published through.
     *
     * @var AMQPExchange|AMQPChannel $exchange
     */
    protected $exchange;

    /**
     * Exchange name passed along with every publish on the AMQPChannel path.
     * Only assigned when the handler was built with an AMQPChannel.
     *
     * @var string
     */
    protected $exchangeName;

    /**
     * @param AMQPExchange|AMQPChannel $exchange     AMQPExchange (php AMQP ext) or PHP AMQP lib channel, ready for use
     * @param string|null              $exchangeName Optional exchange name, for AMQPChannel (PhpAmqpLib) only
     *
     * @throws \InvalidArgumentException When $exchange is neither an AMQPChannel nor an AMQPExchange
     */
    public function __construct($exchange, ?string $exchangeName = null, $level = Logger::DEBUG, bool $bubble = true)
    {
        $usesChannel = $exchange instanceof AMQPChannel;

        // Reject anything that is not one of the two supported transports.
        if (!$usesChannel && !$exchange instanceof AMQPExchange) {
            throw new \InvalidArgumentException('PhpAmqpLib\Channel\AMQPChannel or AMQPExchange instance required');
        }

        if ($usesChannel) {
            // A null name is stored as an empty string.
            $this->exchangeName = (string) $exchangeName;
        } elseif ($exchangeName) {
            // An exchange name has no effect alongside an AMQPExchange, so the caller is notified.
            @trigger_error('The $exchangeName parameter can only be passed when using PhpAmqpLib, if using an AMQPExchange instance configure it beforehand', E_USER_DEPRECATED);
        }

        $this->exchange = $exchange;

        parent::__construct($level, $bubble);
    }

    /**
     * Publishes one already formatted record through the configured transport.
     *
     * The payload is read from the record's "formatted" entry and the routing
     * key comes from getRoutingKey().
     */
    protected function write(array $record): void
    {
        $payload = $record['formatted'];
        $key = $this->getRoutingKey($record);

        if (!$this->exchange instanceof AMQPExchange) {
            // AMQPChannel path: wrap the payload in a message and name the exchange explicitly.
            $this->exchange->basic_publish(
                $this->createAmqpMessage($payload),
                $this->exchangeName,
                $key
            );

            return;
        }

        // AMQPExchange path: same message attributes as createAmqpMessage() applies.
        $attributes = [
            'delivery_mode' => 2,
            'content_type' => 'application/json',
        ];

        $this->exchange->publish($payload, $key, 0, $attributes);
    }

    /**
     * Handles a set of records.
     *
     * With an AMQPExchange the parent implementation is used unchanged. With an
     * AMQPChannel each handled record is processed, formatted and queued via
     * batch_basic_publish(), then publish_batch() is called once after the loop.
     */
    public function handleBatch(array $records): void
    {
        if ($this->exchange instanceof AMQPExchange) {
            parent::handleBatch($records);

            return;
        }

        foreach ($records as $entry) {
            if ($this->isHandling($entry)) {
                /** @var Record $entry */
                $entry = $this->processRecord($entry);
                $payload = $this->getFormatter()->format($entry);

                // Build the message before the routing key, matching the publish argument order.
                $message = $this->createAmqpMessage($payload);
                $key = $this->getRoutingKey($entry);

                $this->exchange->batch_basic_publish($message, $this->exchangeName, $key);
            }
        }

        $this->exchange->publish_batch();
    }

    /**
     * Builds the routing key for a record: "<level_name>.<channel>", lowercased.
     *
     * @phpstan-param Record $record
     */
    protected function getRoutingKey(array $record): string
    {
        return strtolower(sprintf('%s.%s', $record['level_name'], $record['channel']));
    }

    /**
     * Wraps a payload in an AMQPMessage with delivery_mode 2 and a JSON content type.
     */
    private function createAmqpMessage(string $data): AMQPMessage
    {
        $properties = [
            'delivery_mode' => 2,
            'content_type' => 'application/json',
        ];

        return new AMQPMessage($data, $properties);
    }

    /**
     * Returns the formatter used when none has been set: a JsonFormatter
     * created with BATCH_MODE_JSON and false as its second argument.
     */
    protected function getDefaultFormatter(): FormatterInterface
    {
        return new JsonFormatter(JsonFormatter::BATCH_MODE_JSON, false);
    }
}