<?php declare(strict_types=1);

/*
 * This file is part of the Monolog package.
 *
 * (c) Jordi Boggiano <j.boggiano@seld.be>
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */

namespace Monolog\Handler;

use Throwable;
use RuntimeException;
use Monolog\Logger;
use Monolog\Formatter\FormatterInterface;
use Monolog\Formatter\ElasticsearchFormatter;
use InvalidArgumentException;
use Elasticsearch\Common\Exceptions\RuntimeException as ElasticsearchRuntimeException;
use Elasticsearch\Client;

/**
 * Elasticsearch handler
 *
 * Sends formatted log records to Elasticsearch through the client's bulk API.
 *
 * @link https://www.elastic.co/guide/en/elasticsearch/client/php-api/current/index.html
 *
 * Simple usage example:
 *
 *    $client = \Elasticsearch\ClientBuilder::create()
 *        ->setHosts($hosts)
 *        ->build();
 *
 *    $options = [
 *        'index' => 'elastic_index_name',
 *        'type'  => 'elastic_doc_type',
 *    ];
 *    $handler = new ElasticsearchHandler($client, $options);
 *    $log = new Logger('application');
 *    $log->pushHandler($handler);
 *
 * @author Avtandil Kikabidze <akalongman@gmail.com>
 */
class ElasticsearchHandler extends AbstractProcessingHandler
{
    /**
     * @var Client Elasticsearch client used to issue bulk requests
     */
    protected $client;

    /**
     * @var mixed[] Handler config options ('index', 'type', 'ignore_error' plus any caller-supplied keys)
     */
    protected $options = [];

    /**
     * @param Client  $client  Elasticsearch Client object
     * @param mixed[] $options Handler configuration; merged over the defaults below, so supplied keys win
     * @param mixed   $level   Level passed through to the parent handler constructor (default Logger::DEBUG)
     * @param bool    $bubble  Bubble flag passed through to the parent handler constructor
     */
    public function __construct(Client $client, array $options = [], $level = Logger::DEBUG, bool $bubble = true)
    {
        parent::__construct($level, $bubble);
        $this->client = $client;
        $this->options = array_merge(
            [
                'index'        => 'monolog', // Elastic index name
                'type'         => '_doc',    // Elastic document type
                'ignore_error' => false,     // Suppress Elasticsearch exceptions
            ],
            $options
        );
    }

    /**
     * {@inheritDoc}
     *
     * Sends the already formatted record as a one-element bulk request.
     */
    protected function write(array $record): void
    {
        $this->bulkSend([$record['formatted']]);
    }

    /**
     * {@inheritDoc}
     *
     * @throws InvalidArgumentException When the formatter is not an ElasticsearchFormatter
     */
    public function setFormatter(FormatterInterface $formatter): HandlerInterface
    {
        if ($formatter instanceof ElasticsearchFormatter) {
            return parent::setFormatter($formatter);
        }

        throw new InvalidArgumentException('ElasticsearchHandler is only compatible with ElasticsearchFormatter');
    }

    /**
     * Getter options
     *
     * @return mixed[] The effective options (defaults merged with the constructor argument)
     */
    public function getOptions(): array
    {
        return $this->options;
    }

    /**
     * {@inheritDoc}
     *
     * Builds an ElasticsearchFormatter from the configured 'index' and 'type' options.
     */
    protected function getDefaultFormatter(): FormatterInterface
    {
        return new ElasticsearchFormatter($this->options['index'], $this->options['type']);
    }

    /**
     * {@inheritDoc}
     *
     * Formats the whole batch with the current formatter and ships it in a single bulk request.
     *
     * NOTE(review): this path calls the formatter directly; whether level filtering and
     * processors are applied to batched records should be confirmed against the parent class.
     */
    public function handleBatch(array $records): void
    {
        $documents = $this->getFormatter()->formatBatch($records);
        $this->bulkSend($documents);
    }

    /**
     * Use Elasticsearch bulk API to send list of documents
     *
     * Each record contributes two entries to the request body: an "index" action built
     * from its _index/_type keys, followed by the record itself with those keys removed.
     *
     * @param  array[]           $records Records + _index/_type keys
     * @throws \RuntimeException When sending fails and the 'ignore_error' option is falsy
     */
    protected function bulkSend(array $records): void
    {
        // Nothing to index: skip the round trip rather than issue a bulk request with an
        // empty body, which Elasticsearch rejects ("no requests added").
        if ($records === []) {
            return;
        }

        try {
            $params = [
                'body' => [],
            ];

            foreach ($records as $record) {
                $params['body'][] = [
                    'index' => [
                        '_index' => $record['_index'],
                        '_type'  => $record['_type'],
                    ],
                ];
                // The routing keys belong to the action line only, not to the stored document.
                unset($record['_index'], $record['_type']);

                $params['body'][] = $record;
            }

            $responses = $this->client->bulk($params);

            // A response without an 'errors' key is treated as success instead of raising a warning.
            if (($responses['errors'] ?? false) === true) {
                throw $this->createExceptionFromResponses($responses);
            }
        } catch (Throwable $e) {
            // Deliberate best-effort mode: failures are dropped only when the caller opted in.
            if (! $this->options['ignore_error']) {
                throw new RuntimeException('Error sending messages to Elasticsearch', 0, $e);
            }
        }
    }

    /**
     * Creates elasticsearch exception from responses array
     *
     * Only the first error is converted into an exception.
     *
     * @param  mixed[]                       $responses returned by $this->client->bulk()
     * @return ElasticsearchRuntimeException Exception for the first failed item, or a generic one when
     *                                       no item carries a usable error payload
     */
    protected function createExceptionFromResponses(array $responses): ElasticsearchRuntimeException
    {
        foreach ($responses['items'] ?? [] as $item) {
            $error = $item['index']['error'] ?? null;

            if (is_array($error)) {
                return $this->createExceptionFromError($error);
            }

            // Defensive: a non-array payload would otherwise trigger a TypeError on the
            // array-typed helper and mask the actual indexing failure.
            if (is_string($error) && $error !== '') {
                return new ElasticsearchRuntimeException($error);
            }
        }

        return new ElasticsearchRuntimeException('Elasticsearch failed to index one or more records.');
    }

    /**
     * Creates elasticsearch exception from error array
     *
     * A nested 'caused_by' entry is converted recursively and attached as the previous exception.
     *
     * @param  mixed[]                       $error Error structure with 'type', 'reason' and optionally 'caused_by'
     * @return ElasticsearchRuntimeException Exception whose message is "<type>: <reason>"
     */
    protected function createExceptionFromError(array $error): ElasticsearchRuntimeException
    {
        $cause = $error['caused_by'] ?? null;
        // Only recurse into well-formed causes; anything else is ignored rather than raising a TypeError.
        $previous = is_array($cause) ? $this->createExceptionFromError($cause) : null;

        $type = $error['type'] ?? 'unknown';
        $reason = $error['reason'] ?? 'unknown';

        return new ElasticsearchRuntimeException($type . ': ' . $reason, 0, $previous);
    }
}