1<?php
2
3namespace Elastica;
4
5use Elastica\Bulk\Action;
6use Elastica\Bulk\ResponseSet;
7use Elastica\Exception\Bulk\ResponseException as BulkResponseException;
8use Elastica\Exception\ClientException;
9use Elastica\Exception\ConnectionException;
10use Elastica\Exception\InvalidException;
11use Elastica\Exception\ResponseException;
12use Elastica\Script\AbstractScript;
13use Elasticsearch\Endpoints\AbstractEndpoint;
14use Elasticsearch\Endpoints\ClosePointInTime;
15use Elasticsearch\Endpoints\Indices\ForceMerge;
16use Elasticsearch\Endpoints\Indices\Refresh;
17use Elasticsearch\Endpoints\Update;
18use Psr\Log\LoggerInterface;
19use Psr\Log\NullLogger;
20
21/**
 * Client to connect to the Elasticsearch server.
23 *
24 * @author Nicolas Ruflin <spam@ruflin.com>
25 */
26class Client
27{
    /**
     * Client configuration built from the array or DSN string passed to the constructor.
     *
     * @var ClientConfiguration
     */
    protected $_config;

    /**
     * Optional callable handed to the connection pool when the connections are
     * initialised; null when none was supplied to the constructor.
     *
     * @var callable|null
     */
    protected $_callback;

    /**
     * Pool of connections created by _initConnections().
     *
     * @var Connection\ConnectionPool
     */
    protected $_connectionPool;

    /**
     * Most recent request built by request(); null until the first request is made.
     *
     * @var Request|null
     */
    protected $_lastRequest;

    /**
     * Response to the most recent request; reset to null before each request is sent.
     *
     * @var Response|null
     */
    protected $_lastResponse;

    /**
     * Logger receiving request debug entries and failure entries; a NullLogger
     * when none was supplied to the constructor.
     *
     * @var LoggerInterface
     */
    protected $_logger;

    /**
     * Server version number cached by getVersion(); unset until that method is first called.
     *
     * @var string|null
     */
    protected $_version;
62
63    /**
64     * Creates a new Elastica client.
65     *
66     * @param array|string  $config   OPTIONAL Additional config or DSN of options
67     * @param callable|null $callback OPTIONAL Callback function which can be used to be notified about errors (for example connection down)
68     *
69     * @throws InvalidException
70     */
71    public function __construct($config = [], ?callable $callback = null, ?LoggerInterface $logger = null)
72    {
73        if (\is_string($config)) {
74            $configuration = ClientConfiguration::fromDsn($config);
75        } elseif (\is_array($config)) {
76            $configuration = ClientConfiguration::fromArray($config);
77        } else {
78            throw new InvalidException('Config parameter must be an array or a string.');
79        }
80
81        $this->_config = $configuration;
82        $this->_callback = $callback;
83        $this->_logger = $logger ?? new NullLogger();
84
85        $this->_initConnections();
86    }
87
88    /**
89     * Get current version.
90     *
91     * @throws ClientException
92     * @throws ConnectionException
93     * @throws ResponseException
94     */
95    public function getVersion(): string
96    {
97        if ($this->_version) {
98            return $this->_version;
99        }
100
101        $data = $this->request('/')->getData();
102
103        return $this->_version = $data['version']['number'];
104    }
105
106    /**
107     * Sets specific config values (updates and keeps default values).
108     *
109     * @param array $config Params
110     */
111    public function setConfig(array $config): self
112    {
113        foreach ($config as $key => $value) {
114            $this->_config->set($key, $value);
115        }
116
117        return $this;
118    }
119
120    /**
121     * Returns a specific config key or the whole config array if not set.
122     *
123     * @throws InvalidException if the given key is not found in the configuration
124     *
125     * @return array|bool|string
126     */
127    public function getConfig(string $key = '')
128    {
129        return $this->_config->get($key);
130    }
131
132    /**
133     * Sets / overwrites a specific config value.
134     *
135     * @param mixed $value Value
136     */
137    public function setConfigValue(string $key, $value): self
138    {
139        return $this->setConfig([$key => $value]);
140    }
141
142    /**
143     * @param array|string $keys    config key or path of config keys
144     * @param mixed        $default default value will be returned if key was not found
145     *
146     * @return mixed
147     */
148    public function getConfigValue($keys, $default = null)
149    {
150        $value = $this->_config->getAll();
151        foreach ((array) $keys as $key) {
152            if (isset($value[$key])) {
153                $value = $value[$key];
154            } else {
155                return $default;
156            }
157        }
158
159        return $value;
160    }
161
162    /**
163     * Returns the index for the given connection.
164     */
165    public function getIndex(string $name): Index
166    {
167        return new Index($this, $name);
168    }
169
170    /**
171     * Adds a HTTP Header.
172     */
173    public function addHeader(string $header, string $value): self
174    {
175        if ($this->_config->has('headers')) {
176            $headers = $this->_config->get('headers');
177        } else {
178            $headers = [];
179        }
180        $headers[$header] = $value;
181        $this->_config->set('headers', $headers);
182
183        return $this;
184    }
185
186    /**
187     * Remove a HTTP Header.
188     */
189    public function removeHeader(string $header): self
190    {
191        if ($this->_config->has('headers')) {
192            $headers = $this->_config->get('headers');
193            unset($headers[$header]);
194            $this->_config->set('headers', $headers);
195        }
196
197        return $this;
198    }
199
200    /**
201     * Uses _bulk to send documents to the server.
202     *
203     * Array of \Elastica\Document as input. Index has to be set inside the
204     * document, because for bulk settings documents, documents can belong to
205     * any index
206     *
207     * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
208     *
209     * @param array|Document[] $docs Array of Elastica\Document
210     *
211     * @throws InvalidException      If docs is empty
212     * @throws ClientException
213     * @throws ConnectionException
214     * @throws ResponseException
215     * @throws BulkResponseException
216     */
217    public function updateDocuments(array $docs, array $requestParams = []): ResponseSet
218    {
219        if (!$docs) {
220            throw new InvalidException('Array has to consist of at least one element');
221        }
222
223        $bulk = new Bulk($this);
224
225        $bulk->addDocuments($docs, Action::OP_TYPE_UPDATE);
226        foreach ($requestParams as $key => $value) {
227            $bulk->setRequestParam($key, $value);
228        }
229
230        return $bulk->send();
231    }
232
233    /**
234     * Uses _bulk to send documents to the server.
235     *
236     * Array of \Elastica\Document as input. Index has to be set inside the
237     * document, because for bulk settings documents, documents can belong to
238     * any index
239     *
240     * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
241     *
242     * @param array|Document[] $docs Array of Elastica\Document
243     *
244     * @throws InvalidException      If docs is empty
245     * @throws ClientException
246     * @throws ConnectionException
247     * @throws ResponseException
248     * @throws BulkResponseException
249     */
250    public function addDocuments(array $docs, array $requestParams = []): ResponseSet
251    {
252        if (!$docs) {
253            throw new InvalidException('Array has to consist of at least one element');
254        }
255
256        $bulk = new Bulk($this);
257
258        $bulk->addDocuments($docs);
259
260        foreach ($requestParams as $key => $value) {
261            $bulk->setRequestParam($key, $value);
262        }
263
264        return $bulk->send();
265    }
266
267    /**
268     * Update document, using update script. Requires elasticsearch >= 0.19.0.
269     *
270     * @param int|string                    $id      document id
271     * @param AbstractScript|array|Document $data    raw data for request body
272     * @param string                        $index   index to update
273     * @param array                         $options array of query params to use for query. For possible options check es api
274     *
275     * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html
276     *
277     * @throws ClientException
278     * @throws ConnectionException
279     * @throws ResponseException
280     */
281    public function updateDocument($id, $data, $index, array $options = []): Response
282    {
283        $endpoint = new Update();
284        $endpoint->setId($id);
285        $endpoint->setIndex($index);
286
287        if ($data instanceof AbstractScript) {
288            $requestData = $data->toArray();
289        } elseif ($data instanceof Document) {
290            $requestData = ['doc' => $data->getData()];
291
292            if ($data->getDocAsUpsert()) {
293                $requestData['doc_as_upsert'] = true;
294            }
295
296            $docOptions = $data->getOptions(
297                [
298                    'consistency',
299                    'parent',
300                    'percolate',
301                    'refresh',
302                    'replication',
303                    'retry_on_conflict',
304                    'routing',
305                    'timeout',
306                ]
307            );
308            $options += $docOptions;
309        } else {
310            $requestData = $data;
311        }
312
313        // If an upsert document exists
314        if ($data instanceof AbstractScript || $data instanceof Document) {
315            if ($data->hasUpsert()) {
316                $requestData['upsert'] = $data->getUpsert()->getData();
317            }
318        }
319
320        $endpoint->setBody($requestData);
321        $endpoint->setParams($options);
322
323        $response = $this->requestEndpoint($endpoint);
324
325        if ($response->isOk()
326            && $data instanceof Document
327            && ($data->isAutoPopulate() || $this->getConfigValue(['document', 'autoPopulate'], false))
328        ) {
329            $data->setVersionParams($response->getData());
330        }
331
332        return $response;
333    }
334
335    /**
336     * Bulk deletes documents.
337     *
338     * @param array|Document[] $docs
339     *
340     * @throws InvalidException
341     * @throws ClientException
342     * @throws ConnectionException
343     * @throws ResponseException
344     * @throws BulkResponseException
345     */
346    public function deleteDocuments(array $docs, array $requestParams = []): ResponseSet
347    {
348        if (!$docs) {
349            throw new InvalidException('Array has to consist of at least one element');
350        }
351
352        $bulk = new Bulk($this);
353        $bulk->addDocuments($docs, Action::OP_TYPE_DELETE);
354
355        foreach ($requestParams as $key => $value) {
356            $bulk->setRequestParam($key, $value);
357        }
358
359        return $bulk->send();
360    }
361
362    /**
363     * Returns the status object for all indices.
364     *
365     * @return Status
366     */
367    public function getStatus()
368    {
369        return new Status($this);
370    }
371
372    /**
373     * Returns the current cluster.
374     *
375     * @return Cluster
376     */
377    public function getCluster()
378    {
379        return new Cluster($this);
380    }
381
    /**
     * Establishes the client connections.
     *
     * Delegates to _initConnections(), which builds a new connection pool from
     * the current configuration and replaces the previous one.
     */
    public function connect()
    {
        $this->_initConnections();
    }
389
390    /**
391     * @return $this
392     */
393    public function addConnection(Connection $connection)
394    {
395        $this->_connectionPool->addConnection($connection);
396
397        return $this;
398    }
399
400    /**
401     * Determines whether a valid connection is available for use.
402     *
403     * @return bool
404     */
405    public function hasConnection()
406    {
407        return $this->_connectionPool->hasConnection();
408    }
409
410    /**
411     * @throws ClientException
412     *
413     * @return Connection
414     */
415    public function getConnection()
416    {
417        return $this->_connectionPool->getConnection();
418    }
419
420    /**
421     * @return Connection[]
422     */
423    public function getConnections()
424    {
425        return $this->_connectionPool->getConnections();
426    }
427
428    /**
429     * @return \Elastica\Connection\Strategy\StrategyInterface
430     */
431    public function getConnectionStrategy()
432    {
433        return $this->_connectionPool->getStrategy();
434    }
435
436    /**
437     * @param array|Connection[] $connections
438     *
439     * @return $this
440     */
441    public function setConnections(array $connections)
442    {
443        $this->_connectionPool->setConnections($connections);
444
445        return $this;
446    }
447
448    /**
449     * Deletes documents with the given ids, index, type from the index.
450     *
451     * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
452     *
453     * @param array        $ids     Document ids
454     * @param Index|string $index   Index name
455     * @param bool|string  $routing Optional routing key for all ids
456     *
457     * @throws InvalidException
458     * @throws ClientException
459     * @throws ConnectionException
460     * @throws ResponseException
461     * @throws BulkResponseException
462     */
463    public function deleteIds(array $ids, $index, $routing = false): ResponseSet
464    {
465        if (!$ids) {
466            throw new InvalidException('Array has to consist of at least one id');
467        }
468
469        $bulk = new Bulk($this);
470        $bulk->setIndex($index);
471
472        foreach ($ids as $id) {
473            $action = new Action(Action::OP_TYPE_DELETE);
474            $action->setId($id);
475
476            if (!empty($routing)) {
477                $action->setRouting($routing);
478            }
479
480            $bulk->addAction($action);
481        }
482
483        return $bulk->send();
484    }
485
486    /**
487     * Bulk operation.
488     *
489     * Every entry in the params array has to exactly on array
490     * of the bulk operation. An example param array would be:
491     *
492     * array(
493     *         array('index' => array('_index' => 'test', '_id' => '1')),
494     *         array('field1' => 'value1'),
495     *         array('delete' => array('_index' => 'test', '_id' => '2')),
496     *         array('create' => array('_index' => 'test', '_id' => '3')),
497     *         array('field1' => 'value3'),
498     *         array('update' => array('_id' => '1', '_index' => 'test')),
499     *         array('doc' => array('field2' => 'value2')),
500     * );
501     *
502     * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
503     *
504     * @throws ResponseException
505     * @throws InvalidException
506     * @throws ClientException
507     * @throws ConnectionException
508     * @throws BulkResponseException
509     */
510    public function bulk(array $params): ResponseSet
511    {
512        if (!$params) {
513            throw new InvalidException('Array has to consist of at least one param');
514        }
515
516        $bulk = new Bulk($this);
517
518        $bulk->addRawData($params);
519
520        return $bulk->send();
521    }
522
523    /**
524     * Makes calls to the elasticsearch server based on this index.
525     *
526     * It's possible to make any REST query directly over this method
527     *
528     * @param string       $path        Path to call
529     * @param string       $method      Rest method to use (GET, POST, DELETE, PUT)
530     * @param array|string $data        OPTIONAL Arguments as array or pre-encoded string
531     * @param array        $query       OPTIONAL Query params
532     * @param string       $contentType Content-Type sent with this request
533     *
534     * @throws ClientException
535     * @throws ConnectionException
536     * @throws ResponseException
537     */
538    public function request(string $path, string $method = Request::GET, $data = [], array $query = [], string $contentType = Request::DEFAULT_CONTENT_TYPE): Response
539    {
540        $connection = $this->getConnection();
541        $request = $this->_lastRequest = new Request($path, $method, $data, $query, $connection, $contentType);
542        $this->_lastResponse = null;
543
544        try {
545            $response = $this->_lastResponse = $request->send();
546        } catch (ConnectionException $e) {
547            $this->_connectionPool->onFail($connection, $e, $this);
548            $this->_logger->error('Elastica Request Failure', [
549                'exception' => $e,
550                'request' => $e->getRequest()->toArray(),
551                'retry' => $this->hasConnection(),
552            ]);
553
554            // In case there is no valid connection left, throw exception which caused the disabling of the connection.
555            if (!$this->hasConnection()) {
556                throw $e;
557            }
558
559            return $this->request($path, $method, $data, $query);
560        }
561
562        $this->_logger->debug('Elastica Request', [
563            'request' => $request->toArray(),
564            'response' => $response->getData(),
565            'responseStatus' => $response->getStatus(),
566        ]);
567
568        return $response;
569    }
570
571    /**
572     * Makes calls to the elasticsearch server with usage official client Endpoint.
573     *
574     * @throws ClientException
575     * @throws ConnectionException
576     * @throws ResponseException
577     */
578    public function requestEndpoint(AbstractEndpoint $endpoint): Response
579    {
580        return $this->request(
581            \ltrim($endpoint->getURI(), '/'),
582            $endpoint->getMethod(),
583            $endpoint->getBody() ?? [],
584            $endpoint->getParams()
585        );
586    }
587
588    /**
589     * Force merges all search indices.
590     *
591     * @param array $args OPTIONAL Optional arguments
592     *
593     * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-forcemerge.html
594     *
595     * @throws ClientException
596     * @throws ConnectionException
597     * @throws ResponseException
598     */
599    public function forcemergeAll($args = []): Response
600    {
601        $endpoint = new ForceMerge();
602        $endpoint->setParams($args);
603
604        return $this->requestEndpoint($endpoint);
605    }
606
607    /**
608     * Closes the given PointInTime.
609     *
610     * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/point-in-time-api.html#close-point-in-time-api
611     *
612     * @throws ClientException
613     * @throws ConnectionException
614     * @throws ResponseException
615     */
616    public function closePointInTime(string $pointInTimeId): Response
617    {
618        $endpoint = new ClosePointInTime();
619        $endpoint->setBody(['id' => $pointInTimeId]);
620
621        return $this->requestEndpoint($endpoint);
622    }
623
624    /**
625     * Refreshes all search indices.
626     *
627     * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-refresh.html
628     *
629     * @throws ClientException
630     * @throws ConnectionException
631     * @throws ResponseException
632     */
633    public function refreshAll(): Response
634    {
635        return $this->requestEndpoint(new Refresh());
636    }
637
    /**
     * Returns the request most recently built by request(), or null when no
     * request has been made yet.
     */
    public function getLastRequest(): ?Request
    {
        return $this->_lastRequest;
    }
642
    /**
     * Returns the response to the most recent request, or null when no request
     * has been made yet or the most recent one did not produce a response.
     */
    public function getLastResponse(): ?Response
    {
        return $this->_lastResponse;
    }
647
    /**
     * Replace the existing logger.
     *
     * Subsequent calls to request() write their debug and error entries to the given logger.
     *
     * @param LoggerInterface $logger Logger to use from now on
     *
     * @return $this
     */
    public function setLogger(LoggerInterface $logger)
    {
        $this->_logger = $logger;

        return $this;
    }
659
660    /**
661     * Inits the client connections.
662     */
663    protected function _initConnections(): void
664    {
665        $connections = [];
666
667        foreach ($this->getConfig('connections') as $connection) {
668            $connections[] = Connection::create($this->_prepareConnectionParams($connection));
669        }
670
671        if ($this->_config->has('servers')) {
672            $servers = $this->_config->get('servers');
673            foreach ($servers as $server) {
674                $connections[] = Connection::create($this->_prepareConnectionParams($server));
675            }
676        }
677
678        // If no connections set, create default connection
679        if (!$connections) {
680            $connections[] = Connection::create($this->_prepareConnectionParams($this->getConfig()));
681        }
682
683        if (!$this->_config->has('connectionStrategy')) {
684            if (true === $this->getConfig('roundRobin')) {
685                $this->setConfigValue('connectionStrategy', 'RoundRobin');
686            } else {
687                $this->setConfigValue('connectionStrategy', 'Simple');
688            }
689        }
690
691        $strategy = Connection\Strategy\StrategyFactory::create($this->getConfig('connectionStrategy'));
692
693        $this->_connectionPool = new Connection\ConnectionPool($connections, $strategy, $this->_callback);
694    }
695
696    /**
697     * Creates a Connection params array from a Client or server config array.
698     */
699    protected function _prepareConnectionParams(array $config): array
700    {
701        $params = [];
702        $params['config'] = [];
703        foreach ($config as $key => $value) {
704            if (\in_array($key, ['bigintConversion', 'curl', 'headers', 'url'])) {
705                $params['config'][$key] = $value;
706            } else {
707                $params[$key] = $value;
708            }
709        }
710
711        return $params;
712    }
713}
714