1<?php
2
3declare(strict_types = 1);
4
5namespace Elasticsearch\Connections;
6
7use Elasticsearch\Client;
8use Elasticsearch\Common\Exceptions\AlreadyExpiredException;
9use Elasticsearch\Common\Exceptions\BadRequest400Exception;
10use Elasticsearch\Common\Exceptions\Conflict409Exception;
11use Elasticsearch\Common\Exceptions\Curl\CouldNotConnectToHost;
12use Elasticsearch\Common\Exceptions\Curl\CouldNotResolveHostException;
13use Elasticsearch\Common\Exceptions\Curl\OperationTimeoutException;
14use Elasticsearch\Common\Exceptions\ElasticsearchException;
15use Elasticsearch\Common\Exceptions\Forbidden403Exception;
16use Elasticsearch\Common\Exceptions\MaxRetriesException;
17use Elasticsearch\Common\Exceptions\Missing404Exception;
18use Elasticsearch\Common\Exceptions\NoDocumentsToGetException;
19use Elasticsearch\Common\Exceptions\NoShardAvailableException;
20use Elasticsearch\Common\Exceptions\RequestTimeout408Exception;
21use Elasticsearch\Common\Exceptions\RoutingMissingException;
22use Elasticsearch\Common\Exceptions\ScriptLangNotSupportedException;
23use Elasticsearch\Common\Exceptions\ServerErrorResponseException;
24use Elasticsearch\Common\Exceptions\TransportException;
25use Elasticsearch\Serializers\SerializerInterface;
26use Elasticsearch\Transport;
27use GuzzleHttp\Ring\Core;
28use GuzzleHttp\Ring\Exception\ConnectException;
29use GuzzleHttp\Ring\Exception\RingException;
30use Psr\Log\LoggerInterface;
31
/**
 * Class Connection
 *
 * A single Elasticsearch node endpoint. It builds RingPHP request arrays,
 * sends them through the wrapped handler, translates transport and HTTP
 * errors into client exceptions, and tracks whether the node is alive.
 *
 * @category Elasticsearch
 * @package  Elasticsearch\Connections
 * @author   Zachary Tong <zach@elastic.co>
 * @license  http://www.apache.org/licenses/LICENSE-2.0 Apache2
 * @link     http://elastic.co
 */
class Connection implements ConnectionInterface
{
    /**
     * Handler wrapped by wrapHandler(); invoked with
     * ($request, $connection, $transport, $options).
     *
     * @var callable
     */
    protected $handler;

    /**
     * @var SerializerInterface
     */
    protected $serializer;

    /**
     * @var string
     */
    protected $transportSchema = 'http';    // TODO deprecate this default

    /**
     * @var string
     */
    protected $host;

    /**
     * Optional path prefix prepended to every request URI.
     *
     * @var string|null
     */
    protected $path;

    /**
     * @var int
     */
    protected $port;

    /**
     * @var LoggerInterface
     */
    protected $log;

    /**
     * @var LoggerInterface
     */
    protected $trace;

    /**
     * @var array
     */
    protected $connectionParams;

    /**
     * Connection-level headers sent with every request.
     *
     * @var array
     */
    protected $headers = [];

    /**
     * @var bool
     */
    protected $isAlive = false;

    /**
     * Timeout passed as the client "timeout" option by ping() and sniff().
     *
     * @var int|float
     */
    private $pingTimeout = 1;    //TODO expose this

    /**
     * Unix timestamp of the last markAlive()/markDead() call.
     *
     * @var int
     */
    private $lastPing = 0;

    /**
     * Number of markDead() calls since the last markAlive().
     *
     * @var int
     */
    private $failedPings = 0;

    /**
     * The most recent request and, once available, its response.
     *
     * @var array
     */
    private $lastRequest = [];

