1<?php
2
3namespace Elastica\Transport;
4
5use Elastica\Connection;
6use Elastica\Exception\Connection\GuzzleException;
7use Elastica\Exception\PartialShardFailureException;
8use Elastica\Exception\ResponseException;
9use Elastica\JSON;
10use Elastica\Request;
11use Elastica\Response;
12use Elastica\Util;
13use GuzzleHttp\Client;
14use GuzzleHttp\Exception\TransferException;
15use GuzzleHttp\Psr7;
16use GuzzleHttp\Psr7\Uri;
17use GuzzleHttp\RequestOptions;
18use Psr\Http\Message\StreamInterface;
19
20/**
21 * Elastica Guzzle Transport object.
22 *
23 * @author Milan Magudia <milan@magudia.com>
24 */
25class Guzzle extends AbstractTransport
26{
27    /**
28     * Http scheme.
29     *
30     * @var string Http scheme
31     */
32    protected $_scheme = 'http';
33
34    /**
35     * Curl resource to reuse.
36     *
37     * @var Client|null Guzzle client to reuse
38     */
39    protected static $_guzzleClientConnection;
40
41    /**
42     * Makes calls to the elasticsearch server.
43     *
44     * All calls that are made to the server are done through this function
45     *
46     * @throws \Elastica\Exception\ConnectionException
47     * @throws ResponseException
48     * @throws \Elastica\Exception\Connection\HttpException
49     */
50    public function exec(Request $request, array $params): Response
51    {
52        $connection = $this->getConnection();
53
54        $client = $this->_getGuzzleClient($connection->isPersistent());
55
56        $options = [
57            'base_uri' => $this->_getBaseUrl($connection),
58            RequestOptions::HEADERS => [
59                'Content-Type' => $request->getContentType(),
60            ],
61            RequestOptions::HTTP_ERRORS => false, // 4xx and 5xx is expected and NOT an exceptions in this context
62        ];
63
64        if ($connection->getTimeout()) {
65            $options[RequestOptions::TIMEOUT] = $connection->getTimeout();
66        }
67
68        if (null !== $proxy = $connection->getProxy()) {
69            $options[RequestOptions::PROXY] = $proxy;
70        }
71
72        $req = $this->_createPsr7Request($request, $connection);
73
74        try {
75            $start = \microtime(true);
76            $res = $client->send($req, $options);
77            $end = \microtime(true);
78        } catch (TransferException $ex) {
79            throw new GuzzleException($ex, $request, new Response($ex->getMessage()));
80        }
81
82        $responseBody = (string) $res->getBody();
83        $response = new Response($responseBody, $res->getStatusCode());
84        $response->setQueryTime($end - $start);
85        if ($connection->hasConfig('bigintConversion')) {
86            $response->setJsonBigintConversion($connection->getConfig('bigintConversion'));
87        }
88
89        $response->setTransferInfo(
90            [
91                'request_header' => $request->getMethod(),
92                'http_code' => $res->getStatusCode(),
93            ]
94        );
95
96        if ($response->hasError()) {
97            throw new ResponseException($request, $response);
98        }
99
100        if ($response->hasFailedShards()) {
101            throw new PartialShardFailureException($request, $response);
102        }
103
104        return $response;
105    }
106
107    /**
108     * @return Psr7\Request
109     */
110    protected function _createPsr7Request(Request $request, Connection $connection)
111    {
112        $req = new Psr7\Request(
113            $request->getMethod(),
114            $this->_getActionPath($request),
115            $connection->hasConfig('headers') && \is_array($connection->getConfig('headers'))
116                ? $connection->getConfig('headers')
117                : []
118        );
119
120        $data = $request->getData();
121        if (!empty($data)) {
122            if (Request::GET === $req->getMethod()) {
123                $req = $req->withMethod(Request::POST);
124            }
125
126            if ($this->hasParam('postWithRequestBody') && true == $this->getParam('postWithRequestBody')) {
127                $request->setMethod(Request::POST);
128                $req = $req->withMethod(Request::POST);
129            }
130
131            $req = $req->withBody($this->streamFor($data));
132        }
133
134        return $req;
135    }
136
137    /**
138     * Return Guzzle resource.
139     *
140     * @param bool $persistent False if not persistent connection
141     */
142    protected function _getGuzzleClient(bool $persistent = true): Client
143    {
144        if (!$persistent || !self::$_guzzleClientConnection) {
145            self::$_guzzleClientConnection = new Client();
146        }
147
148        return self::$_guzzleClientConnection;
149    }
150
151    /**
152     * Builds the base url for the guzzle connection.
153     */
154    protected function _getBaseUrl(Connection $connection): string
155    {
156        // If url is set, url is taken. Otherwise port, host and path
157        $url = $connection->hasConfig('url') ? $connection->getConfig('url') : '';
158
159        if (!empty($url)) {
160            $baseUri = $url;
161        } else {
162            $baseUri = (string) Uri::fromParts([
163                'scheme' => $this->_scheme,
164                'host' => $connection->getHost(),
165                'port' => $connection->getPort(),
166                'path' => \ltrim($connection->getPath(), '/'),
167            ]);
168        }
169
170        return \rtrim($baseUri, '/');
171    }
172
173    /**
174     * Builds the action path url for each request.
175     */
176    protected function _getActionPath(Request $request): string
177    {
178        $action = $request->getPath();
179        if ($action) {
180            $action = '/'.\ltrim($action, '/');
181        }
182
183        if (!Util::isDateMathEscaped($action)) {
184            $action = Util::escapeDateMath($action);
185        }
186
187        $query = $request->getQuery();
188
189        if (!empty($query)) {
190            $action .= '?'.\http_build_query(
191                $this->sanityzeQueryStringBool($query)
192            );
193        }
194
195        return $action;
196    }
197
198    /**
199     * @param mixed $data
200     */
201    private function streamFor($data): StreamInterface
202    {
203        if (\is_array($data)) {
204            $data = JSON::stringify($data, \JSON_UNESCAPED_UNICODE);
205        }
206
207        return \class_exists(Psr7\Utils::class)
208            ? Psr7\Utils::streamFor($data)
209            : Psr7\stream_for($data)
210        ;
211    }
212}
213