<?php
namespace GuzzleHttp\Ring\Client;

use GuzzleHttp\Ring\Future\FutureArray;
use React\Promise\Deferred;

/**
 * Returns an asynchronous response using curl_multi_* functions.
 *
 * This handler supports future responses and the "delay" request client
 * option that can be used to delay before sending a request.
 *
 * When using the CurlMultiHandler, custom curl options can be specified as an
 * associative array of curl option constants mapping to values in the
 * **curl** key of the "client" key of the request.
 *
 * @property resource|\CurlMultiHandle $_mh Internal use only. Lazy loaded multi-handle.
 */
#[\AllowDynamicProperties]
class CurlMultiHandler
{
    /** @var callable Builds the [handle, headers, body] triple for a request. */
    private $factory;

    /** @var int|float Seconds passed to curl_multi_select() as its timeout. */
    private $selectTimeout;

    /** @var int Running-transfer count, written by curl_multi_exec(). */
    private $active = 0;

    /** @var array Pending request entries keyed by integer handle ID. */
    private $handles = [];

    /** @var array Handle ID => timestamp (seconds, float) at which a delayed request is due. */
    private $delays = [];

    /** @var int Number of pending requests that triggers a flush in __invoke(). */
    private $maxHandles;

    /**
     * This handler accepts the following options:
     *
     * - mh: An optional curl_multi resource
     * - handle_factory: An optional callable used to generate curl handle
     *   resources. the callable accepts a request hash and returns an array
     *   of the handle, headers file resource, and the body resource.
     * - select_timeout: Optional timeout (in seconds) to block before timing
     *   out while selecting curl handles. Defaults to 1 second.
     * - max_handles: Optional integer representing the maximum number of
     *   open requests. When this number is reached, the queued futures are
     *   flushed. Defaults to 100.
     *
     * @param array $options
     */
    public function __construct(array $options = [])
    {
        if (isset($options['mh'])) {
            $this->_mh = $options['mh'];
        }

        $this->factory = isset($options['handle_factory'])
            ? $options['handle_factory']
            : new CurlFactory();
        $this->selectTimeout = isset($options['select_timeout'])
            ? $options['select_timeout']
            : 1;
        $this->maxHandles = isset($options['max_handles'])
            ? $options['max_handles']
            : 100;
    }

    /**
     * Lazily creates the curl multi handle the first time `_mh` is read.
     *
     * Once assigned, `_mh` is a real (dynamic) property, so later reads no
     * longer pass through this method.
     *
     * @param string $name Name of the inaccessible property being read.
     *
     * @return resource|\CurlMultiHandle
     *
     * @throws \BadMethodCallException If any property other than `_mh` is requested.
     */
    public function __get($name)
    {
        if ($name === '_mh') {
            return $this->_mh = curl_multi_init();
        }

        throw new \BadMethodCallException(
            sprintf('Undefined property: %s::$%s', __CLASS__, $name)
        );
    }

    /**
     * Completes pending requests and releases the multi handle.
     */
    public function __destruct()
    {
        // Finish any open connections before terminating the script.
        if ($this->handles) {
            $this->execute();
        }

        // isset() does not trigger __get(), so a multi handle is never
        // created here just to be closed again.
        if (isset($this->_mh)) {
            curl_multi_close($this->_mh);
            unset($this->_mh);
        }
    }

    /**
     * Queues a request and returns a future for its response.
     *
     * @param array $request Request hash handed to the handle factory.
     *
     * @return FutureArray
     */
    public function __invoke(array $request)
    {
        $factory = $this->factory;
        $result = $factory($request);
        $entry = [
            'request'  => $request,
            'response' => [],
            'handle'   => $result[0],
            // NOTE(review): stored by reference, presumably so header data the
            // factory's callbacks write later is visible here - confirm
            // against CurlFactory.
            'headers'  => &$result[1],
            'body'     => $result[2],
            'deferred' => new Deferred(),
        ];

        // Same integer key that addRequest() derives from the handle.
        $id = (int) $result[0];

        // The future is given the promise, execute() and a cancel closure.
        $future = new FutureArray(
            $entry['deferred']->promise(),
            [$this, 'execute'],
            function () use ($id) {
                return $this->cancel($id);
            }
        );

        $this->addRequest($entry);

        // Transfer outstanding requests if there are too many open handles.
        if (count($this->handles) >= $this->maxHandles) {
            $this->execute();
        }

        return $future;
    }