    /**
     * @param callable            $handler          Handler that actually sends the request array
     * @param array               $hostDetails      Requires 'host'; optional 'port' (default 9200),
     *                                              'scheme', 'path', 'user' and 'pass'
     * @param array               $connectionParams Default request parameters merged into every request
     * @param SerializerInterface $serializer
     * @param LoggerInterface     $log
     * @param LoggerInterface     $trace
     */
    public function __construct(
        callable $handler,
        array $hostDetails,
        array $connectionParams,
        SerializerInterface $serializer,
        LoggerInterface $log,
        LoggerInterface $trace
    ) {
        if (isset($hostDetails['port']) !== true) {
            $hostDetails['port'] = 9200;
        }

        if (isset($hostDetails['scheme'])) {
            $this->transportSchema = $hostDetails['scheme'];
        }

        // Basic auth is only configured when both user and password are supplied.
        if (isset($hostDetails['user']) && isset($hostDetails['pass'])) {
            $connectionParams['client']['curl'][CURLOPT_HTTPAUTH] = CURLAUTH_BASIC;
            $connectionParams['client']['curl'][CURLOPT_USERPWD] = $hostDetails['user'].':'.$hostDetails['pass'];
        }

        $connectionParams['client']['curl'][CURLOPT_PORT] = $hostDetails['port'];

        // Headers are kept on the connection itself and removed from the
        // params that get merged into each request.
        if (isset($connectionParams['client']['headers'])) {
            $this->headers = $connectionParams['client']['headers'];
            unset($connectionParams['client']['headers']);
        }

        // Add the User-Agent using the format: <client-repo-name>/<client-version> (metadata-values)
        $this->headers['User-Agent'] = [sprintf(
            "elasticsearch-php/%s (%s %s; PHP %s)",
            Client::VERSION,
            php_uname("s"),
            php_uname("r"),
            phpversion()
        )];

        $this->host             = $hostDetails['host'];
        $this->path             = $hostDetails['path'] ?? null;
        $this->port             = $hostDetails['port'];
        $this->log              = $log;
        $this->trace            = $trace;
        $this->connectionParams = $connectionParams;
        $this->serializer       = $serializer;

        $this->handler = $this->wrapHandler($handler);
    }

    /**
     * Build a request array and send it through the wrapped handler.
     *
     * @param  string         $method    HTTP method
     * @param  string         $uri       Request path (the connection path prefix is prepended)
     * @param  array|null     $params    Query-string parameters
     * @param  mixed          $body      Serialized with the configured serializer unless null
     * @param  array          $options   Per-request options; override connection params
     * @param  Transport|null $transport Used for retries; null for ping/sniff
     * @return mixed Whatever the wrapped handler returns
     */
    public function performRequest(string $method, string $uri, ?array $params = [], $body = null, array $options = [], ?Transport $transport = null)
    {
        if ($body !== null) {
            $body = $this->serializer->serialize($body);
        }

        // Merge per-request headers into a local copy so they do not leak
        // into later requests made on this connection.
        $headers = $this->headers;
        if (isset($options['client']['headers']) && is_array($options['client']['headers'])) {
            $headers = array_merge($headers, $options['client']['headers']);
        }

        $request = [
            'http_method' => $method,
            'scheme'      => $this->transportSchema,
            'uri'         => $this->getURI($uri, $params),
            'body'        => $body,
            'headers'     => array_merge(
                [
                'Host'  => [$this->host]
                ],
                $headers
            )
        ];

        $request = array_replace_recursive($request, $this->connectionParams, $options);

        // RingPHP does not like if client is empty
        if (empty($request['client'])) {
            unset($request['client']);
        }

        $handler = $this->handler;

        return $handler($request, $this, $transport, $options);
    }

    public function getTransportSchema(): string
    {
        return $this->transportSchema;
    }

    /**
     * @return array The last request under 'request' and, once received, its 'response'
     */
    public function getLastRequestInfo(): array
    {
        return $this->lastRequest;
    }

