1<?php
2
3namespace Elastica;
4
5use Elastica\Bulk\Action;
6use Elastica\Exception\ConnectionException;
7use Elastica\Exception\InvalidException;
8use Elastica\Script\AbstractScript;
9use Elasticsearch\Endpoints\AbstractEndpoint;
10use Elasticsearch\Endpoints\Indices\ForceMerge;
11use Elasticsearch\Endpoints\Indices\Refresh;
12use Elasticsearch\Endpoints\Update;
13use Psr\Log\LoggerInterface;
14use Psr\Log\NullLogger;
15
16/**
 * Client to connect to the elasticsearch server.
18 *
19 * @author Nicolas Ruflin <spam@ruflin.com>
20 */
class Client
{
    /**
     * Config with defaults.
     *
     * log: Set to true, to enable logging, set a string to log to a specific file
     * retryOnConflict: Used in \Elastica\Client::updateDocument as the default "retry_on_conflict" option
     * bigintConversion: Set to true to enable the JSON bigint to string conversion option (see issue #717)
     *
     * @var array
     */
    protected $_config = [
        'host' => null,
        'port' => null,
        'path' => null,
        'url' => null,
        'proxy' => null,
        'transport' => null,
        'persistent' => true,
        'timeout' => null,
        'connections' => [], // host, port, path, timeout, transport, compression, persistent, timeout, username, password, config -> (curl, headers, url)
        'roundRobin' => false,
        'log' => false,
        'retryOnConflict' => 0,
        'bigintConversion' => false,
        'username' => null,
        'password' => null,
    ];

    /**
     * Callback handed to the connection pool (see _initConnections).
     *
     * @var callable|null
     */
    protected $_callback;

    /**
     * Pool holding the connections of this client; built by _initConnections().
     *
     * @var Connection\ConnectionPool
     */
    protected $_connectionPool;

    /**
     * Last request created by request(), null until the first request.
     *
     * @var \Elastica\Request|null
     */
    protected $_lastRequest;

    /**
     * Response of the last request; reset to null at the start of every request().
     *
     * @var \Elastica\Response|null
     */
    protected $_lastResponse;

    /**
     * @var LoggerInterface
     */
    protected $_logger;

    /**
     * Server version number, cached by getVersion() after the first lookup.
     *
     * @var string|null
     */
    protected $_version;

    /**
     * Creates a new Elastica client.
     *
     * @param array           $config   OPTIONAL Additional config options
     * @param callback        $callback OPTIONAL Callback function which can be used to be notified about errors (for example connection down)
     * @param LoggerInterface $logger   OPTIONAL Logger to use; if omitted, a truthy "log" config option creates a Log instance, otherwise a NullLogger is used
     */
    public function __construct(array $config = [], $callback = null, LoggerInterface $logger = null)
    {
        $this->_callback = $callback;

        // An explicitly passed logger takes precedence over the "log" config option.
        if (!$logger && !empty($config['log'])) {
            $logger = new Log($config['log']);
        }
        $this->_logger = $logger ?: new NullLogger();

        $this->setConfig($config);
        $this->_initConnections();
    }

    /**
     * Get current version.
     *
     * The version is requested from the server root ("/") once and cached afterwards.
     *
     * @return string
     */
    public function getVersion()
    {
        if ($this->_version) {
            return $this->_version;
        }

        $data = $this->request('/')->getData();

        return $this->_version = $data['version']['number'];
    }

    /**
     * Inits the client connections.
     *
     * Connections are created from the "connections" config entries, then from the
     * optional "servers" entries. If neither yields a connection, a single default
     * connection is created from the client config itself. The connection strategy
     * defaults to "RoundRobin" when "roundRobin" is exactly true, "Simple" otherwise.
     */
    protected function _initConnections()
    {
        $connections = [];

        foreach ($this->getConfig('connections') as $connection) {
            $connections[] = Connection::create($this->_prepareConnectionParams($connection));
        }

        if (isset($this->_config['servers'])) {
            foreach ($this->getConfig('servers') as $server) {
                $connections[] = Connection::create($this->_prepareConnectionParams($server));
            }
        }

        // If no connections set, create default connection
        if (empty($connections)) {
            $connections[] = Connection::create($this->_prepareConnectionParams($this->getConfig()));
        }

        // Only derive a strategy when none was configured explicitly.
        if (!isset($this->_config['connectionStrategy'])) {
            $strategyName = true === $this->getConfig('roundRobin') ? 'RoundRobin' : 'Simple';
            $this->setConfigValue('connectionStrategy', $strategyName);
        }

        $strategy = Connection\Strategy\StrategyFactory::create($this->getConfig('connectionStrategy'));

        $this->_connectionPool = new Connection\ConnectionPool($connections, $strategy, $this->_callback);
    }

