1<?php
2/**
3 * Elasticsearch PHP client
4 *
5 * @link      https://github.com/elastic/elasticsearch-php/
6 * @copyright Copyright (c) Elasticsearch B.V (https://www.elastic.co)
7 * @license   http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0
8 * @license   https://www.gnu.org/licenses/lgpl-2.1.html GNU Lesser General Public License, Version 2.1
9 *
10 * Licensed to Elasticsearch B.V under one or more agreements.
11 * Elasticsearch B.V licenses this file to you under the Apache 2.0 License or
12 * the GNU Lesser General Public License, Version 2.1, at your option.
13 * See the LICENSE file in the project root for more information.
14 */
15
16
17declare(strict_types = 1);
18
19namespace Elasticsearch;
20
21use Elasticsearch\Common\Exceptions;
22use Elasticsearch\ConnectionPool\AbstractConnectionPool;
23use Elasticsearch\Connections\Connection;
24use Elasticsearch\Connections\ConnectionInterface;
25use GuzzleHttp\Ring\Future\FutureArrayInterface;
26use Psr\Log\LoggerInterface;
27
/**
 * Dispatches requests to connections obtained from a connection pool and
 * keeps track of how many retries have been consumed.
 */
class Transport
{
    /**
     * Pool that supplies connections and is asked to schedule health checks.
     *
     * @var AbstractConnectionPool
     */
    public $connectionPool;

    /**
     * @var LoggerInterface
     */
    private $log;

    /**
     * Number of retries consumed so far; reset to 0 when a request succeeds.
     *
     * @var int
     */
    public $retryAttempts = 0;

    /**
     * Connection used by the most recent performRequest() call. Remains null
     * until the first request has been dispatched.
     *
     * @var ConnectionInterface|null
     */
    public $lastConnection;

    /**
     * Maximum number of retries shouldRetry() will grant.
     *
     * @var int
     */
    public $retries;

    /**
     * Transport class is responsible for dispatching requests to the
     * underlying cluster connections
     *
     * @param int                                   $retries        Maximum number of retries granted by shouldRetry()
     * @param ConnectionPool\AbstractConnectionPool $connectionPool Pool that connections are drawn from
     * @param \Psr\Log\LoggerInterface              $log            PSR-3 logger
     * @param bool                                  $sniffOnStart   When true, log a notice and ask the pool to schedule a check immediately
     */
    public function __construct(int $retries, AbstractConnectionPool $connectionPool, LoggerInterface $log, bool $sniffOnStart = false)
    {
        $this->log            = $log;
        $this->connectionPool = $connectionPool;
        $this->retries        = $retries;

        if ($sniffOnStart === true) {
            $this->log->notice('Sniff on Start.');
            $this->connectionPool->scheduleCheck();
        }
    }

    /**
     * Returns a single connection from the connection pool.
     *
     * Whatever the pool does before handing out a connection (e.g. a sniffing
     * step) is decided by the pool implementation, not by this class.
     *
     * @return ConnectionInterface
     */
    public function getConnection(): ConnectionInterface
    {
        return $this->connectionPool->nextConnection();
    }

    /**
     * Perform a request to the Cluster
     *
     * Picks a connection, remembers it as the last used connection, forwards
     * the request to it and attaches bookkeeping callbacks to the returned
     * future.
     *
     * @param string $method  HTTP method to use
     * @param string $uri     HTTP URI to send request to
     * @param array  $params  Optional query parameters
     * @param mixed  $body    Optional query body (passed through to the connection untouched)
     * @param array  $options Passed through to the connection untouched
     *
     * @return FutureArrayInterface Future returned by the connection
     *
     * @throws Common\Exceptions\NoNodesAvailableException|\Exception
     */
    public function performRequest(string $method, string $uri, array $params = [], $body = null, array $options = []): FutureArrayInterface
    {
        try {
            $connection = $this->getConnection();
        } catch (Exceptions\NoNodesAvailableException $exception) {
            // Log, then let the caller deal with the missing cluster.
            $this->log->critical('No alive nodes found in cluster');
            throw $exception;
        }

        $this->lastConnection = $connection;

        // The transport itself is handed to the connection as the last argument.
        $future = $connection->performRequest(
            $method,
            $uri,
            $params,
            $body,
            $options,
            $this
        );

        $future->promise()->then(
            //onSuccess
            function ($response) {
                // Note, this could be a 4xx or 5xx error
                $this->retryAttempts = 0;
            },
            //onFailure
            function ($reason) {
                // NOTE(review): $reason is presumably an exception, since getCode() is called on it.
                $code = $reason->getCode();
                // Ignore 400 level errors, as that means the server responded just fine
                if ($code < 400 || $code >= 500) {
                    // Otherwise schedule a check
                    $this->connectionPool->scheduleCheck();
                }
            }
        );

        return $future;
    }

    /**
     * Resolves a future synchronously unless the caller asked for async mode.
     *
     * When $options['client']['future'] is absent, null or exactly false, the
     * future is waited on repeatedly until wait() yields something that is no
     * longer a future. Any other value returns the future untouched.
     *
     * @param FutureArrayInterface $result  Response of a request (promise)
     * @param array                $options Options for transport
     *
     * @return callable|array The unresolved future in async mode, otherwise the
     *                        waited-for value (presumably the response array)
     */
    public function resultOrFuture(FutureArrayInterface $result, array $options = [])
    {
        // A missing or null setting is treated exactly like an explicit false.
        $async = $options['client']['future'] ?? false;

        if ($async === false) {
            do {
                $result = $result->wait();
            } while ($result instanceof FutureArrayInterface);
        }

        return $result;
    }

    /**
     * Decides whether another retry may be attempted.
     *
     * Grants a retry, and counts it, while fewer than $this->retries attempts
     * have been consumed.
     *
     * @param array $request The request under consideration (currently not inspected)
     *
     * @return bool True when a retry is granted, false once the budget is used up
     */
    public function shouldRetry(array $request): bool
    {
        if ($this->retryAttempts < $this->retries) {
            $this->retryAttempts += 1;

            return true;
        }

        return false;
    }

    /**
     * Returns the last used connection so that it may be inspected.  Mainly
     * for debugging/testing purposes.
     *
     * The connection is only recorded by performRequest(); calling this before
     * any request has been dispatched violates the declared return type because
     * the property is still null.
     *
     * @return ConnectionInterface
     */
    public function getLastConnection(): ConnectionInterface
    {
        return $this->lastConnection;
    }
}
176