    /**
     * Decorate the raw handler with bookkeeping, logging, retries and
     * translation of error responses into exceptions.
     *
     * @param  callable $handler
     * @return callable Closure taking ($request, $connection, $transport, $options)
     */
    private function wrapHandler(callable $handler): callable
    {
        return function (array $request, Connection $connection, ?Transport $transport, $options) use ($handler) {

            $this->lastRequest = [];
            $this->lastRequest['request'] = $request;

            // Send the request using the wrapped handler.
            $response =  Core::proxy($handler($request), function ($response) use ($connection, $transport, $request, $options) {

                $this->lastRequest['response'] = $response;

                if (isset($response['error']) === true) {
                    if ($response['error'] instanceof ConnectException || $response['error'] instanceof RingException) {
                        $this->log->warning("Curl exception encountered.");

                        $exception = $this->getCurlRetryException($request, $response);

                        $this->logRequestFail($request, $response, $exception);

                        $node = $connection->getHost();
                        $this->log->warning("Marking node $node dead.");
                        $connection->markDead();

                        // If the transport has not been set, we are inside a Ping or Sniff,
                        // so we don't want to retrigger retries anyway.
                        //
                        // TODO this could be handled better, but we are limited because connectionpools do not
                        // have access to Transport.  Architecturally, all of this needs to be refactored
                        if ($transport !== null) {
                            $transport->connectionPool->scheduleCheck();

                            $neverRetry = $request['client']['never_retry'] ?? false;
                            $shouldRetry = $transport->shouldRetry($request);
                            $shouldRetryText = ($shouldRetry) ? 'true' : 'false';

                            $this->log->warning("Retries left? $shouldRetryText");
                            if ($shouldRetry && !$neverRetry) {
                                return $transport->performRequest(
                                    $request['http_method'],
                                    $request['uri'],
                                    [],
                                    $request['body'],
                                    $options
                                );
                            }
                        }

                        $this->log->warning("Out of retries, throwing exception from $node");
                        // Only throw if we run out of retries
                        throw $exception;
                    }

                    // Something went seriously wrong, bail
                    $exception = new TransportException($response['error']->getMessage());
                    $this->logRequestFail($request, $response, $exception);
                    throw $exception;
                }

                $connection->markAlive();

                if (isset($response['body']) === true) {
                    $response['body'] = stream_get_contents($response['body']);
                    $this->lastRequest['response']['body'] = $response['body'];
                }

                // Normalise to an array so a scalar "ignore" option cannot
                // trigger a TypeError in the typed process*Error() methods.
                $ignore = (array) ($request['client']['ignore'] ?? []);

                if ($response['status'] >= 400 && $response['status'] < 500) {
                    // Skip 404 if succeeded true in the body (e.g. clear_scroll)
                    $body = $response['body'] ?? '';
                    if (is_string($body) && strpos($body, '"succeeded":true') !== false) {
                        $ignore[] = 404;
                    }
                    $this->process4xxError($request, $response, $ignore);
                } elseif ($response['status'] >= 500) {
                    $this->process5xxError($request, $response, $ignore);
                }

                // No error (or an ignored one), deserialize
                $response['body'] = $this->serializer->deserialize($response['body'], $response['transfer_stats']);

                $this->logRequestSuccess($request, $response);

                $verbose = isset($request['client']['verbose']) && $request['client']['verbose'] === true;

                return $verbose ? $response : $response['body'];
            });

            return $response;
        };
    }

    /**
     * Build the request URI: optional path prefix + $uri + query string.
     *
     * @param  string     $uri
     * @param  array|null $params Query parameters; null or empty adds no query string
     * @return string
     */
    private function getURI(string $uri, ?array $params): string
    {
        if (!empty($params)) {
            // http_build_query() would render booleans as 1/0, so top-level
            // booleans are spelled out as 'true'/'false' first.
            $params = array_map(
                static function ($value) {
                    if ($value === true) {
                        return 'true';
                    }
                    if ($value === false) {
                        return 'false';
                    }

                    return $value;
                },
                $params
            );

            $uri .= '?' . http_build_query($params);
        }

        if ($this->path !== null) {
            $uri = $this->path . $uri;
        }

        return $uri;
    }

    /**
     * @return array Connection-level headers (per-request headers are not included)
     */
    public function getHeaders(): array
    {
        return $this->headers;
    }