    /**
     * Creates a Connection params array from a Client or server config array.
     *
     * The keys "bigintConversion", "curl", "headers" and "url" are moved below the
     * "config" key of the result; every other key is copied to the top level.
     *
     * @param array $config
     *
     * @return array
     */
    protected function _prepareConnectionParams(array $config)
    {
        $nestedKeys = ['bigintConversion', 'curl', 'headers', 'url'];

        $params = [];
        $params['config'] = [];
        foreach ($config as $key => $value) {
            // Strict comparison: with loose comparison an integer key such as 0 equals
            // any non-numeric string on PHP < 8 and would wrongly end up in "config".
            if (\in_array($key, $nestedKeys, true)) {
                $params['config'][$key] = $value;
            } else {
                $params[$key] = $value;
            }
        }

        return $params;
    }

    /**
     * Sets specific config values (updates and keeps default values).
     *
     * @param array $config Params
     *
     * @return $this
     */
    public function setConfig(array $config)
    {
        foreach ($config as $key => $value) {
            $this->_config[$key] = $value;
        }

        return $this;
    }

    /**
     * Returns a specific config key or the whole
     * config array if not set.
     *
     * @param string $key Config key
     *
     * @throws \Elastica\Exception\InvalidException If the given key does not exist in the config
     *
     * @return array|string Config value
     */
    public function getConfig($key = '')
    {
        if (empty($key)) {
            return $this->_config;
        }

        if (!\array_key_exists($key, $this->_config)) {
            throw new InvalidException('Config key is not set: '.$key);
        }

        return $this->_config[$key];
    }

    /**
     * Sets / overwrites a specific config value.
     *
     * @param string $key   Key to set
     * @param mixed  $value Value
     *
     * @return $this
     */
    public function setConfigValue($key, $value)
    {
        return $this->setConfig([$key => $value]);
    }

    /**
     * Returns a (possibly nested) config value without throwing on missing keys.
     *
     * The keys are followed one after the other into the config array. As soon as
     * one of them is not set (or its value is null), the default is returned.
     *
     * @param array|string $keys    config key or path of config keys
     * @param mixed        $default default value will be returned if key was not found
     *
     * @return mixed
     */
    public function getConfigValue($keys, $default = null)
    {
        $value = $this->_config;
        foreach ((array) $keys as $key) {
            if (!isset($value[$key])) {
                return $default;
            }

            $value = $value[$key];
        }

        return $value;
    }

    /**
     * Returns the index for the given connection.
     *
     * @param string $name Index name to create connection to
     *
     * @return \Elastica\Index Index for the given name
     */
    public function getIndex($name)
    {
        return new Index($this, $name);
    }

    /**
     * Adds a HTTP Header.
     *
     * The header is stored below the "headers" key of the client config.
     *
     * @param string $header      The HTTP Header
     * @param string $headerValue The HTTP Header Value
     *
     * @throws \Elastica\Exception\InvalidException If $header or $headerValue is not a string
     *
     * @return $this
     */
    public function addHeader($header, $headerValue)
    {
        if (!\is_string($header) || !\is_string($headerValue)) {
            throw new InvalidException('Header must be a string');
        }

        $this->_config['headers'][$header] = $headerValue;

        return $this;
    }

    /**
     * Remove a HTTP Header.
     *
     * Removing a header that was never set is a no-op.
     *
     * @param string $header The HTTP Header to remove
     *
     * @throws \Elastica\Exception\InvalidException If $header is not a string
     *
     * @return $this
     */
    public function removeHeader($header)
    {
        if (!\is_string($header)) {
            throw new InvalidException('Header must be a string');
        }

        // "headers" is not part of the default config, so it may not exist yet.
        if (isset($this->_config['headers']) && \is_array($this->_config['headers'])) {
            unset($this->_config['headers'][$header]);
        }

        return $this;
    }

