log = $log; $this->connectionPool = $connectionPool; $this->retries = $retries; if ($sniffOnStart === true) { $this->log->notice('Sniff on Start.'); $this->connectionPool->scheduleCheck(); } } /** * Returns a single connection from the connection pool * Potentially performs a sniffing step before returning */ public function getConnection(): ConnectionInterface { return $this->connectionPool->nextConnection(); } /** * Perform a request to the Cluster * * @param string $method HTTP method to use * @param string $uri HTTP URI to send request to * @param array $params Optional query parameters * @param null $body Optional query body * @param array $options * * @throws Common\Exceptions\NoNodesAvailableException|\Exception */ public function performRequest(string $method, string $uri, array $params = [], $body = null, array $options = []): FutureArrayInterface { try { $connection = $this->getConnection(); } catch (Exceptions\NoNodesAvailableException $exception) { $this->log->critical('No alive nodes found in cluster'); throw $exception; } $response = []; $caughtException = null; $this->lastConnection = $connection; $future = $connection->performRequest( $method, $uri, $params, $body, $options, $this ); $future->promise()->then( //onSuccess function ($response) { $this->retryAttempts = 0; // Note, this could be a 4xx or 5xx error }, //onFailure function ($response) { $code = $response->getCode(); // Ignore 400 level errors, as that means the server responded just fine if ($code < 400 || $code >= 500) { // Otherwise schedule a check $this->connectionPool->scheduleCheck(); } } ); return $future; } /** * @param FutureArrayInterface $result Response of a request (promise) * @param array $options Options for transport * * @return callable|array */ public function resultOrFuture(FutureArrayInterface $result, array $options = []) { $response = null; $async = isset($options['client']['future']) ? $options['client']['future'] : null; if (is_null($async) || $async === false) { do { $result = $result->wait(); } while ($result instanceof FutureArrayInterface); } return $result; } public function shouldRetry(array $request): bool { if ($this->retryAttempts < $this->retries) { $this->retryAttempts += 1; return true; } return false; } /** * Returns the last used connection so that it may be inspected. Mainly * for debugging/testing purposes. */ public function getLastConnection(): ConnectionInterface { return $this->lastConnection; } }