1<?php
2
3declare(strict_types = 1);
4
5namespace Elasticsearch;
6
7use Elasticsearch\Common\Exceptions;
8use Elasticsearch\ConnectionPool\AbstractConnectionPool;
9use Elasticsearch\Connections\Connection;
10use Elasticsearch\Connections\ConnectionInterface;
11use GuzzleHttp\Ring\Future\FutureArrayInterface;
12use Psr\Log\LoggerInterface;
13
14/**
15 * Class Transport
16 *
17 * @category Elasticsearch
18 * @package  Elasticsearch
19 * @author   Zachary Tong <zach@elastic.co>
20 * @license  http://www.apache.org/licenses/LICENSE-2.0 Apache2
21 * @link     http://elastic.co
22 */
23class Transport
24{
25    /**
26     * @var AbstractConnectionPool
27     */
28    public $connectionPool;
29
30    /**
31     * @var LoggerInterface
32     */
33    private $log;
34
35    /**
36     * @var int
37     */
38    public $retryAttempts = 0;
39
40    /**
41     * @var Connection
42     */
43    public $lastConnection;
44
45    /**
46     * @var int
47     */
48    public $retries;
49
50    /**
51     * Transport class is responsible for dispatching requests to the
52     * underlying cluster connections
53     *
54     * @param int                                   $retries
55     * @param bool                                  $sniffOnStart
56     * @param ConnectionPool\AbstractConnectionPool $connectionPool
57     * @param \Psr\Log\LoggerInterface              $log            Monolog logger object
58     */
59    public function __construct(int $retries, AbstractConnectionPool $connectionPool, LoggerInterface $log, bool $sniffOnStart = false)
60    {
61        $this->log            = $log;
62        $this->connectionPool = $connectionPool;
63        $this->retries        = $retries;
64
65        if ($sniffOnStart === true) {
66            $this->log->notice('Sniff on Start.');
67            $this->connectionPool->scheduleCheck();
68        }
69    }
70
71    /**
72     * Returns a single connection from the connection pool
73     * Potentially performs a sniffing step before returning
74     */
75    public function getConnection(): ConnectionInterface
76    {
77        return $this->connectionPool->nextConnection();
78    }
79
80    /**
81     * Perform a request to the Cluster
82     *
83     * @param string $method  HTTP method to use
84     * @param string $uri     HTTP URI to send request to
85     * @param array  $params  Optional query parameters
86     * @param null   $body    Optional query body
87     * @param array  $options
88     *
89     * @throws Common\Exceptions\NoNodesAvailableException|\Exception
90     */
91    public function performRequest(string $method, string $uri, array $params = null, $body = null, array $options = []): FutureArrayInterface
92    {
93        try {
94            $connection  = $this->getConnection();
95        } catch (Exceptions\NoNodesAvailableException $exception) {
96            $this->log->critical('No alive nodes found in cluster');
97            throw $exception;
98        }
99
100        $response             = [];
101        $caughtException      = null;
102        $this->lastConnection = $connection;
103
104        $future = $connection->performRequest(
105            $method,
106            $uri,
107            $params,
108            $body,
109            $options,
110            $this
111        );
112
113        $future->promise()->then(
114            //onSuccess
115            function ($response) {
116                $this->retryAttempts = 0;
117                // Note, this could be a 4xx or 5xx error
118            },
119            //onFailure
120            function ($response) {
121                // Ignore 400 level errors, as that means the server responded just fine
122                if (!(isset($response['code']) && $response['code'] >=400 && $response['code'] < 500)) {
123                    // Otherwise schedule a check
124                    $this->connectionPool->scheduleCheck();
125                }
126            }
127        );
128
129        return $future;
130    }
131
132    /**
133     * @param FutureArrayInterface $result  Response of a request (promise)
134     * @param array                $options Options for transport
135     *
136     * @return callable|array
137     */
138    public function resultOrFuture(FutureArrayInterface $result, array $options = [])
139    {
140        $response = null;
141        $async = isset($options['client']['future']) ? $options['client']['future'] : null;
142        if (is_null($async) || $async === false) {
143            do {
144                $result = $result->wait();
145            } while ($result instanceof FutureArrayInterface);
146
147            return $result;
148        } elseif ($async === true || $async === 'lazy') {
149            return $result;
150        }
151    }
152
153    public function shouldRetry(array $request): bool
154    {
155        if ($this->retryAttempts < $this->retries) {
156            $this->retryAttempts += 1;
157
158            return true;
159        }
160
161        return false;
162    }
163
164    /**
165     * Returns the last used connection so that it may be inspected.  Mainly
166     * for debugging/testing purposes.
167     */
168    public function getLastConnection(): ConnectionInterface
169    {
170        return $this->lastConnection;
171    }
172}
173