    /**
     * Uses _bulk to update documents on the server.
     *
     * Array of \Elastica\Document as input. Index and type has to be
     * set inside the document, because for bulk settings documents,
     * documents can belong to any type and index
     *
     * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
     *
     * @param array|\Elastica\Document[] $docs          Array of Elastica\Document
     * @param array                      $requestParams Request params set on the bulk request (name => value)
     *
     * @throws \Elastica\Exception\InvalidException If docs is empty
     *
     * @return \Elastica\Bulk\ResponseSet Response object
     */
    public function updateDocuments(array $docs, array $requestParams = [])
    {
        if (empty($docs)) {
            throw new InvalidException('Array has to consist of at least one element');
        }

        $bulk = new Bulk($this);

        $bulk->addDocuments($docs, Action::OP_TYPE_UPDATE);
        foreach ($requestParams as $key => $value) {
            $bulk->setRequestParam($key, $value);
        }

        return $bulk->send();
    }

    /**
     * Uses _bulk to send documents to the server.
     *
     * Array of \Elastica\Document as input. Index and type has to be
     * set inside the document, because for bulk settings documents,
     * documents can belong to any type and index
     *
     * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
     *
     * @param array|\Elastica\Document[] $docs          Array of Elastica\Document
     * @param array                      $requestParams Request params set on the bulk request (name => value)
     *
     * @throws \Elastica\Exception\InvalidException If docs is empty
     *
     * @return \Elastica\Bulk\ResponseSet Response object
     */
    public function addDocuments(array $docs, array $requestParams = [])
    {
        if (empty($docs)) {
            throw new InvalidException('Array has to consist of at least one element');
        }

        $bulk = new Bulk($this);

        $bulk->addDocuments($docs);

        foreach ($requestParams as $key => $value) {
            $bulk->setRequestParam($key, $value);
        }

        return $bulk->send();
    }

    /**
     * Update document, using update script. Requires elasticsearch >= 0.19.0.
     *
     * If $data is a Document, its own options (version, routing, ...) are used for
     * every option that is not given explicitly in $options. If no "retry_on_conflict"
     * option results from that, a non-empty "retryOnConflict" client config value is used.
     *
     * After a successful response for a Document with auto-populate enabled (on the
     * document or through the "document.autoPopulate" config), the document version
     * is updated from the "_version" field of the response.
     *
     * @param int|string                                               $id      document id
     * @param array|\Elastica\Script\AbstractScript|\Elastica\Document $data    raw data for request body
     * @param string                                                   $index   index to update
     * @param string                                                   $type    type of index to update
     * @param array                                                    $options array of query params to use for query. For possible options check es api
     *
     * @return \Elastica\Response
     *
     * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html
     */
    public function updateDocument($id, $data, $index, $type, array $options = [])
    {
        $endpoint = new Update();
        $endpoint->setID($id);
        $endpoint->setIndex($index);
        $endpoint->setType($type);

        if ($data instanceof AbstractScript) {
            $requestData = $data->toArray();
        } elseif ($data instanceof Document) {
            $requestData = ['doc' => $data->getData()];

            if ($data->getDocAsUpsert()) {
                $requestData['doc_as_upsert'] = true;
            }

            $docOptions = $data->getOptions(
                [
                    'version',
                    'version_type',
                    'routing',
                    'percolate',
                    'parent',
                    'retry_on_conflict',
                    'consistency',
                    'replication',
                    'refresh',
                    'timeout',
                ]
            );
            // Array union: options passed explicitly win over the document's options.
            $options += $docOptions;
        } else {
            $requestData = $data;
        }

        //If an upsert document exists
        if ($data instanceof AbstractScript || $data instanceof Document) {
            if ($data->hasUpsert()) {
                $requestData['upsert'] = $data->getUpsert()->getData();
            }
        }

        // Fall back to the client wide "retryOnConflict" config; the default of 0 adds nothing.
        if (!isset($options['retry_on_conflict'])) {
            $retryOnConflict = $this->getConfigValue('retryOnConflict', 0);
            if ($retryOnConflict) {
                $options['retry_on_conflict'] = $retryOnConflict;
            }
        }

        $endpoint->setBody($requestData);
        $endpoint->setParams($options);

        $response = $this->requestEndpoint($endpoint);

        if ($response->isOk()
            && $data instanceof Document
            && ($data->isAutoPopulate() || $this->getConfigValue(['document', 'autoPopulate'], false))
        ) {
            $responseData = $response->getData();
            if (isset($responseData['_version'])) {
                $data->setVersion($responseData['_version']);
            }
        }

        return $response;
    }

