1<?php declare(strict_types=1);
2
3/*
4 * This file is part of the Monolog package.
5 *
6 * (c) Jordi Boggiano <j.boggiano@seld.be>
7 *
8 * For the full copyright and license information, please view the LICENSE
9 * file that was distributed with this source code.
10 */
11
12namespace Monolog\Handler;
13
14use Throwable;
15use RuntimeException;
16use Monolog\Logger;
17use Monolog\Formatter\FormatterInterface;
18use Monolog\Formatter\ElasticsearchFormatter;
19use InvalidArgumentException;
20use Elasticsearch\Common\Exceptions\RuntimeException as ElasticsearchRuntimeException;
21use Elasticsearch\Client;
22
/**
 * Elasticsearch handler
 *
 * Ships log records to Elasticsearch using the bulk endpoint of the
 * Elasticsearch PHP client that is injected into the constructor.
 *
 * @link https://www.elastic.co/guide/en/elasticsearch/client/php-api/current/index.html
 *
 * Simple usage example:
 *
 *    $client = \Elasticsearch\ClientBuilder::create()
 *        ->setHosts($hosts)
 *        ->build();
 *
 *    $options = [
 *        'index' => 'elastic_index_name',
 *        'type'  => 'elastic_doc_type',
 *    ];
 *    $handler = new ElasticsearchHandler($client, $options);
 *    $log = new Logger('application');
 *    $log->pushHandler($handler);
 *
 * @author Avtandil Kikabidze <akalongman@gmail.com>
 */
class ElasticsearchHandler extends AbstractProcessingHandler
{
    /**
     * @var Client Elasticsearch client used to issue the bulk requests
     */
    protected $client;

    /**
     * @var mixed[] Handler config options ('index', 'type', 'ignore_error' plus any caller-supplied keys)
     */
    protected $options = [];

    /**
     * @param Client  $client  Elasticsearch Client object
     * @param mixed[] $options Handler configuration; recognised keys are
     *                         'index' (default 'monolog'), 'type' (default '_doc')
     *                         and 'ignore_error' (default false). Caller values
     *                         override the defaults.
     * @param mixed   $level   Minimum logging level, forwarded to the parent constructor
     * @param bool    $bubble  Bubbling flag, forwarded to the parent constructor
     */
    public function __construct(Client $client, array $options = [], $level = Logger::DEBUG, bool $bubble = true)
    {
        parent::__construct($level, $bubble);
        $this->client = $client;
        $this->options = array_merge(
            [
                'index'        => 'monolog', // Elastic index name
                'type'         => '_doc',    // Elastic document type
                'ignore_error' => false,     // Suppress Elasticsearch exceptions
            ],
            $options
        );
    }

    /**
     * {@inheritDoc}
     *
     * Sends the already formatted record as a one-document bulk request.
     */
    protected function write(array $record): void
    {
        $this->bulkSend([$record['formatted']]);
    }

    /**
     * {@inheritDoc}
     *
     * @throws InvalidArgumentException When the formatter is not an ElasticsearchFormatter
     */
    public function setFormatter(FormatterInterface $formatter): HandlerInterface
    {
        if ($formatter instanceof ElasticsearchFormatter) {
            return parent::setFormatter($formatter);
        }

        throw new InvalidArgumentException('ElasticsearchHandler is only compatible with ElasticsearchFormatter');
    }

    /**
     * Getter options
     *
     * @return mixed[] The defaults merged with the options passed to the constructor
     */
    public function getOptions(): array
    {
        return $this->options;
    }

    /**
     * {@inheritDoc}
     *
     * Builds an ElasticsearchFormatter from the configured 'index' and 'type' options.
     */
    protected function getDefaultFormatter(): FormatterInterface
    {
        return new ElasticsearchFormatter($this->options['index'], $this->options['type']);
    }

    /**
     * {@inheritDoc}
     *
     * Formats all records in one go and ships them in a single bulk request.
     * Note that this method hands the records straight to the formatter: it
     * performs no level filtering and invokes no processors itself.
     */
    public function handleBatch(array $records): void
    {
        $documents = $this->getFormatter()->formatBatch($records);
        $this->bulkSend($documents);
    }

    /**
     * Use Elasticsearch bulk API to send list of documents
     *
     * Nothing is sent when $records is empty. Any failure (client exception or
     * an error flagged in the bulk response) is wrapped in a \RuntimeException,
     * unless the 'ignore_error' option is truthy, in which case it is dropped.
     *
     * @param  array[]           $records Records + _index/_type keys
     * @throws \RuntimeException
     */
    protected function bulkSend(array $records): void
    {
        // An empty bulk body is rejected by Elasticsearch; an empty batch is
        // simply a no-op and must not surface as an error.
        if ($records === []) {
            return;
        }

        try {
            $params = [
                'body' => [],
            ];

            foreach ($records as $record) {
                // Bulk format: one action/metadata entry followed by the document source.
                $params['body'][] = [
                    'index' => [
                        '_index' => $record['_index'],
                        '_type'  => $record['_type'],
                    ],
                ];
                // The routing keys belong to the action entry, not to the stored document.
                unset($record['_index'], $record['_type']);

                $params['body'][] = $record;
            }

            $responses = $this->client->bulk($params);

            // Tolerate a response without an 'errors' flag instead of raising an undefined-index error.
            if (($responses['errors'] ?? false) === true) {
                throw $this->createExceptionFromResponses($responses);
            }
        } catch (Throwable $e) {
            if (! $this->options['ignore_error']) {
                throw new RuntimeException('Error sending messages to Elasticsearch', 0, $e);
            }
        }
    }

    /**
     * Creates elasticsearch exception from responses array
     *
     * Only the first error is converted into an exception. When no item
     * carries an error description, a generic exception is returned.
     *
     * @param mixed[] $responses returned by $this->client->bulk()
     */
    protected function createExceptionFromResponses(array $responses): ElasticsearchRuntimeException
    {
        foreach ($responses['items'] ?? [] as $item) {
            $error = $item['index']['error'] ?? null;

            if (is_array($error)) {
                return $this->createExceptionFromError($error);
            }

            // A plain-string error cannot be passed to createExceptionFromError()
            // (array-typed); keep its text rather than failing with a TypeError.
            if (is_string($error) && $error !== '') {
                return new ElasticsearchRuntimeException($error);
            }
        }

        return new ElasticsearchRuntimeException('Elasticsearch failed to index one or more records.');
    }

    /**
     * Creates elasticsearch exception from error array
     *
     * A nested 'caused_by' array is converted recursively and attached as the
     * previous exception. Missing 'type' / 'reason' entries are replaced by
     * placeholders so the original failure is still reported.
     *
     * @param mixed[] $error
     */
    protected function createExceptionFromError(array $error): ElasticsearchRuntimeException
    {
        $previous = null;
        if (isset($error['caused_by']) && is_array($error['caused_by'])) {
            $previous = $this->createExceptionFromError($error['caused_by']);
        }

        $type = $error['type'] ?? 'unknown';
        $reason = $error['reason'] ?? 'no reason given';

        return new ElasticsearchRuntimeException($type . ': ' . $reason, 0, $previous);
    }
}
188