    /**
     * Log a successful request
     *
     * Writes the request/response summary to the main logger and an
     * equivalent curl command plus the response to the trace logger.
     *
     * @param array $request
     * @param array $response
     * @return void
     */
    public function logRequestSuccess(array $request, array $response): void
    {
        $this->log->debug('Request Body', [$request['body']]);
        $this->log->info(
            'Request Success:',
            [
                'method'    => $request['http_method'],
                'uri'       => $response['effective_url'],
                'headers'   => $request['headers'],
                'HTTP code' => $response['status'],
                'duration'  => $response['transfer_stats']['total_time'],
            ]
        );
        $this->log->debug('Response', [$response['body']]);

        // Build the curl command for Trace.
        $curlCommand = $this->buildCurlCommand($request['http_method'], $response['effective_url'], $request['body']);
        $this->trace->info($curlCommand);
        $this->trace->debug(
            'Response:',
            [
                'response'  => $response['body'],
                'method'    => $request['http_method'],
                'uri'       => $response['effective_url'],
                'HTTP code' => $response['status'],
                'duration'  => $response['transfer_stats']['total_time'],
            ]
        );
    }

    /**
     * Log a failed request
     *
     * Response fields are read defensively: a failure response may be
     * incomplete, and the logger must not raise an error of its own that
     * would mask the exception being reported.
     *
     * @param array $request
     * @param array $response
     * @param \Exception $exception
     *
     * @return void
     */
    public function logRequestFail(array $request, array $response, \Exception $exception): void
    {
        $effectiveUrl = $response['effective_url'] ?? null;
        $status       = $response['status'] ?? null;
        $duration     = $response['transfer_stats']['total_time'] ?? null;

        $this->log->debug('Request Body', [$request['body']]);
        $this->log->warning(
            'Request Failure:',
            [
                'method'    => $request['http_method'],
                'uri'       => $effectiveUrl,
                'headers'   => $request['headers'],
                'HTTP code' => $status,
                'duration'  => $duration,
                'error'     => $exception->getMessage(),
            ]
        );
        $this->log->warning('Response', [$response['body'] ?? null]);

        // Build the curl command for Trace.
        $curlCommand = $this->buildCurlCommand($request['http_method'], (string) $effectiveUrl, $request['body']);
        $this->trace->info($curlCommand);
        $this->trace->debug(
            'Response:',
            [
                'response'  => $response,
                'method'    => $request['http_method'],
                'uri'       => $effectiveUrl,
                'HTTP code' => $status,
                'duration'  => $duration,
            ]
        );
    }

    /**
     * Send a HEAD / request and mark the node alive or dead accordingly.
     *
     * @return bool True only when the node answered with HTTP 200
     */
    public function ping(): bool
    {
        $options = [
            'client' => [
                'timeout' => $this->pingTimeout,
                'never_retry' => true,
                'verbose' => true
            ]
        ];
        try {
            $response = $this->performRequest('HEAD', '/', null, null, $options);
            $response = $response->wait();
        } catch (TransportException $exception) {
            $this->markDead();

            return false;
        }

        if ($response['status'] === 200) {
            $this->markAlive();

            return true;
        }

        $this->markDead();

        return false;
    }

    /**
     * Request GET /_nodes/ without retries, using the ping timeout.
     *
     * @return array|\GuzzleHttp\Ring\Future\FutureArray
     */
    public function sniff()
    {
        $options = [
            'client' => [
                'timeout' => $this->pingTimeout,
                'never_retry' => true
            ]
        ];

        return $this->performRequest('GET', '/_nodes/', null, null, $options);
    }

    public function isAlive(): bool
    {
        return $this->isAlive;
    }

    /**
     * Flag the node as alive and reset the failure counter.
     */
    public function markAlive(): void
    {
        $this->failedPings = 0;
        $this->isAlive = true;
        $this->lastPing = time();
    }

    /**
     * Flag the node as dead and count one more failure.
     */
    public function markDead(): void
    {
        $this->isAlive = false;
        $this->failedPings += 1;
        $this->lastPing = time();
    }

    public function getLastPing(): int
    {
        return $this->lastPing;
    }

    public function getPingFailures(): int
    {
        return $this->failedPings;
    }

    public function getHost(): string
    {
        return $this->host;
    }

    /**
     * @return string|null "user:pass" as configured for cURL, or null without credentials
     */
    public function getUserPass(): ?string
    {
        return $this->connectionParams['client']['curl'][CURLOPT_USERPWD] ?? null;
    }

    public function getPath(): ?string
    {
        return $this->path;
    }

    /**
     * @return int
     */
    public function getPort()
    {
        return $this->port;
    }

