<?php
/**
 * Elasticsearch PHP client
 *
 * @link      https://github.com/elastic/elasticsearch-php/
 * @copyright Copyright (c) Elasticsearch B.V (https://www.elastic.co)
 * @license   http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0
 * @license   https://www.gnu.org/licenses/lgpl-2.1.html GNU Lesser General Public License, Version 2.1
 *
 * Licensed to Elasticsearch B.V under one or more agreements.
 * Elasticsearch B.V licenses this file to you under the Apache 2.0 License or
 * the GNU Lesser General Public License, Version 2.1, at your option.
 * See the LICENSE file in the project root for more information.
 */


declare(strict_types = 1);

namespace Elasticsearch;

use Elasticsearch\Common\Exceptions;
use Elasticsearch\ConnectionPool\AbstractConnectionPool;
use Elasticsearch\Connections\Connection;
use Elasticsearch\Connections\ConnectionInterface;
use GuzzleHttp\Ring\Future\FutureArrayInterface;
use Psr\Log\LoggerInterface;

/**
 * Dispatches requests to a connection taken from the connection pool and
 * keeps the retry bookkeeping (attempt counter vs. configured maximum).
 */
class Transport
{
    /**
     * Pool that supplies the connection used for each request.
     *
     * @var AbstractConnectionPool
     */
    public $connectionPool;

    /**
     * @var LoggerInterface
     */
    private $log;

    /**
     * Number of retries consumed so far; incremented by shouldRetry() and
     * reset to 0 when a request's promise is fulfilled.
     *
     * @var int
     */
    public $retryAttempts = 0;

    /**
     * Connection used by the most recent performRequest() call; null until
     * the first request has obtained a connection.
     *
     * @var ConnectionInterface|null
     */
    public $lastConnection;

    /**
     * Maximum number of retries that shouldRetry() will grant.
     *
     * @var int
     */
    public $retries;

    /**
     * Transport class is responsible for dispatching requests to the
     * underlying cluster connections
     *
     * @param int                    $retries        Maximum number of retries granted by shouldRetry()
     * @param AbstractConnectionPool $connectionPool Pool that connections are drawn from
     * @param LoggerInterface        $log            PSR-3 logger
     * @param bool                   $sniffOnStart   When true, a pool check is scheduled immediately
     */
    public function __construct(int $retries, AbstractConnectionPool $connectionPool, LoggerInterface $log, bool $sniffOnStart = false)
    {
        $this->log            = $log;
        $this->connectionPool = $connectionPool;
        $this->retries        = $retries;

        if ($sniffOnStart === true) {
            $this->log->notice('Sniff on Start.');
            $this->connectionPool->scheduleCheck();
        }
    }

    /**
     * Returns a single connection from the connection pool
     * Potentially performs a sniffing step before returning
     */
    public function getConnection(): ConnectionInterface
    {
        return $this->connectionPool->nextConnection();
    }

    /**
     * Perform a request to the Cluster
     *
     * Picks a connection from the pool, remembers it as the last used
     * connection, delegates the request to it and attaches promise handlers
     * that maintain the retry counter and schedule pool checks on failure.
     *
     * @param string $method  HTTP method to use
     * @param string $uri     HTTP URI to send request to
     * @param array  $params  Optional query parameters
     * @param mixed  $body    Optional query body, handed to the connection unchanged
     * @param array  $options Options handed to the connection unchanged
     *
     * @return FutureArrayInterface Future returned by the connection
     *
     * @throws Common\Exceptions\NoNodesAvailableException|\Exception
     */
    public function performRequest(string $method, string $uri, array $params = [], $body = null, array $options = []): FutureArrayInterface
    {
        try {
            $connection = $this->getConnection();
        } catch (Exceptions\NoNodesAvailableException $exception) {
            // Log before re-throwing so the failure is visible even if the caller swallows it.
            $this->log->critical('No alive nodes found in cluster');
            throw $exception;
        }

        $this->lastConnection = $connection;

        // The transport passes itself along as the final argument.
        $future = $connection->performRequest(
            $method,
            $uri,
            $params,
            $body,
            $options,
            $this
        );

        $future->promise()->then(
            //onSuccess
            function ($response) {
                $this->retryAttempts = 0;
                // Note, this could be a 4xx or 5xx error
            },
            //onFailure
            function ($response) {
                // NOTE(review): the rejection value is expected to expose getCode()
                // (presumably an exception) - confirm against the connection implementation.
                $code = $response->getCode();
                // Ignore 400 level errors, as that means the server responded just fine
                if ($code < 400 || $code >= 500) {
                    // Otherwise schedule a check
                    $this->connectionPool->scheduleCheck();
                }
            }
        );

        return $future;
    }

    /**
     * Resolves a future synchronously unless the caller asked for async mode.
     *
     * When $options['client']['future'] is absent, null or false, the future
     * is waited on repeatedly until the value it yields is no longer a
     * future, and that value is returned. For any other setting the future
     * itself is returned untouched.
     *
     * @param FutureArrayInterface $result  Response of a request (promise)
     * @param array                $options Options for transport
     *
     * @return FutureArrayInterface|mixed The unresolved future in async mode, otherwise the resolved value
     */
    public function resultOrFuture(FutureArrayInterface $result, array $options = [])
    {
        $async = $options['client']['future'] ?? null;

        if ($async === null || $async === false) {
            // A future may resolve to another future; keep unwrapping.
            do {
                $result = $result->wait();
            } while ($result instanceof FutureArrayInterface);
        }

        return $result;
    }

    /**
     * Decides whether another retry is allowed and, if so, consumes one
     * attempt from the budget.
     *
     * @param array $request Request being retried; not inspected here
     *
     * @return bool True while fewer than $retries attempts have been used
     */
    public function shouldRetry(array $request): bool
    {
        if ($this->retryAttempts < $this->retries) {
            $this->retryAttempts += 1;

            return true;
        }

        return false;
    }

    /**
     * Returns the last used connection so that it may be inspected. Mainly
     * for debugging/testing purposes.
     *
     * The return type is non-nullable, so calling this while $lastConnection
     * is still null (no request performed yet) raises a TypeError.
     */
    public function getLastConnection(): ConnectionInterface
    {
        return $this->lastConnection;
    }
}