1<?php 2 3namespace Elastica\Transport; 4 5use Elastica\Connection; 6use Elastica\Exception\Connection\GuzzleException; 7use Elastica\Exception\PartialShardFailureException; 8use Elastica\Exception\ResponseException; 9use Elastica\JSON; 10use Elastica\Request; 11use Elastica\Response; 12use Elastica\Util; 13use GuzzleHttp\Client; 14use GuzzleHttp\Exception\TransferException; 15use GuzzleHttp\Psr7; 16use GuzzleHttp\Psr7\Uri; 17use GuzzleHttp\RequestOptions; 18use Psr\Http\Message\StreamInterface; 19 20/** 21 * Elastica Guzzle Transport object. 22 * 23 * @author Milan Magudia <milan@magudia.com> 24 */ 25class Guzzle extends AbstractTransport 26{ 27 /** 28 * Http scheme. 29 * 30 * @var string Http scheme 31 */ 32 protected $_scheme = 'http'; 33 34 /** 35 * Curl resource to reuse. 36 * 37 * @var Client|null Guzzle client to reuse 38 */ 39 protected static $_guzzleClientConnection; 40 41 /** 42 * Makes calls to the elasticsearch server. 43 * 44 * All calls that are made to the server are done through this function 45 * 46 * @throws \Elastica\Exception\ConnectionException 47 * @throws ResponseException 48 * @throws \Elastica\Exception\Connection\HttpException 49 */ 50 public function exec(Request $request, array $params): Response 51 { 52 $connection = $this->getConnection(); 53 54 $client = $this->_getGuzzleClient($connection->isPersistent()); 55 56 $options = [ 57 'base_uri' => $this->_getBaseUrl($connection), 58 RequestOptions::HEADERS => [ 59 'Content-Type' => $request->getContentType(), 60 ], 61 RequestOptions::HTTP_ERRORS => false, // 4xx and 5xx is expected and NOT an exceptions in this context 62 ]; 63 64 if ($connection->getTimeout()) { 65 $options[RequestOptions::TIMEOUT] = $connection->getTimeout(); 66 } 67 68 if (null !== $proxy = $connection->getProxy()) { 69 $options[RequestOptions::PROXY] = $proxy; 70 } 71 72 $req = $this->_createPsr7Request($request, $connection); 73 74 try { 75 $start = \microtime(true); 76 $res = $client->send($req, $options); 77 $end = \microtime(true); 78 } catch (TransferException $ex) { 79 throw new GuzzleException($ex, $request, new Response($ex->getMessage())); 80 } 81 82 $responseBody = (string) $res->getBody(); 83 $response = new Response($responseBody, $res->getStatusCode()); 84 $response->setQueryTime($end - $start); 85 if ($connection->hasConfig('bigintConversion')) { 86 $response->setJsonBigintConversion($connection->getConfig('bigintConversion')); 87 } 88 89 $response->setTransferInfo( 90 [ 91 'request_header' => $request->getMethod(), 92 'http_code' => $res->getStatusCode(), 93 ] 94 ); 95 96 if ($response->hasError()) { 97 throw new ResponseException($request, $response); 98 } 99 100 if ($response->hasFailedShards()) { 101 throw new PartialShardFailureException($request, $response); 102 } 103 104 return $response; 105 } 106 107 /** 108 * @return Psr7\Request 109 */ 110 protected function _createPsr7Request(Request $request, Connection $connection) 111 { 112 $req = new Psr7\Request( 113 $request->getMethod(), 114 $this->_getActionPath($request), 115 $connection->hasConfig('headers') && \is_array($connection->getConfig('headers')) 116 ? $connection->getConfig('headers') 117 : [] 118 ); 119 120 $data = $request->getData(); 121 if (!empty($data)) { 122 if (Request::GET === $req->getMethod()) { 123 $req = $req->withMethod(Request::POST); 124 } 125 126 if ($this->hasParam('postWithRequestBody') && true == $this->getParam('postWithRequestBody')) { 127 $request->setMethod(Request::POST); 128 $req = $req->withMethod(Request::POST); 129 } 130 131 $req = $req->withBody($this->streamFor($data)); 132 } 133 134 return $req; 135 } 136 137 /** 138 * Return Guzzle resource. 139 * 140 * @param bool $persistent False if not persistent connection 141 */ 142 protected function _getGuzzleClient(bool $persistent = true): Client 143 { 144 if (!$persistent || !self::$_guzzleClientConnection) { 145 self::$_guzzleClientConnection = new Client(); 146 } 147 148 return self::$_guzzleClientConnection; 149 } 150 151 /** 152 * Builds the base url for the guzzle connection. 153 */ 154 protected function _getBaseUrl(Connection $connection): string 155 { 156 // If url is set, url is taken. Otherwise port, host and path 157 $url = $connection->hasConfig('url') ? $connection->getConfig('url') : ''; 158 159 if (!empty($url)) { 160 $baseUri = $url; 161 } else { 162 $baseUri = (string) Uri::fromParts([ 163 'scheme' => $this->_scheme, 164 'host' => $connection->getHost(), 165 'port' => $connection->getPort(), 166 'path' => \ltrim($connection->getPath(), '/'), 167 ]); 168 } 169 170 return \rtrim($baseUri, '/'); 171 } 172 173 /** 174 * Builds the action path url for each request. 175 */ 176 protected function _getActionPath(Request $request): string 177 { 178 $action = $request->getPath(); 179 if ($action) { 180 $action = '/'.\ltrim($action, '/'); 181 } 182 183 if (!Util::isDateMathEscaped($action)) { 184 $action = Util::escapeDateMath($action); 185 } 186 187 $query = $request->getQuery(); 188 189 if (!empty($query)) { 190 $action .= '?'.\http_build_query( 191 $this->sanityzeQueryStringBool($query) 192 ); 193 } 194 195 return $action; 196 } 197 198 /** 199 * @param mixed $data 200 */ 201 private function streamFor($data): StreamInterface 202 { 203 if (\is_array($data)) { 204 $data = JSON::stringify($data, \JSON_UNESCAPED_UNICODE); 205 } 206 207 return \class_exists(Psr7\Utils::class) 208 ? Psr7\Utils::streamFor($data) 209 : Psr7\stream_for($data) 210 ; 211 } 212} 213