1<?php 2 3namespace Elastica; 4 5use Elastica\Bulk\Action; 6use Elastica\Bulk\ResponseSet; 7use Elastica\Exception\Bulk\ResponseException as BulkResponseException; 8use Elastica\Exception\ClientException; 9use Elastica\Exception\ConnectionException; 10use Elastica\Exception\InvalidException; 11use Elastica\Exception\ResponseException; 12use Elastica\Script\AbstractScript; 13use Elasticsearch\Endpoints\AbstractEndpoint; 14use Elasticsearch\Endpoints\ClosePointInTime; 15use Elasticsearch\Endpoints\Indices\ForceMerge; 16use Elasticsearch\Endpoints\Indices\Refresh; 17use Elasticsearch\Endpoints\Update; 18use Psr\Log\LoggerInterface; 19use Psr\Log\NullLogger; 20 21/** 22 * Client to connect the the elasticsearch server. 23 * 24 * @author Nicolas Ruflin <spam@ruflin.com> 25 */ 26class Client 27{ 28 /** 29 * @var ClientConfiguration 30 */ 31 protected $_config; 32 33 /** 34 * @var callable 35 */ 36 protected $_callback; 37 38 /** 39 * @var Connection\ConnectionPool 40 */ 41 protected $_connectionPool; 42 43 /** 44 * @var Request|null 45 */ 46 protected $_lastRequest; 47 48 /** 49 * @var Response|null 50 */ 51 protected $_lastResponse; 52 53 /** 54 * @var LoggerInterface 55 */ 56 protected $_logger; 57 58 /** 59 * @var string 60 */ 61 protected $_version; 62 63 /** 64 * Creates a new Elastica client. 65 * 66 * @param array|string $config OPTIONAL Additional config or DSN of options 67 * @param callable|null $callback OPTIONAL Callback function which can be used to be notified about errors (for example connection down) 68 * 69 * @throws InvalidException 70 */ 71 public function __construct($config = [], ?callable $callback = null, ?LoggerInterface $logger = null) 72 { 73 if (\is_string($config)) { 74 $configuration = ClientConfiguration::fromDsn($config); 75 } elseif (\is_array($config)) { 76 $configuration = ClientConfiguration::fromArray($config); 77 } else { 78 throw new InvalidException('Config parameter must be an array or a string.'); 79 } 80 81 $this->_config = $configuration; 82 $this->_callback = $callback; 83 $this->_logger = $logger ?? new NullLogger(); 84 85 $this->_initConnections(); 86 } 87 88 /** 89 * Get current version. 90 * 91 * @throws ClientException 92 * @throws ConnectionException 93 * @throws ResponseException 94 */ 95 public function getVersion(): string 96 { 97 if ($this->_version) { 98 return $this->_version; 99 } 100 101 $data = $this->request('/')->getData(); 102 103 return $this->_version = $data['version']['number']; 104 } 105 106 /** 107 * Sets specific config values (updates and keeps default values). 108 * 109 * @param array $config Params 110 */ 111 public function setConfig(array $config): self 112 { 113 foreach ($config as $key => $value) { 114 $this->_config->set($key, $value); 115 } 116 117 return $this; 118 } 119 120 /** 121 * Returns a specific config key or the whole config array if not set. 122 * 123 * @throws InvalidException if the given key is not found in the configuration 124 * 125 * @return array|bool|string 126 */ 127 public function getConfig(string $key = '') 128 { 129 return $this->_config->get($key); 130 } 131 132 /** 133 * Sets / overwrites a specific config value. 134 * 135 * @param mixed $value Value 136 */ 137 public function setConfigValue(string $key, $value): self 138 { 139 return $this->setConfig([$key => $value]); 140 } 141 142 /** 143 * @param array|string $keys config key or path of config keys 144 * @param mixed $default default value will be returned if key was not found 145 * 146 * @return mixed 147 */ 148 public function getConfigValue($keys, $default = null) 149 { 150 $value = $this->_config->getAll(); 151 foreach ((array) $keys as $key) { 152 if (isset($value[$key])) { 153 $value = $value[$key]; 154 } else { 155 return $default; 156 } 157 } 158 159 return $value; 160 } 161 162 /** 163 * Returns the index for the given connection. 164 */ 165 public function getIndex(string $name): Index 166 { 167 return new Index($this, $name); 168 } 169 170 /** 171 * Adds a HTTP Header. 172 */ 173 public function addHeader(string $header, string $value): self 174 { 175 if ($this->_config->has('headers')) { 176 $headers = $this->_config->get('headers'); 177 } else { 178 $headers = []; 179 } 180 $headers[$header] = $value; 181 $this->_config->set('headers', $headers); 182 183 return $this; 184 } 185 186 /** 187 * Remove a HTTP Header. 188 */ 189 public function removeHeader(string $header): self 190 { 191 if ($this->_config->has('headers')) { 192 $headers = $this->_config->get('headers'); 193 unset($headers[$header]); 194 $this->_config->set('headers', $headers); 195 } 196 197 return $this; 198 } 199 200 /** 201 * Uses _bulk to send documents to the server. 202 * 203 * Array of \Elastica\Document as input. Index has to be set inside the 204 * document, because for bulk settings documents, documents can belong to 205 * any index 206 * 207 * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html 208 * 209 * @param array|Document[] $docs Array of Elastica\Document 210 * 211 * @throws InvalidException If docs is empty 212 * @throws ClientException 213 * @throws ConnectionException 214 * @throws ResponseException 215 * @throws BulkResponseException 216 */ 217 public function updateDocuments(array $docs, array $requestParams = []): ResponseSet 218 { 219 if (!$docs) { 220 throw new InvalidException('Array has to consist of at least one element'); 221 } 222 223 $bulk = new Bulk($this); 224 225 $bulk->addDocuments($docs, Action::OP_TYPE_UPDATE); 226 foreach ($requestParams as $key => $value) { 227 $bulk->setRequestParam($key, $value); 228 } 229 230 return $bulk->send(); 231 } 232 233 /** 234 * Uses _bulk to send documents to the server. 235 * 236 * Array of \Elastica\Document as input. Index has to be set inside the 237 * document, because for bulk settings documents, documents can belong to 238 * any index 239 * 240 * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html 241 * 242 * @param array|Document[] $docs Array of Elastica\Document 243 * 244 * @throws InvalidException If docs is empty 245 * @throws ClientException 246 * @throws ConnectionException 247 * @throws ResponseException 248 * @throws BulkResponseException 249 */ 250 public function addDocuments(array $docs, array $requestParams = []): ResponseSet 251 { 252 if (!$docs) { 253 throw new InvalidException('Array has to consist of at least one element'); 254 } 255 256 $bulk = new Bulk($this); 257 258 $bulk->addDocuments($docs); 259 260 foreach ($requestParams as $key => $value) { 261 $bulk->setRequestParam($key, $value); 262 } 263 264 return $bulk->send(); 265 } 266 267 /** 268 * Update document, using update script. Requires elasticsearch >= 0.19.0. 269 * 270 * @param int|string $id document id 271 * @param AbstractScript|array|Document $data raw data for request body 272 * @param string $index index to update 273 * @param array $options array of query params to use for query. For possible options check es api 274 * 275 * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html 276 * 277 * @throws ClientException 278 * @throws ConnectionException 279 * @throws ResponseException 280 */ 281 public function updateDocument($id, $data, $index, array $options = []): Response 282 { 283 $endpoint = new Update(); 284 $endpoint->setId($id); 285 $endpoint->setIndex($index); 286 287 if ($data instanceof AbstractScript) { 288 $requestData = $data->toArray(); 289 } elseif ($data instanceof Document) { 290 $requestData = ['doc' => $data->getData()]; 291 292 if ($data->getDocAsUpsert()) { 293 $requestData['doc_as_upsert'] = true; 294 } 295 296 $docOptions = $data->getOptions( 297 [ 298 'consistency', 299 'parent', 300 'percolate', 301 'refresh', 302 'replication', 303 'retry_on_conflict', 304 'routing', 305 'timeout', 306 ] 307 ); 308 $options += $docOptions; 309 } else { 310 $requestData = $data; 311 } 312 313 // If an upsert document exists 314 if ($data instanceof AbstractScript || $data instanceof Document) { 315 if ($data->hasUpsert()) { 316 $requestData['upsert'] = $data->getUpsert()->getData(); 317 } 318 } 319 320 $endpoint->setBody($requestData); 321 $endpoint->setParams($options); 322 323 $response = $this->requestEndpoint($endpoint); 324 325 if ($response->isOk() 326 && $data instanceof Document 327 && ($data->isAutoPopulate() || $this->getConfigValue(['document', 'autoPopulate'], false)) 328 ) { 329 $data->setVersionParams($response->getData()); 330 } 331 332 return $response; 333 } 334 335 /** 336 * Bulk deletes documents. 337 * 338 * @param array|Document[] $docs 339 * 340 * @throws InvalidException 341 * @throws ClientException 342 * @throws ConnectionException 343 * @throws ResponseException 344 * @throws BulkResponseException 345 */ 346 public function deleteDocuments(array $docs, array $requestParams = []): ResponseSet 347 { 348 if (!$docs) { 349 throw new InvalidException('Array has to consist of at least one element'); 350 } 351 352 $bulk = new Bulk($this); 353 $bulk->addDocuments($docs, Action::OP_TYPE_DELETE); 354 355 foreach ($requestParams as $key => $value) { 356 $bulk->setRequestParam($key, $value); 357 } 358 359 return $bulk->send(); 360 } 361 362 /** 363 * Returns the status object for all indices. 364 * 365 * @return Status 366 */ 367 public function getStatus() 368 { 369 return new Status($this); 370 } 371 372 /** 373 * Returns the current cluster. 374 * 375 * @return Cluster 376 */ 377 public function getCluster() 378 { 379 return new Cluster($this); 380 } 381 382 /** 383 * Establishes the client connections. 384 */ 385 public function connect() 386 { 387 $this->_initConnections(); 388 } 389 390 /** 391 * @return $this 392 */ 393 public function addConnection(Connection $connection) 394 { 395 $this->_connectionPool->addConnection($connection); 396 397 return $this; 398 } 399 400 /** 401 * Determines whether a valid connection is available for use. 402 * 403 * @return bool 404 */ 405 public function hasConnection() 406 { 407 return $this->_connectionPool->hasConnection(); 408 } 409 410 /** 411 * @throws ClientException 412 * 413 * @return Connection 414 */ 415 public function getConnection() 416 { 417 return $this->_connectionPool->getConnection(); 418 } 419 420 /** 421 * @return Connection[] 422 */ 423 public function getConnections() 424 { 425 return $this->_connectionPool->getConnections(); 426 } 427 428 /** 429 * @return \Elastica\Connection\Strategy\StrategyInterface 430 */ 431 public function getConnectionStrategy() 432 { 433 return $this->_connectionPool->getStrategy(); 434 } 435 436 /** 437 * @param array|Connection[] $connections 438 * 439 * @return $this 440 */ 441 public function setConnections(array $connections) 442 { 443 $this->_connectionPool->setConnections($connections); 444 445 return $this; 446 } 447 448 /** 449 * Deletes documents with the given ids, index, type from the index. 450 * 451 * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html 452 * 453 * @param array $ids Document ids 454 * @param Index|string $index Index name 455 * @param bool|string $routing Optional routing key for all ids 456 * 457 * @throws InvalidException 458 * @throws ClientException 459 * @throws ConnectionException 460 * @throws ResponseException 461 * @throws BulkResponseException 462 */ 463 public function deleteIds(array $ids, $index, $routing = false): ResponseSet 464 { 465 if (!$ids) { 466 throw new InvalidException('Array has to consist of at least one id'); 467 } 468 469 $bulk = new Bulk($this); 470 $bulk->setIndex($index); 471 472 foreach ($ids as $id) { 473 $action = new Action(Action::OP_TYPE_DELETE); 474 $action->setId($id); 475 476 if (!empty($routing)) { 477 $action->setRouting($routing); 478 } 479 480 $bulk->addAction($action); 481 } 482 483 return $bulk->send(); 484 } 485 486 /** 487 * Bulk operation. 488 * 489 * Every entry in the params array has to exactly on array 490 * of the bulk operation. An example param array would be: 491 * 492 * array( 493 * array('index' => array('_index' => 'test', '_id' => '1')), 494 * array('field1' => 'value1'), 495 * array('delete' => array('_index' => 'test', '_id' => '2')), 496 * array('create' => array('_index' => 'test', '_id' => '3')), 497 * array('field1' => 'value3'), 498 * array('update' => array('_id' => '1', '_index' => 'test')), 499 * array('doc' => array('field2' => 'value2')), 500 * ); 501 * 502 * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html 503 * 504 * @throws ResponseException 505 * @throws InvalidException 506 * @throws ClientException 507 * @throws ConnectionException 508 * @throws BulkResponseException 509 */ 510 public function bulk(array $params): ResponseSet 511 { 512 if (!$params) { 513 throw new InvalidException('Array has to consist of at least one param'); 514 } 515 516 $bulk = new Bulk($this); 517 518 $bulk->addRawData($params); 519 520 return $bulk->send(); 521 } 522 523 /** 524 * Makes calls to the elasticsearch server based on this index. 525 * 526 * It's possible to make any REST query directly over this method 527 * 528 * @param string $path Path to call 529 * @param string $method Rest method to use (GET, POST, DELETE, PUT) 530 * @param array|string $data OPTIONAL Arguments as array or pre-encoded string 531 * @param array $query OPTIONAL Query params 532 * @param string $contentType Content-Type sent with this request 533 * 534 * @throws ClientException 535 * @throws ConnectionException 536 * @throws ResponseException 537 */ 538 public function request(string $path, string $method = Request::GET, $data = [], array $query = [], string $contentType = Request::DEFAULT_CONTENT_TYPE): Response 539 { 540 $connection = $this->getConnection(); 541 $request = $this->_lastRequest = new Request($path, $method, $data, $query, $connection, $contentType); 542 $this->_lastResponse = null; 543 544 try { 545 $response = $this->_lastResponse = $request->send(); 546 } catch (ConnectionException $e) { 547 $this->_connectionPool->onFail($connection, $e, $this); 548 $this->_logger->error('Elastica Request Failure', [ 549 'exception' => $e, 550 'request' => $e->getRequest()->toArray(), 551 'retry' => $this->hasConnection(), 552 ]); 553 554 // In case there is no valid connection left, throw exception which caused the disabling of the connection. 555 if (!$this->hasConnection()) { 556 throw $e; 557 } 558 559 return $this->request($path, $method, $data, $query); 560 } 561 562 $this->_logger->debug('Elastica Request', [ 563 'request' => $request->toArray(), 564 'response' => $response->getData(), 565 'responseStatus' => $response->getStatus(), 566 ]); 567 568 return $response; 569 } 570 571 /** 572 * Makes calls to the elasticsearch server with usage official client Endpoint. 573 * 574 * @throws ClientException 575 * @throws ConnectionException 576 * @throws ResponseException 577 */ 578 public function requestEndpoint(AbstractEndpoint $endpoint): Response 579 { 580 return $this->request( 581 \ltrim($endpoint->getURI(), '/'), 582 $endpoint->getMethod(), 583 $endpoint->getBody() ?? [], 584 $endpoint->getParams() 585 ); 586 } 587 588 /** 589 * Force merges all search indices. 590 * 591 * @param array $args OPTIONAL Optional arguments 592 * 593 * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-forcemerge.html 594 * 595 * @throws ClientException 596 * @throws ConnectionException 597 * @throws ResponseException 598 */ 599 public function forcemergeAll($args = []): Response 600 { 601 $endpoint = new ForceMerge(); 602 $endpoint->setParams($args); 603 604 return $this->requestEndpoint($endpoint); 605 } 606 607 /** 608 * Closes the given PointInTime. 609 * 610 * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/point-in-time-api.html#close-point-in-time-api 611 * 612 * @throws ClientException 613 * @throws ConnectionException 614 * @throws ResponseException 615 */ 616 public function closePointInTime(string $pointInTimeId): Response 617 { 618 $endpoint = new ClosePointInTime(); 619 $endpoint->setBody(['id' => $pointInTimeId]); 620 621 return $this->requestEndpoint($endpoint); 622 } 623 624 /** 625 * Refreshes all search indices. 626 * 627 * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-refresh.html 628 * 629 * @throws ClientException 630 * @throws ConnectionException 631 * @throws ResponseException 632 */ 633 public function refreshAll(): Response 634 { 635 return $this->requestEndpoint(new Refresh()); 636 } 637 638 public function getLastRequest(): ?Request 639 { 640 return $this->_lastRequest; 641 } 642 643 public function getLastResponse(): ?Response 644 { 645 return $this->_lastResponse; 646 } 647 648 /** 649 * Replace the existing logger. 650 * 651 * @return $this 652 */ 653 public function setLogger(LoggerInterface $logger) 654 { 655 $this->_logger = $logger; 656 657 return $this; 658 } 659 660 /** 661 * Inits the client connections. 662 */ 663 protected function _initConnections(): void 664 { 665 $connections = []; 666 667 foreach ($this->getConfig('connections') as $connection) { 668 $connections[] = Connection::create($this->_prepareConnectionParams($connection)); 669 } 670 671 if ($this->_config->has('servers')) { 672 $servers = $this->_config->get('servers'); 673 foreach ($servers as $server) { 674 $connections[] = Connection::create($this->_prepareConnectionParams($server)); 675 } 676 } 677 678 // If no connections set, create default connection 679 if (!$connections) { 680 $connections[] = Connection::create($this->_prepareConnectionParams($this->getConfig())); 681 } 682 683 if (!$this->_config->has('connectionStrategy')) { 684 if (true === $this->getConfig('roundRobin')) { 685 $this->setConfigValue('connectionStrategy', 'RoundRobin'); 686 } else { 687 $this->setConfigValue('connectionStrategy', 'Simple'); 688 } 689 } 690 691 $strategy = Connection\Strategy\StrategyFactory::create($this->getConfig('connectionStrategy')); 692 693 $this->_connectionPool = new Connection\ConnectionPool($connections, $strategy, $this->_callback); 694 } 695 696 /** 697 * Creates a Connection params array from a Client or server config array. 698 */ 699 protected function _prepareConnectionParams(array $config): array 700 { 701 $params = []; 702 $params['config'] = []; 703 foreach ($config as $key => $value) { 704 if (\in_array($key, ['bigintConversion', 'curl', 'headers', 'url'])) { 705 $params['config'][$key] = $value; 706 } else { 707 $params[$key] = $value; 708 } 709 } 710 711 return $params; 712 } 713} 714