1<?php
2namespace GuzzleHttp\Ring\Client;
3
4use GuzzleHttp\Ring\Future\FutureArray;
5use React\Promise\Deferred;
6
7/**
8 * Returns an asynchronous response using curl_multi_* functions.
9 *
10 * This handler supports future responses and the "delay" request client
11 * option that can be used to delay before sending a request.
12 *
13 * When using the CurlMultiHandler, custom curl options can be specified as an
14 * associative array of curl option constants mapping to values in the
15 * **curl** key of the "client" key of the request.
16 *
17 * @property resource $_mh Internal use only. Lazy loaded multi-handle.
18 */
19#[\AllowDynamicProperties]
20class CurlMultiHandler
21{
22    /** @var callable */
23    private $factory;
24    private $selectTimeout;
25    private $active = 0;
26    private $handles = [];
27    private $delays = [];
28    private $maxHandles;
29
30    /**
31     * This handler accepts the following options:
32     *
33     * - mh: An optional curl_multi resource
34     * - handle_factory: An optional callable used to generate curl handle
35     *   resources. the callable accepts a request hash and returns an array
36     *   of the handle, headers file resource, and the body resource.
37     * - select_timeout: Optional timeout (in seconds) to block before timing
38     *   out while selecting curl handles. Defaults to 1 second.
39     * - max_handles: Optional integer representing the maximum number of
40     *   open requests. When this number is reached, the queued futures are
41     *   flushed.
42     *
43     * @param array $options
44     */
45    public function __construct(array $options = [])
46    {
47        if (isset($options['mh'])) {
48            $this->_mh = $options['mh'];
49        }
50        $this->factory = isset($options['handle_factory'])
51            ? $options['handle_factory'] : new CurlFactory();
52        $this->selectTimeout = isset($options['select_timeout'])
53            ? $options['select_timeout'] : 1;
54        $this->maxHandles = isset($options['max_handles'])
55            ? $options['max_handles'] : 100;
56    }
57
58    public function __get($name)
59    {
60        if ($name === '_mh') {
61            return $this->_mh = curl_multi_init();
62        }
63
64        throw new \BadMethodCallException();
65    }
66
67    public function __destruct()
68    {
69        // Finish any open connections before terminating the script.
70        if ($this->handles) {
71            $this->execute();
72        }
73
74        if (isset($this->_mh)) {
75            curl_multi_close($this->_mh);
76            unset($this->_mh);
77        }
78    }
79
80    public function __invoke(array $request)
81    {
82        $factory = $this->factory;
83        $result = $factory($request);
84        $entry = [
85            'request'  => $request,
86            'response' => [],
87            'handle'   => $result[0],
88            'headers'  => &$result[1],
89            'body'     => $result[2],
90            'deferred' => new Deferred(),
91        ];
92
93        $id = (int) $result[0];
94
95        $future = new FutureArray(
96            $entry['deferred']->promise(),
97            [$this, 'execute'],
98            function () use ($id) {
99                return $this->cancel($id);
100            }
101        );
102
103        $this->addRequest($entry);
104
105        // Transfer outstanding requests if there are too many open handles.
106        if (count($this->handles) >= $this->maxHandles) {
107            $this->execute();
108        }
109
110        return $future;
111    }
112
113    /**
114     * Runs until all outstanding connections have completed.
115     */
116    public function execute()
117    {
118        do {
119
120            if ($this->active &&
121                curl_multi_select($this->_mh, $this->selectTimeout) === -1
122            ) {
123                // Perform a usleep if a select returns -1.
124                // See: https://bugs.php.net/bug.php?id=61141
125                usleep(250);
126            }
127
128            // Add any delayed futures if needed.
129            if ($this->delays) {
130                $this->addDelays();
131            }
132
133            do {
134                $mrc = curl_multi_exec($this->_mh, $this->active);
135            } while ($mrc === CURLM_CALL_MULTI_PERFORM);
136
137            $this->processMessages();
138
139            // If there are delays but no transfers, then sleep for a bit.
140            if (!$this->active && $this->delays) {
141                usleep(500);
142            }
143
144        } while ($this->active || $this->handles);
145    }
146
147    private function addRequest(array &$entry)
148    {
149        $id = (int) $entry['handle'];
150        $this->handles[$id] = $entry;
151
152        // If the request is a delay, then add the reques to the curl multi
153        // pool only after the specified delay.
154        if (isset($entry['request']['client']['delay'])) {
155            $this->delays[$id] = microtime(true) + ($entry['request']['client']['delay'] / 1000);
156        } elseif (empty($entry['request']['future'])) {
157            curl_multi_add_handle($this->_mh, $entry['handle']);
158        } else {
159            curl_multi_add_handle($this->_mh, $entry['handle']);
160            // "lazy" futures are only sent once the pool has many requests.
161            if ($entry['request']['future'] !== 'lazy') {
162                do {
163                    $mrc = curl_multi_exec($this->_mh, $this->active);
164                } while ($mrc === CURLM_CALL_MULTI_PERFORM);
165                $this->processMessages();
166            }
167        }
168    }
169
170    private function removeProcessed($id)
171    {
172        if (isset($this->handles[$id])) {
173            curl_multi_remove_handle(
174                $this->_mh,
175                $this->handles[$id]['handle']
176            );
177            curl_close($this->handles[$id]['handle']);
178            unset($this->handles[$id], $this->delays[$id]);
179        }
180    }
181
182    /**
183     * Cancels a handle from sending and removes references to it.
184     *
185     * @param int $id Handle ID to cancel and remove.
186     *
187     * @return bool True on success, false on failure.
188     */
189    private function cancel($id)
190    {
191        // Cannot cancel if it has been processed.
192        if (!isset($this->handles[$id])) {
193            return false;
194        }
195
196        $handle = $this->handles[$id]['handle'];
197        unset($this->delays[$id], $this->handles[$id]);
198        curl_multi_remove_handle($this->_mh, $handle);
199        curl_close($handle);
200
201        return true;
202    }
203
204    private function addDelays()
205    {
206        $currentTime = microtime(true);
207
208        foreach ($this->delays as $id => $delay) {
209            if ($currentTime >= $delay) {
210                unset($this->delays[$id]);
211                curl_multi_add_handle(
212                    $this->_mh,
213                    $this->handles[$id]['handle']
214                );
215            }
216        }
217    }
218
219    private function processMessages()
220    {
221        while ($done = curl_multi_info_read($this->_mh)) {
222            $id = (int) $done['handle'];
223
224            if (!isset($this->handles[$id])) {
225                // Probably was cancelled.
226                continue;
227            }
228
229            $entry = $this->handles[$id];
230            $entry['response']['transfer_stats'] = curl_getinfo($done['handle']);
231
232            if ($done['result'] !== CURLM_OK) {
233                $entry['response']['curl']['errno'] = $done['result'];
234                $entry['response']['curl']['error'] = curl_error($done['handle']);
235            }
236
237            $result = CurlFactory::createResponse(
238                $this,
239                $entry['request'],
240                $entry['response'],
241                $entry['headers'],
242                $entry['body']
243            );
244
245            $this->removeProcessed($id);
246            $entry['deferred']->resolve($result);
247        }
248    }
249}
250