    /**
     * Map a cURL-level failure to the exception thrown once retries run out.
     *
     * The result is a MaxRetriesException; for cURL errno 6, 7 and 28 it is
     * wrapped as the previous exception of a more specific type.
     *
     * @param  array $request  Unused; kept for signature compatibility
     * @param  array $response Must carry the error object under 'error'
     * @return ElasticsearchException
     */
    protected function getCurlRetryException(array $request, array $response): ElasticsearchException
    {
        $message = $response['error']->getMessage();
        $exception = new MaxRetriesException($message);

        // A missing errno simply falls through to the generic exception.
        switch ($response['curl']['errno'] ?? null) {
            case 6:
                $exception = new CouldNotResolveHostException($message, 0, $exception);
                break;
            case 7:
                $exception = new CouldNotConnectToHost($message, 0, $exception);
                break;
            case 28:
                $exception = new OperationTimeoutException($message, 0, $exception);
                break;
        }

        return $exception;
    }

    /**
     * Construct a string cURL command
     *
     * "pretty=true" is added as the first query parameter, whether or not
     * the URI already has a query string.
     *
     * @param  string      $method
     * @param  string      $uri
     * @param  string|null $body Appended as -d '...' when non-empty (not shell-escaped)
     * @return string
     */
    private function buildCurlCommand(string $method, string $uri, ?string $body): string
    {
        $queryStart = strpos($uri, '?');
        if ($queryStart === false) {
            $uri .= '?pretty=true';
        } else {
            // Insert after the first '?' and keep the existing parameters.
            $uri = substr_replace($uri, '?pretty=true&', $queryStart, 1);
        }

        $curlCommand = 'curl -X' . strtoupper($method);
        $curlCommand .= " '" . $uri . "'";

        if ($body !== null && $body !== '') {
            $curlCommand .= " -d '" . $body . "'";
        }

        return $curlCommand;
    }

    /**
     * Turn a 4xx response into the matching exception, unless its status is ignored.
     *
     * @param  array $request
     * @param  array $response
     * @param  array $ignore   Status codes that must not raise
     * @return ElasticsearchException|null Null when the status is ignored; otherwise this method throws
     *
     * @throws ElasticsearchException
     */
    private function process4xxError(array $request, array $response, array $ignore): ?ElasticsearchException
    {
        $statusCode = (int) $response['status'];

        // NOTE(review): the deserialized error is not used below. The call is
        // kept because the serializer is opaque from here and removing it
        // could change which exception surfaces for a malformed body. Confirm
        // whether it can be dropped.
        $this->tryDeserialize400Error($response);

        // Loose comparison on purpose: matches what array_search() did before,
        // so numeric-string codes in $ignore still match.
        if (in_array($statusCode, $ignore)) {
            return null;
        }

        // The body becomes the exception message, so it must be a string.
        $responseBody = $this->stringifyBody($response['body']);

        if ($statusCode === 400 && strpos($responseBody, "AlreadyExpiredException") !== false) {
            $exception = new AlreadyExpiredException($responseBody, $statusCode);
        } elseif ($statusCode === 403) {
            $exception = new Forbidden403Exception($responseBody, $statusCode);
        } elseif ($statusCode === 404) {
            $exception = new Missing404Exception($responseBody, $statusCode);
        } elseif ($statusCode === 409) {
            $exception = new Conflict409Exception($responseBody, $statusCode);
        } elseif ($statusCode === 400 && strpos($responseBody, 'script_lang not supported') !== false) {
            $exception = new ScriptLangNotSupportedException($responseBody, $statusCode);
        } elseif ($statusCode === 408) {
            $exception = new RequestTimeout408Exception($responseBody, $statusCode);
        } else {
            $exception = new BadRequest400Exception($responseBody, $statusCode);
        }

        $this->logRequestFail($request, $response, $exception);

        throw $exception;
    }