    /**
     * Runs until all outstanding connections have completed.
     */
    public function execute()
    {
        do {
            if ($this->active &&
                curl_multi_select($this->_mh, $this->selectTimeout) === -1
            ) {
                // Perform a usleep if a select returns -1.
                // See: https://bugs.php.net/bug.php?id=61141
                usleep(250);
            }

            // Add any delayed futures if needed.
            if ($this->delays) {
                $this->addDelays();
            }

            $this->perform();
            $this->processMessages();

            // Only delayed requests remain: sleep until the earliest one is
            // due rather than polling in a tight loop.
            if (!$this->active && $this->delays) {
                usleep($this->timeToNextDelay());
            }
        } while ($this->active || $this->handles);
    }

    /**
     * Registers a request entry and, unless it is delayed, adds it to the
     * multi handle.
     *
     * @param array $entry Entry built by __invoke().
     */
    private function addRequest(array &$entry)
    {
        $id = (int) $entry['handle'];
        $this->handles[$id] = $entry;

        // If the request is a delay, then add the request to the curl multi
        // pool only after the specified delay (given in milliseconds).
        if (isset($entry['request']['client']['delay'])) {
            $this->delays[$id] = microtime(true)
                + ($entry['request']['client']['delay'] / 1000);

            return;
        }

        curl_multi_add_handle($this->_mh, $entry['handle']);

        // Non-lazy futures start transferring right away; "lazy" futures are
        // only sent once the pool has many requests. Requests without a
        // truthy "future" flag are just added to the pool.
        if (!empty($entry['request']['future'])
            && $entry['request']['future'] !== 'lazy'
        ) {
            $this->perform();
            $this->processMessages();
        }
    }

    /**
     * Detaches a handle from the multi handle, closes it and forgets it.
     *
     * Does nothing when the ID is not tracked.
     *
     * @param int $id Handle ID to remove.
     */
    private function removeProcessed($id)
    {
        if (!isset($this->handles[$id])) {
            return;
        }

        $handle = $this->handles[$id]['handle'];
        curl_multi_remove_handle($this->_mh, $handle);
        curl_close($handle);
        unset($this->handles[$id], $this->delays[$id]);
    }

    /**
     * Cancels a handle from sending and removes references to it.
     *
     * @param int $id Handle ID to cancel and remove.
     *
     * @return bool True on success, false on failure.
     */
    private function cancel($id)
    {
        // Cannot cancel if it has been processed.
        if (!isset($this->handles[$id])) {
            return false;
        }

        $this->removeProcessed($id);

        return true;
    }

    /**
     * Adds every delayed request whose due time has passed to the multi
     * handle and drops it from the delay list.
     */
    private function addDelays()
    {
        $currentTime = microtime(true);

        foreach ($this->delays as $id => $delay) {
            if ($currentTime >= $delay) {
                unset($this->delays[$id]);
                curl_multi_add_handle(
                    $this->_mh,
                    $this->handles[$id]['handle']
                );
            }
        }
    }

    /**
     * Calls curl_multi_exec() until it stops asking to be called again,
     * updating the running-transfer count in $this->active.
     */
    private function perform()
    {
        do {
            $mrc = curl_multi_exec($this->_mh, $this->active);
        } while ($mrc === CURLM_CALL_MULTI_PERFORM);
    }

    /**
     * Microseconds until the earliest delayed request is due.
     *
     * Must only be called while $this->delays is non-empty. The result is
     * capped at one second so the microsecond count always fits in an
     * integer; execute() simply loops and sleeps again for longer delays.
     *
     * @return int Value between 0 and 1000000.
     */
    private function timeToNextDelay()
    {
        $remaining = max(0, min($this->delays) - microtime(true));

        return (int) (min($remaining, 1) * 1000000);
    }

    /**
     * Drains the multi handle's message queue, turning each completed
     * transfer into a response and resolving its deferred.
     */
    private function processMessages()
    {
        while ($done = curl_multi_info_read($this->_mh)) {
            $id = (int) $done['handle'];

            if (!isset($this->handles[$id])) {
                // Probably was cancelled.
                continue;
            }

            $entry = $this->handles[$id];
            $entry['response']['transfer_stats'] = curl_getinfo($done['handle']);

            // 'result' is a per-transfer CURLE_* code, not a CURLM_* code.
            if ($done['result'] !== CURLE_OK) {
                $entry['response']['curl']['errno'] = $done['result'];
                $entry['response']['curl']['error'] = curl_error($done['handle']);
            }

            $result = CurlFactory::createResponse(
                $this,
                $entry['request'],
                $entry['response'],
                $entry['headers'],
                $entry['body']
            );

            // Release the handle before resolving, so the entry is already
            // gone from $this->handles when the deferred is resolved.
            $this->removeProcessed($id);
            $entry['deferred']->resolve($result);
        }
    }
}