    /**
     * Bulk deletes documents.
     *
     * @param array|\Elastica\Document[] $docs          Array of Elastica\Document
     * @param array                      $requestParams Request params set on the bulk request (name => value)
     *
     * @throws \Elastica\Exception\InvalidException If docs is empty
     *
     * @return \Elastica\Bulk\ResponseSet
     */
    public function deleteDocuments(array $docs, array $requestParams = [])
    {
        if (empty($docs)) {
            throw new InvalidException('Array has to consist of at least one element');
        }

        $bulk = new Bulk($this);
        $bulk->addDocuments($docs, Action::OP_TYPE_DELETE);

        foreach ($requestParams as $key => $value) {
            $bulk->setRequestParam($key, $value);
        }

        return $bulk->send();
    }

    /**
     * Returns the status object for all indices.
     *
     * @return \Elastica\Status Status object
     */
    public function getStatus()
    {
        return new Status($this);
    }

    /**
     * Returns the current cluster.
     *
     * @return \Elastica\Cluster Cluster object
     */
    public function getCluster()
    {
        return new Cluster($this);
    }

    /**
     * Establishes the client connections.
     *
     * Rebuilds the connection pool from the current config (see _initConnections).
     */
    public function connect()
    {
        return $this->_initConnections();
    }

    /**
     * Adds a connection to the connection pool.
     *
     * @param \Elastica\Connection $connection
     *
     * @return $this
     */
    public function addConnection(Connection $connection)
    {
        $this->_connectionPool->addConnection($connection);

        return $this;
    }

    /**
     * Determines whether a valid connection is available for use.
     *
     * @return bool
     */
    public function hasConnection()
    {
        return $this->_connectionPool->hasConnection();
    }

    /**
     * Returns the connection provided by the connection pool.
     *
     * @throws \Elastica\Exception\ClientException
     *
     * @return \Elastica\Connection
     */
    public function getConnection()
    {
        return $this->_connectionPool->getConnection();
    }

    /**
     * Returns all connections of the connection pool.
     *
     * @return \Elastica\Connection[]
     */
    public function getConnections()
    {
        return $this->_connectionPool->getConnections();
    }

    /**
     * Returns the strategy used by the connection pool.
     *
     * @return \Elastica\Connection\Strategy\StrategyInterface
     */
    public function getConnectionStrategy()
    {
        return $this->_connectionPool->getStrategy();
    }

    /**
     * Sets the connections of the connection pool.
     *
     * @param array|\Elastica\Connection[] $connections
     *
     * @return $this
     */
    public function setConnections(array $connections)
    {
        $this->_connectionPool->setConnections($connections);

        return $this;
    }

    /**
     * Deletes documents with the given ids, index, type from the index.
     *
     * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
     *
     * @param array                  $ids     Document ids
     * @param string|\Elastica\Index $index   Index name
     * @param string|\Elastica\Type  $type    Type of documents
     * @param string|bool            $routing Optional routing key for all ids
     *
     * @throws \Elastica\Exception\InvalidException If ids is empty
     *
     * @return \Elastica\Bulk\ResponseSet Response  object
     */
    public function deleteIds(array $ids, $index, $type, $routing = false)
    {
        if (empty($ids)) {
            throw new InvalidException('Array has to consist of at least one id');
        }

        $bulk = new Bulk($this);
        $bulk->setIndex($index);
        $bulk->setType($type);

        foreach ($ids as $id) {
            $action = new Action(Action::OP_TYPE_DELETE);
            $action->setId($id);

            // The same routing key is applied to every delete action.
            if (!empty($routing)) {
                $action->setRouting($routing);
            }

            $bulk->addAction($action);
        }

        return $bulk->send();
    }

    /**
     * Bulk operation.
     *
     * Every entry in the params array has to be exactly one array
     * of the bulk operation. An example param array would be:
     *
     * array(
     *         array('index' => array('_index' => 'test', '_type' => 'user', '_id' => '1')),
     *         array('user' => array('name' => 'hans')),
     *         array('delete' => array('_index' => 'test', '_type' => 'user', '_id' => '2'))
     * );
     *
     * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
     *
     * @param array $params Parameter array
     *
     * @throws \Elastica\Exception\ResponseException
     * @throws \Elastica\Exception\InvalidException  If params is empty
     *
     * @return \Elastica\Bulk\ResponseSet Response object
     */
    public function bulk(array $params)
    {
        if (empty($params)) {
            throw new InvalidException('Array has to consist of at least one param');
        }

        $bulk = new Bulk($this);

        $bulk->addRawData($params);

        return $bulk->send();
    }