    /**
     * Turn a 5xx response into the matching exception, unless its status is ignored.
     *
     * The server error is written to the error log even when it is ignored.
     *
     * @param  array $request
     * @param  array $response
     * @param  array $ignore   Status codes that must not raise
     * @return ElasticsearchException|null Null when the status is ignored; otherwise this method throws
     *
     * @throws ElasticsearchException
     */
    private function process5xxError(array $request, array $response, array $ignore): ?ElasticsearchException
    {
        $statusCode = (int) $response['status'];

        /**
         * @var \Exception $exception
         */
        $exception = $this->tryDeserialize500Error($response);

        $exceptionText = "[$statusCode Server Exception] ".$exception->getMessage();
        $this->log->error($exceptionText);
        $this->log->error($exception->getTraceAsString());

        // Loose comparison on purpose: matches what array_search() did before.
        if (in_array($statusCode, $ignore)) {
            return null;
        }

        // Same string guarantee as the 4xx path; strpos()/preg_match() need a string.
        $responseBody = $this->stringifyBody($response['body']);

        if ($statusCode === 500 && strpos($responseBody, "RoutingMissingException") !== false) {
            $exception = new RoutingMissingException($exception->getMessage(), $statusCode, $exception);
        } elseif ($statusCode === 500 && preg_match('/ActionRequestValidationException.+ no documents to get/', $responseBody) === 1) {
            $exception = new NoDocumentsToGetException($exception->getMessage(), $statusCode, $exception);
        } elseif ($statusCode === 500 && strpos($responseBody, 'NoShardAvailableActionException') !== false) {
            $exception = new NoShardAvailableException($exception->getMessage(), $statusCode, $exception);
        } else {
            $exception = new ServerErrorResponseException($responseBody, $statusCode);
        }

        $this->logRequestFail($request, $response, $exception);

        throw $exception;
    }

    private function tryDeserialize400Error(array $response): ElasticsearchException
    {
        return $this->tryDeserializeError($response, BadRequest400Exception::class);
    }

    private function tryDeserialize500Error(array $response): ElasticsearchException
    {
        return $this->tryDeserializeError($response, ServerErrorResponseException::class);
    }

    /**
     * Build an exception of $errorClass from an error response body.
     *
     * Structured errors produce a "type: reason" message, preferring the
     * first root cause, with the raw body attached as the previous
     * exception. Anything else falls back to the body itself as the message.
     *
     * @param  array  $response   Must carry 'body', 'status' and 'transfer_stats'
     * @param  string $errorClass Exception class to instantiate
     * @return ElasticsearchException
     */
    private function tryDeserializeError(array $response, string $errorClass): ElasticsearchException
    {
        $status = (int) $response['status'];
        $error = $this->serializer->deserialize($response['body'], $response['transfer_stats']);

        if (is_array($error) === true) {
            // 2.0 structured exceptions
            if (isset($error['error']['reason']) === true) {
                // Try to use root cause first (only grabs the first root cause)
                $root = $error['error']['root_cause'] ?? null;
                $source = (is_array($root) && isset($root[0]) && is_array($root[0]))
                    ? $root[0]
                    : $error['error'];
                $cause = $source['reason'] ?? '';
                $type = $source['type'] ?? '';

                // added json_encode to convert into a string
                $original = new $errorClass((string) json_encode($response['body']), $status);

                return new $errorClass("$type: $cause", $status, $original);
            }

            if (isset($error['error']) === true) {
                // <2.0 semi-structured exceptions
                // added json_encode to convert into a string
                $original = new $errorClass((string) json_encode($response['body']), $status);

                // The message must be a string even if 'error' is structured.
                return new $errorClass($this->stringifyBody($error['error']), $status, $original);
            }

            // <2.0 "i just blew up" nonstructured exception
            // $error is an array but we don't know the format, reuse the response body instead
            // added json_encode to convert into a string
            return new $errorClass((string) json_encode($response['body']), $status);
        }

        // <2.0 "i just blew up" nonstructured exception
        return new $errorClass($this->stringifyBody($response['body']), $status);
    }

    /**
     * Return $body unchanged when it is a string, otherwise its JSON encoding
     * (an empty string if encoding fails), so it can be an exception message.
     *
     * @param  mixed $body
     * @return string
     */
    private function stringifyBody($body): string
    {
        if (is_string($body)) {
            return $body;
        }

        $encoded = json_encode($body);

        return $encoded === false ? '' : $encoded;
    }
}
685