1<?php
2
3namespace GuzzleHttp\Promise;
4
5/**
6 * Represents a promise that iterates over many promises and invokes
7 * side-effect functions in the process.
8 */
9class EachPromise implements PromisorInterface
10{
11    private $pending = [];
12
13    private $nextPendingIndex = 0;
14
15    /** @var \Iterator|null */
16    private $iterable;
17
18    /** @var callable|int|null */
19    private $concurrency;
20
21    /** @var callable|null */
22    private $onFulfilled;
23
24    /** @var callable|null */
25    private $onRejected;
26
27    /** @var Promise|null */
28    private $aggregate;
29
30    /** @var bool|null */
31    private $mutex;
32
33    /**
34     * Configuration hash can include the following key value pairs:
35     *
36     * - fulfilled: (callable) Invoked when a promise fulfills. The function
37     *   is invoked with three arguments: the fulfillment value, the index
38     *   position from the iterable list of the promise, and the aggregate
39     *   promise that manages all of the promises. The aggregate promise may
40     *   be resolved from within the callback to short-circuit the promise.
41     * - rejected: (callable) Invoked when a promise is rejected. The
42     *   function is invoked with three arguments: the rejection reason, the
43     *   index position from the iterable list of the promise, and the
44     *   aggregate promise that manages all of the promises. The aggregate
45     *   promise may be resolved from within the callback to short-circuit
46     *   the promise.
47     * - concurrency: (integer) Pass this configuration option to limit the
48     *   allowed number of outstanding concurrently executing promises,
49     *   creating a capped pool of promises. There is no limit by default.
50     *
51     * @param mixed $iterable Promises or values to iterate.
52     * @param array $config   Configuration options
53     */
54    public function __construct($iterable, array $config = [])
55    {
56        $this->iterable = Create::iterFor($iterable);
57
58        if (isset($config['concurrency'])) {
59            $this->concurrency = $config['concurrency'];
60        }
61
62        if (isset($config['fulfilled'])) {
63            $this->onFulfilled = $config['fulfilled'];
64        }
65
66        if (isset($config['rejected'])) {
67            $this->onRejected = $config['rejected'];
68        }
69    }
70
71    /** @psalm-suppress InvalidNullableReturnType */
72    public function promise()
73    {
74        if ($this->aggregate) {
75            return $this->aggregate;
76        }
77
78        try {
79            $this->createPromise();
80            /** @psalm-assert Promise $this->aggregate */
81            $this->iterable->rewind();
82            $this->refillPending();
83        } catch (\Throwable $e) {
84            /**
85             * @psalm-suppress NullReference
86             * @phpstan-ignore-next-line
87             */
88            $this->aggregate->reject($e);
89        } catch (\Exception $e) {
90            /**
91             * @psalm-suppress NullReference
92             * @phpstan-ignore-next-line
93             */
94            $this->aggregate->reject($e);
95        }
96
97        /**
98         * @psalm-suppress NullableReturnStatement
99         * @phpstan-ignore-next-line
100         */
101        return $this->aggregate;
102    }
103
104    private function createPromise()
105    {
106        $this->mutex = false;
107        $this->aggregate = new Promise(function () {
108            if ($this->checkIfFinished()) {
109                return;
110            }
111            reset($this->pending);
112            // Consume a potentially fluctuating list of promises while
113            // ensuring that indexes are maintained (precluding array_shift).
114            while ($promise = current($this->pending)) {
115                next($this->pending);
116                $promise->wait();
117                if (Is::settled($this->aggregate)) {
118                    return;
119                }
120            }
121        });
122
123        // Clear the references when the promise is resolved.
124        $clearFn = function () {
125            $this->iterable = $this->concurrency = $this->pending = null;
126            $this->onFulfilled = $this->onRejected = null;
127            $this->nextPendingIndex = 0;
128        };
129
130        $this->aggregate->then($clearFn, $clearFn);
131    }
132
133    private function refillPending()
134    {
135        if (!$this->concurrency) {
136            // Add all pending promises.
137            while ($this->addPending() && $this->advanceIterator());
138            return;
139        }
140
141        // Add only up to N pending promises.
142        $concurrency = is_callable($this->concurrency)
143            ? call_user_func($this->concurrency, count($this->pending))
144            : $this->concurrency;
145        $concurrency = max($concurrency - count($this->pending), 0);
146        // Concurrency may be set to 0 to disallow new promises.
147        if (!$concurrency) {
148            return;
149        }
150        // Add the first pending promise.
151        $this->addPending();
152        // Note this is special handling for concurrency=1 so that we do
153        // not advance the iterator after adding the first promise. This
154        // helps work around issues with generators that might not have the
155        // next value to yield until promise callbacks are called.
156        while (--$concurrency
157            && $this->advanceIterator()
158            && $this->addPending());
159    }
160
161    private function addPending()
162    {
163        if (!$this->iterable || !$this->iterable->valid()) {
164            return false;
165        }
166
167        $promise = Create::promiseFor($this->iterable->current());
168        $key = $this->iterable->key();
169
170        // Iterable keys may not be unique, so we use a counter to
171        // guarantee uniqueness
172        $idx = $this->nextPendingIndex++;
173
174        $this->pending[$idx] = $promise->then(
175            function ($value) use ($idx, $key) {
176                if ($this->onFulfilled) {
177                    call_user_func(
178                        $this->onFulfilled,
179                        $value,
180                        $key,
181                        $this->aggregate
182                    );
183                }
184                $this->step($idx);
185            },
186            function ($reason) use ($idx, $key) {
187                if ($this->onRejected) {
188                    call_user_func(
189                        $this->onRejected,
190                        $reason,
191                        $key,
192                        $this->aggregate
193                    );
194                }
195                $this->step($idx);
196            }
197        );
198
199        return true;
200    }
201
202    private function advanceIterator()
203    {
204        // Place a lock on the iterator so that we ensure to not recurse,
205        // preventing fatal generator errors.
206        if ($this->mutex) {
207            return false;
208        }
209
210        $this->mutex = true;
211
212        try {
213            $this->iterable->next();
214            $this->mutex = false;
215            return true;
216        } catch (\Throwable $e) {
217            $this->aggregate->reject($e);
218            $this->mutex = false;
219            return false;
220        } catch (\Exception $e) {
221            $this->aggregate->reject($e);
222            $this->mutex = false;
223            return false;
224        }
225    }
226
227    private function step($idx)
228    {
229        // If the promise was already resolved, then ignore this step.
230        if (Is::settled($this->aggregate)) {
231            return;
232        }
233
234        unset($this->pending[$idx]);
235
236        // Only refill pending promises if we are not locked, preventing the
237        // EachPromise to recursively invoke the provided iterator, which
238        // cause a fatal error: "Cannot resume an already running generator"
239        if ($this->advanceIterator() && !$this->checkIfFinished()) {
240            // Add more pending promises if possible.
241            $this->refillPending();
242        }
243    }
244
245    private function checkIfFinished()
246    {
247        if (!$this->pending && !$this->iterable->valid()) {
248            // Resolve the promise if there's nothing left to do.
249            $this->aggregate->resolve(null);
250            return true;
251        }
252
253        return false;
254    }
255}
256