transportSchema = $hostDetails['scheme'];
        }

        // Only Set the Basic if API Key is not set and setBasicAuthentication was not called prior
        if (isset($connectionParams['client']['headers']['Authorization']) === false
            && isset($connectionParams['client']['curl'][CURLOPT_HTTPAUTH]) === false
            && isset($hostDetails['user'])
            && isset($hostDetails['pass'])
        ) {
            $connectionParams['client']['curl'][CURLOPT_HTTPAUTH] = CURLAUTH_BASIC;
            $connectionParams['client']['curl'][CURLOPT_USERPWD] = $hostDetails['user'].':'.$hostDetails['pass'];
        }

        $connectionParams['client']['curl'][CURLOPT_PORT] = $hostDetails['port'];

        if (isset($connectionParams['client']['headers'])) {
            $this->headers = $connectionParams['client']['headers'];
            unset($connectionParams['client']['headers']);
        }

        // Add the User-Agent using the format: <client-name>/<client-version> (metadata-values)
        $this->headers['User-Agent'] = [sprintf(
            "elasticsearch-php/%s (%s %s; PHP %s)",
            Client::VERSION,
            PHP_OS,
            $this->getOSVersion(),
            phpversion()
        )];

        // Add x-elastic-client-meta header, if enabled
        if (isset($connectionParams['client']['x-elastic-client-meta']) && $connectionParams['client']['x-elastic-client-meta']) {
            $this->headers['x-elastic-client-meta'] = [$this->getElasticMetaHeader($connectionParams)];
        }

        $host = $hostDetails['host'];
        $path = null;
        if (isset($hostDetails['path']) === true) {
            $path = $hostDetails['path'];
        }
        $port = $hostDetails['port'];

        $this->host = $host;
        $this->path = $path;
        $this->port = $port;
        $this->log = $log;
        $this->trace = $trace;
        $this->connectionParams = $connectionParams;
        $this->serializer = $serializer;

        $this->handler = $this->wrapHandler($handler);
    }

    /**
     * Serialize the body, assemble the request array and pass it to the wrapped handler.
     *
     * Per-request headers in $options['client']['headers'] are merged over the
     * connection headers; the connection parameters and $options are then merged
     * recursively over the assembled request.
     *
     * @param  string         $method    HTTP method
     * @param  string         $uri       Request path, without query string
     * @param  null|array     $params    Query-string parameters
     * @param  mixed          $body      Request body; serialized when not null
     * @param  array          $options   Per-request options
     * @param  Transport|null $transport Transport used for retries; null disables them
     * @return mixed Whatever the wrapped handler returns
     */
    public function performRequest(string $method, string $uri, ?array $params = [], $body = null, array $options = [], ?Transport $transport = null)
    {
        if ($body !== null) {
            $body = $this->serializer->serialize($body);
        }

        $headers = $this->headers;
        if (isset($options['client']['headers']) && is_array($options['client']['headers'])) {
            $headers = array_merge($this->headers, $options['client']['headers']);
        }

        $host = $this->host;
        if (isset($this->connectionParams['client']['port_in_header']) && $this->connectionParams['client']['port_in_header']) {
            $host .= ':' . $this->port;
        }

        $request = [
            'http_method' => $method,
            'scheme'      => $this->transportSchema,
            'uri'         => $this->getURI($uri, $params),
            'body'        => $body,
            'headers'     => array_merge(
                [
                    'Host' => [$host]
                ],
                $headers
            )
        ];

        $request = array_replace_recursive($request, $this->connectionParams, $options);

        // RingPHP does not like if client is empty
        if (empty($request['client'])) {
            unset($request['client']);
        }

        $handler = $this->handler;
        $future = $handler($request, $this, $transport, $options);

        return $future;
    }

    /**
     * @return string The scheme stored for this connection
     */
    public function getTransportSchema(): string
    {
        return $this->transportSchema;
    }

    /**
     * @return array The request (and, once resolved, the response) recorded by the last call
     */
    public function getLastRequestInfo(): array
    {
        return $this->lastRequest;
    }

    /**
     * Wrap the low-level handler with bookkeeping, retry and error translation.
     *
     * The returned closure records the request in $this->lastRequest, invokes the
     * handler and post-processes its response: connection errors mark the node
     * dead and may be retried through the transport, 4xx/5xx statuses are turned
     * into exceptions, and successful bodies are deserialized.
     *
     * @param  callable $handler The handler that actually sends the request
     * @return callable function (array $request, Connection $connection, ?Transport $transport, $options)
     */
    private function wrapHandler(callable $handler): callable
    {
        // $transport is explicitly nullable instead of "= null": an optional
        // parameter in front of the required $options is deprecated, and the
        // closure is always invoked with all four arguments.
        return function (array $request, Connection $connection, ?Transport $transport, $options) use ($handler) {

            $this->lastRequest = [];
            $this->lastRequest['request'] = $request;

            // Send the request using the wrapped handler.
            $response = Core::proxy(
                $handler($request),
                function ($response) use ($connection, $transport, $request, $options) {

                    $this->lastRequest['response'] = $response;

                    if (isset($response['error']) === true) {
                        if ($response['error'] instanceof ConnectException || $response['error'] instanceof RingException) {
                            $this->log->warning("Curl exception encountered.");

                            $exception = $this->getCurlRetryException($request, $response);

                            $this->logRequestFail($request, $response, $exception);

                            $node = $connection->getHost();
                            $this->log->warning("Marking node $node dead.");
                            $connection->markDead();

                            // If the transport has not been set, we are inside a Ping or Sniff,
                            // so we don't want to retrigger retries anyway.
                            //
                            // TODO this could be handled better, but we are limited because connectionpools do not
                            // have access to Transport.  Architecturally, all of this needs to be refactored
                            if (isset($transport) === true) {
                                $transport->connectionPool->scheduleCheck();

                                $neverRetry = isset($request['client']['never_retry']) ? $request['client']['never_retry'] : false;
                                $shouldRetry = $transport->shouldRetry($request);
                                $shouldRetryText = ($shouldRetry) ? 'true' : 'false';

                                $this->log->warning("Retries left? $shouldRetryText");
                                if ($shouldRetry && !$neverRetry) {
                                    return $transport->performRequest(
                                        $request['http_method'],
                                        $request['uri'],
                                        [],
                                        $request['body'],
                                        $options
                                    );
                                }
                            }

                            $this->log->warning("Out of retries, throwing exception from $node");
                            // Only throw if we run out of retries
                            throw $exception;
                        } else {
                            // Something went seriously wrong, bail
                            $exception = new TransportException($response['error']->getMessage());
                            $this->logRequestFail($request, $response, $exception);
                            throw $exception;
                        }
                    } else {
                        $connection->markAlive();

                        if (isset($response['headers']['Warning'])) {
                            $this->logWarning($request, $response);
                        }
                        if (isset($response['body']) === true) {
                            // The body arrives as a stream; read it fully into a string
                            $response['body'] = stream_get_contents($response['body']);
                            $this->lastRequest['response']['body'] = $response['body'];
                        }

                        if ($response['status'] >= 400 && $response['status'] < 500) {
                            $ignore = $request['client']['ignore'] ?? [];
                            // Skip 404 if succeeded true in the body (e.g. clear_scroll)
                            $body = $response['body'] ?? '';
                            if (strpos($body, '"succeeded":true') !== false) {
                                $ignore[] = 404;
                            }
                            $this->process4xxError($request, $response, $ignore);
                        } elseif ($response['status'] >= 500) {
                            $ignore = $request['client']['ignore'] ?? [];
                            $this->process5xxError($request, $response, $ignore);
                        }

                        // No error, deserialize
                        $response['body'] = $this->serializer->deserialize($response['body'], $response['transfer_stats']);
                    }
                    $this->logRequestSuccess($request, $response);

                    // Verbose callers get the whole response array, everyone else just the body
                    return isset($request['client']['verbose']) && $request['client']['verbose'] === true
                        ? $response
                        : $response['body'];
                }
            );

            return $response;
        };
    }

    /**
     * Build the request URI: optional query string appended, optional base path prepended.
     *
     * Boolean parameter values are rendered as the strings 'true' / 'false'
     * before the query string is built.
     *
     * @param  string     $uri
     * @param  null|array $params
     * @return string
     */
    private function getURI(string $uri, ?array $params): string
    {
        if (!empty($params)) {
            $normalized = array_map(
                function ($value) {
                    if ($value === true) {
                        return 'true';
                    }
                    if ($value === false) {
                        return 'false';
                    }

                    return $value;
                },
                $params
            );

            $uri .= '?' . http_build_query($normalized);
        }

        if ($this->path !== null) {
            $uri = $this->path . $uri;
        }

        return $uri;
    }

    /**
     * @return array The headers stored on this connection
     */
    public function getHeaders(): array
    {
        return $this->headers;
    }

    /**
     * Log the 'Warning' response header as a deprecation warning.
     *
     * @param  array $request
     * @param  array $response Must contain ['headers']['Warning']
     * @return void
     */
    public function logWarning(array $request, array $response): void
    {
        $this->log->warning('Deprecation', $response['headers']['Warning']);
    }

    /**
     * Log a successful request
     *
     * Writes the request/response to the main logger and an equivalent
     * curl command plus the response to the trace logger.
     *
     * @param  array $request
     * @param  array $response
     * @return void
     */
    public function logRequestSuccess(array $request, array $response): void
    {
        // Prefer the explicitly configured port, fall back to the one curl reported
        $port = $request['client']['curl'][CURLOPT_PORT] ?? $response['transfer_stats']['primary_port'] ?? '';
        $uri = $this->addPortInUrl($response['effective_url'], (int) $port);

        $this->log->debug('Request Body', [$request['body']]);
        $this->log->info(
            'Request Success:',
            [
                'method'    => $request['http_method'],
                'uri'       => $uri,
                'port'      => $port,
                'headers'   => $request['headers'],
                'HTTP code' => $response['status'],
                'duration'  => $response['transfer_stats']['total_time'],
            ]
        );
        $this->log->debug('Response', [$response['body']]);

        // Build the curl command for Trace.
        $curlCommand = $this->buildCurlCommand($request['http_method'], $uri, $request['body']);
        $this->trace->info($curlCommand);
        $this->trace->debug(
            'Response:',
            [
                'response'  => $response['body'],
                'method'    => $request['http_method'],
                'uri'       => $uri,
                'port'      => $port,
                'HTTP code' => $response['status'],
                'duration'  => $response['transfer_stats']['total_time'],
            ]
        );
    }

    /**
     * Log a failed request
     *
     * Response fields are read defensively: a failed transfer may not carry
     * an effective URL, status, body or timing, and logging must not raise
     * while an error is already being reported.
     *
     * @param array      $request
     * @param array      $response
     * @param \Exception $exception
     *
     * @return void
     */
    public function logRequestFail(array $request, array $response, \Exception $exception): void
    {
        $port = $request['client']['curl'][CURLOPT_PORT] ?? $response['transfer_stats']['primary_port'] ?? '';
        $uri = $this->addPortInUrl((string) ($response['effective_url'] ?? ''), (int) $port);

        $status = $response['status'] ?? null;
        $duration = $response['transfer_stats']['total_time'] ?? null;

        $this->log->debug('Request Body', [$request['body']]);
        $this->log->warning(
            'Request Failure:',
            [
                'method'    => $request['http_method'],
                'uri'       => $uri,
                'port'      => $port,
                'headers'   => $request['headers'],
                'HTTP code' => $status,
                'duration'  => $duration,
                'error'     => $exception->getMessage(),
            ]
        );
        $this->log->warning('Response', [$response['body'] ?? null]);

        // Build the curl command for Trace.
        $curlCommand = $this->buildCurlCommand($request['http_method'], $uri, $request['body']);
        $this->trace->info($curlCommand);
        $this->trace->debug(
            'Response:',
            [
                'response'  => $response,
                'method'    => $request['http_method'],
                'uri'       => $uri,
                'port'      => $port,
                'HTTP code' => $status,
                'duration'  => $duration,
            ]
        );
    }

    /**
     * Send a "HEAD /" request and update the alive/dead state from the outcome.
     *
     * The request uses the ping timeout, is never retried, and is resolved
     * synchronously via wait().
     *
     * @return bool True when the node answered with HTTP 200
     */
    public function ping(): bool
    {
        $options = [
            'client' => [
                'timeout'     => $this->pingTimeout,
                'never_retry' => true,
                'verbose'     => true
            ]
        ];
        try {
            $response = $this->performRequest('HEAD', '/', null, null, $options);
            $response = $response->wait();
        } catch (TransportException $exception) {
            $this->markDead();
            return false;
        }

        if ($response['status'] === 200) {
            $this->markAlive();
            return true;
        }

        $this->markDead();
        return false;
    }

    /**
     * Request "GET /_nodes/" with the ping timeout and retries disabled.
     *
     * @return array|\GuzzleHttp\Ring\Future\FutureArray
     */
    public function sniff()
    {
        $options = [
            'client' => [
                'timeout'     => $this->pingTimeout,
                'never_retry' => true
            ]
        ];

        return $this->performRequest('GET', '/_nodes/', null, null, $options);
    }

    /**
     * @return bool The last recorded alive/dead state
     */
    public function isAlive(): bool
    {
        return $this->isAlive;
    }

    /**
     * Mark the connection alive: reset the failure counter and stamp the time.
     */
    public function markAlive(): void
    {
        $this->failedPings = 0;
        $this->isAlive = true;
        $this->lastPing = time();
    }

    /**
     * Mark the connection dead: bump the failure counter and stamp the time.
     */
    public function markDead(): void
    {
        $this->isAlive = false;
        $this->failedPings += 1;
        $this->lastPing = time();
    }

    /**
     * @return int Unix timestamp of the last markAlive()/markDead() call
     */
    public function getLastPing(): int
    {
        return $this->lastPing;
    }

    /**
     * @return int Number of markDead() calls since the last markAlive()
     */
    public function getPingFailures(): int
    {
        return $this->failedPings;
    }

    /**
     * @return string
     */
    public function getHost(): string
    {
        return $this->host;
    }

    /**
     * @return null|string The "user:pass" string stored as CURLOPT_USERPWD, if any
     */
    public function getUserPass(): ?string
    {
        return $this->connectionParams['client']['curl'][CURLOPT_USERPWD] ?? null;
    }

    /**
     * @return null|string
     */
    public function getPath(): ?string
    {
        return $this->path;
    }

    /**
     * @return int
     */
    public function getPort()
    {
        return $this->port;
    }

    /**
     * Translate a failed curl transfer into the exception thrown once retries are exhausted.
     *
     * The result is a MaxRetriesException, wrapped in a more specific
     * exception for curl error numbers 6, 7 and 28.
     *
     * @param  array $request
     * @param  array $response Must contain an 'error' exception; ['curl']['errno'] is optional
     * @return ElasticsearchException
     */
    protected function getCurlRetryException(array $request, array $response): ElasticsearchException
    {
        $message = $response['error']->getMessage();
        $exception = new MaxRetriesException($message);

        // A missing errno simply falls through to the generic MaxRetriesException
        switch ($response['curl']['errno'] ?? null) {
            case 6:
                $exception = new CouldNotResolveHostException($message, 0, $exception);
                break;
            case 7:
                $exception = new CouldNotConnectToHost($message, 0, $exception);
                break;
            case 28:
                $exception = new OperationTimeoutException($message, 0, $exception);
                break;
        }

        return $exception;
    }

    /**
     * Get the x-elastic-client-meta header
     *
     * The header format is specified by the following regex:
     * ^[a-z]{1,}=[a-z0-9\.\-]{1,}(?:,[a-z]{1,}=[a-z0-9\.\-]+)*$
     */
    private function getElasticMetaHeader(array $connectionParams): string
    {
        $phpSemVersion = sprintf("%d.%d.%d", PHP_MAJOR_VERSION, PHP_MINOR_VERSION, PHP_RELEASE_VERSION);
        // Reduce the size in case of '-snapshot' version
        $clientVersion = str_replace('-snapshot', '-s', strtolower(Client::VERSION));
        $clientMeta = sprintf(
            "es=%s,php=%s,t=%s,a=%d",
            $clientVersion,
            $phpSemVersion,
            $clientVersion,
            isset($connectionParams['client']['future']) && $connectionParams['client']['future'] === 'lazy' ? 1 : 0
        );
        if (function_exists('curl_version')) {
            $curlVersion = curl_version();
            if (isset($curlVersion['version'])) {
                $clientMeta .= sprintf(",cu=%s", $curlVersion['version']); // cu = curl library
            }
        }

        return $clientMeta;
    }

    /**
     * Get the OS version using php_uname if available
     * otherwise it returns an empty string
     *
     * The value is computed once and cached in $this->OSVersion.
     *
     * @see https://github.com/elastic/elasticsearch-php/issues/922
     */
    private function getOSVersion(): string
    {
        if ($this->OSVersion === null) {
            $this->OSVersion = strpos(strtolower(ini_get('disable_functions')), 'php_uname') !== false
                ? ''
                : php_uname("r");
        }

        return $this->OSVersion;
    }

    /**
     * Add the port value in the URL if not present
     *
     * A ':' found at byte offset 7 or later is taken to mean the URL already
     * carries a port (the offset presumably skips the colon of "http://").
     */
    private function addPortInUrl(string $uri, int $port): string
    {
        // On PHP 8 strpos() throws a ValueError when the offset exceeds the
        // string length, so only search URLs that are long enough.
        if (strlen($uri) > 7 && strpos($uri, ':', 7) !== false) {
            return $uri;
        }

        // preg_replace() yields null on a PCRE error; keep the URL untouched then
        return preg_replace('#([^/])/([^/])#', sprintf("$1:%s/$2", $port), $uri, 1) ?? $uri;
    }

    /**
     * Construct a string cURL command
     *
     * "pretty=true" is always added to the URL, either as the only query
     * parameter or in front of the existing ones.
     */
    private function buildCurlCommand(string $method, string $url, ?string $body): string
    {
        $queryStart = strpos($url, '?');
        if ($queryStart === false) {
            $url .= '?pretty=true';
        } else {
            // Insert in front of the existing query, keeping '&' as separator
            $url = substr_replace($url, '?pretty=true&', $queryStart, 1);
        }

        $curlCommand = 'curl -X' . strtoupper($method);
        $curlCommand .= " '" . $url . "'";

        if (isset($body) === true && $body !== '') {
            $curlCommand .= " -d '" . $body . "'";
        }

        return $curlCommand;
    }

    /**
     * Turn a 4xx response into the matching exception, log it and throw it.
     *
     * @param  array $request
     * @param  array $response
     * @param  array $ignore   Status codes that must not raise
     * @return null|ElasticsearchException Null when the status is ignored; otherwise always throws
     * @throws ElasticsearchException
     */
    private function process4xxError(array $request, array $response, array $ignore): ?ElasticsearchException
    {
        $statusCode = $response['status'];

        /**
         * @var \Exception $exception
         */
        $exception = $this->tryDeserialize400Error($response);

        // Loose comparison, so numeric-string entries in $ignore still match
        if (array_search($response['status'], $ignore) !== false) {
            return null;
        }

        $responseBody = $this->convertBodyToString($response['body'], $statusCode, $exception);
        if ($statusCode === 401) {
            $exception = new Unauthorized401Exception($responseBody, $statusCode);
        } elseif ($statusCode === 403) {
            $exception = new Forbidden403Exception($responseBody, $statusCode);
        } elseif ($statusCode === 404) {
            $exception = new Missing404Exception($responseBody, $statusCode);
        } elseif ($statusCode === 409) {
            $exception = new Conflict409Exception($responseBody, $statusCode);
        } elseif ($statusCode === 400 && strpos($responseBody, 'script_lang not supported') !== false) {
            // Message and code are separate arguments, like every other branch
            $exception = new ScriptLangNotSupportedException($responseBody, $statusCode);
        } elseif ($statusCode === 408) {
            $exception = new RequestTimeout408Exception($responseBody, $statusCode);
        } else {
            $exception = new BadRequest400Exception($responseBody, $statusCode);
        }

        $this->logRequestFail($request, $response, $exception);

        throw $exception;
    }

    /**
     * Turn a 5xx response into the matching exception, log it and throw it.
     *
     * The deserialized error is written to the error log even when the status
     * code is in $ignore.
     *
     * @param  array $request
     * @param  array $response
     * @param  array $ignore   Status codes that must not raise
     * @return null|ElasticsearchException Null when the status is ignored; otherwise always throws
     * @throws ElasticsearchException
     */
    private function process5xxError(array $request, array $response, array $ignore): ?ElasticsearchException
    {
        $statusCode = (int) $response['status'];
        $responseBody = $response['body'];

        /**
         * @var \Exception $exception
         */
        $exception = $this->tryDeserialize500Error($response);

        $exceptionText = "[$statusCode Server Exception] ".$exception->getMessage();
        $this->log->error($exceptionText);
        $this->log->error($exception->getTraceAsString());

        if (array_search($statusCode, $ignore) !== false) {
            return null;
        }

        if ($statusCode === 500 && strpos($responseBody, "RoutingMissingException") !== false) {
            $exception = new RoutingMissingException($exception->getMessage(), $statusCode, $exception);
        } elseif ($statusCode === 500 && preg_match('/ActionRequestValidationException.+ no documents to get/', $responseBody) === 1) {
            $exception = new NoDocumentsToGetException($exception->getMessage(), $statusCode, $exception);
        } elseif ($statusCode === 500 && strpos($responseBody, 'NoShardAvailableActionException') !== false) {
            $exception = new NoShardAvailableException($exception->getMessage(), $statusCode, $exception);
        } else {
            $exception = new ServerErrorResponseException(
                $this->convertBodyToString($responseBody, $statusCode, $exception),
                $statusCode
            );
        }

        $this->logRequestFail($request, $response, $exception);

        throw $exception;
    }

    /**
     * Produce a string usable as an exception message from a response body.
     *
     * @param  mixed     $body       Response body
     * @param  int       $statusCode
     * @param  Exception $exception  Supplies the message used when the body is empty
     * @return string
     */
    private function convertBodyToString($body, int $statusCode, Exception $exception) : string
    {
        if (empty($body)) {
            return sprintf(
                "Unknown %d error from Elasticsearch %s",
                $statusCode,
                $exception->getMessage()
            );
        }
        // if body is not string, we convert it so it can be used as Exception message
        if (!is_string($body)) {
            return json_encode($body);
        }

        return $body;
    }

    /**
     * Deserialize an error response into a BadRequest400Exception.
     */
    private function tryDeserialize400Error(array $response): ElasticsearchException
    {
        return $this->tryDeserializeError($response, BadRequest400Exception::class);
    }

    /**
     * Deserialize an error response into a ServerErrorResponseException.
     */
    private function tryDeserialize500Error(array $response): ElasticsearchException
    {
        return $this->tryDeserializeError($response, ServerErrorResponseException::class);
    }

    /**
     * Build an exception of $errorClass from an error response body.
     *
     * Handles, in order: arrays without an 'error' key, structured errors
     * (with 'reason', preferring the first root cause), semi-structured
     * errors, and bodies that do not deserialize to an array.
     *
     * @param  array  $response   Needs 'body', 'status' and 'transfer_stats'
     * @param  string $errorClass Exception class to instantiate
     * @return ElasticsearchException
     */
    private function tryDeserializeError(array $response, string $errorClass): ElasticsearchException
    {
        $error = $this->serializer->deserialize($response['body'], $response['transfer_stats']);
        $status = (int) $response['status'];

        if (is_array($error) === true) {
            if (isset($error['error']) === false) {
                // <2.0 "i just blew up" nonstructured exception
                // $error is an array but we don't know the format, reuse the response body instead
                // added json_encode to convert into a string
                return new $errorClass(json_encode($response['body']), $status);
            }

            // 2.0 structured exceptions
            if (is_array($error['error']) && array_key_exists('reason', $error['error']) === true) {
                // Try to use root cause first (only grabs the first root cause).
                // root_cause and type are optional, so read them without
                // triggering undefined-index warnings.
                $root = $error['error']['root_cause'] ?? null;
                if (isset($root[0])) {
                    $cause = $root[0]['reason'] ?? '';
                    $type = $root[0]['type'] ?? '';
                } else {
                    $cause = $error['error']['reason'];
                    $type = $error['error']['type'] ?? '';
                }
                // added json_encode to convert into a string
                $original = new $errorClass(json_encode($response['body']), $status);

                return new $errorClass("$type: $cause", $status, $original);
            }

            // <2.0 semi-structured exceptions
            // added json_encode to convert into a string
            $original = new $errorClass(json_encode($response['body']), $status);

            $errorEncoded = $error['error'];
            if (is_array($errorEncoded)) {
                $errorEncoded = json_encode($errorEncoded);
            }

            return new $errorClass($errorEncoded, $status, $original);
        }

        // if responseBody is not string, we convert it so it can be used as Exception message
        $responseBody = $response['body'];
        if (!is_string($responseBody)) {
            $responseBody = json_encode($responseBody);
        }

        // <2.0 "i just blew up" nonstructured exception
        return new $errorClass($responseBody);
    }
}