    /**
     * Makes calls to the elasticsearch server.
     *
     * It's possible to make any REST query directly over this method
     *
     * If sending fails with a ConnectionException, the connection pool is notified
     * and the exception is logged. As long as another connection is available the
     * request is sent again with the same arguments; otherwise the exception is rethrown.
     *
     * @param string       $path        Path to call
     * @param string       $method      Rest method to use (GET, POST, DELETE, PUT)
     * @param array|string $data        OPTIONAL Arguments as array or pre-encoded string
     * @param array        $query       OPTIONAL Query params
     * @param string       $contentType Content-Type sent with this request
     *
     * @throws Exception\ConnectionException|Exception\ClientException
     *
     * @return Response Response object
     */
    public function request($path, $method = Request::GET, $data = [], array $query = [], $contentType = Request::DEFAULT_CONTENT_TYPE)
    {
        $connection = $this->getConnection();
        $request = $this->_lastRequest = new Request($path, $method, $data, $query, $connection, $contentType);
        $this->_lastResponse = null;

        try {
            $response = $this->_lastResponse = $request->send();
        } catch (ConnectionException $e) {
            $this->_connectionPool->onFail($connection, $e, $this);
            $this->_log($e);

            // In case there is no valid connection left, throw exception which caused the disabling of the connection.
            if (!$this->hasConnection()) {
                throw $e;
            }

            // Retry with the very same arguments, including the content type.
            return $this->request($path, $method, $data, $query, $contentType);
        }

        $this->_log($request);

        return $response;
    }

    /**
     * Makes calls to the elasticsearch server with usage official client Endpoint.
     *
     * The endpoint URI is passed without leading slashes; a null body is sent as empty array.
     *
     * @param AbstractEndpoint $endpoint
     *
     * @return Response
     */
    public function requestEndpoint(AbstractEndpoint $endpoint)
    {
        $body = $endpoint->getBody();

        return $this->request(
            \ltrim($endpoint->getURI(), '/'),
            $endpoint->getMethod(),
            null === $body ? [] : $body,
            $endpoint->getParams()
        );
    }

    /**
     * logging.
     *
     * A ConnectionException is logged on error level together with its request and
     * whether a retry is possible. A Request is logged on debug level together with
     * the data and status of the last response. Anything else is logged on debug
     * level as plain message.
     *
     * @deprecated Overwriting Client->_log is deprecated. Handle logging functionality by using a custom LoggerInterface.
     *
     * @param mixed $context
     */
    protected function _log($context)
    {
        if ($context instanceof ConnectionException) {
            $this->_logger->error('Elastica Request Failure', [
                'exception' => $context,
                'request' => $context->getRequest()->toArray(),
                'retry' => $this->hasConnection(),
            ]);

            return;
        }

        if ($context instanceof Request) {
            $this->_logger->debug('Elastica Request', [
                'request' => $context->toArray(),
                'response' => $this->_lastResponse ? $this->_lastResponse->getData() : null,
                'responseStatus' => $this->_lastResponse ? $this->_lastResponse->getStatus() : null,
            ]);

            return;
        }

        $this->_logger->debug('Elastica Request', [
            'message' => $context,
        ]);
    }

    /**
     * Force merges all search indices.
     *
     * @param array $args OPTIONAL Optional arguments
     *
     * @return \Elastica\Response Response object
     *
     * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-forcemerge.html
     */
    public function forcemergeAll($args = [])
    {
        $endpoint = new ForceMerge();
        $endpoint->setParams($args);

        return $this->requestEndpoint($endpoint);
    }

    /**
     * Refreshes all search indices.
     *
     * @return \Elastica\Response Response object
     *
     * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-refresh.html
     */
    public function refreshAll()
    {
        return $this->requestEndpoint(new Refresh());
    }

    /**
     * Returns the last request created by request(), if any.
     *
     * @return Request|null
     */
    public function getLastRequest()
    {
        return $this->_lastRequest;
    }

    /**
     * Returns the response of the last request; null if there was none or it failed.
     *
     * @return Response|null
     */
    public function getLastResponse()
    {
        return $this->_lastResponse;
    }

    /**
     * Replace the existing logger.
     *
     * @param LoggerInterface $logger
     *
     * @return $this
     */
    public function setLogger(LoggerInterface $logger)
    {
        $this->_logger = $logger;

        return $this;
    }
}
774