1<?php 2 3namespace GuzzleHttp\Promise; 4 5/** 6 * Represents a promise that iterates over many promises and invokes 7 * side-effect functions in the process. 8 */ 9class EachPromise implements PromisorInterface 10{ 11 private $pending = []; 12 13 private $nextPendingIndex = 0; 14 15 /** @var \Iterator|null */ 16 private $iterable; 17 18 /** @var callable|int|null */ 19 private $concurrency; 20 21 /** @var callable|null */ 22 private $onFulfilled; 23 24 /** @var callable|null */ 25 private $onRejected; 26 27 /** @var Promise|null */ 28 private $aggregate; 29 30 /** @var bool|null */ 31 private $mutex; 32 33 /** 34 * Configuration hash can include the following key value pairs: 35 * 36 * - fulfilled: (callable) Invoked when a promise fulfills. The function 37 * is invoked with three arguments: the fulfillment value, the index 38 * position from the iterable list of the promise, and the aggregate 39 * promise that manages all of the promises. The aggregate promise may 40 * be resolved from within the callback to short-circuit the promise. 41 * - rejected: (callable) Invoked when a promise is rejected. The 42 * function is invoked with three arguments: the rejection reason, the 43 * index position from the iterable list of the promise, and the 44 * aggregate promise that manages all of the promises. The aggregate 45 * promise may be resolved from within the callback to short-circuit 46 * the promise. 47 * - concurrency: (integer) Pass this configuration option to limit the 48 * allowed number of outstanding concurrently executing promises, 49 * creating a capped pool of promises. There is no limit by default. 50 * 51 * @param mixed $iterable Promises or values to iterate. 52 * @param array $config Configuration options 53 */ 54 public function __construct($iterable, array $config = []) 55 { 56 $this->iterable = Create::iterFor($iterable); 57 58 if (isset($config['concurrency'])) { 59 $this->concurrency = $config['concurrency']; 60 } 61 62 if (isset($config['fulfilled'])) { 63 $this->onFulfilled = $config['fulfilled']; 64 } 65 66 if (isset($config['rejected'])) { 67 $this->onRejected = $config['rejected']; 68 } 69 } 70 71 /** @psalm-suppress InvalidNullableReturnType */ 72 public function promise() 73 { 74 if ($this->aggregate) { 75 return $this->aggregate; 76 } 77 78 try { 79 $this->createPromise(); 80 /** @psalm-assert Promise $this->aggregate */ 81 $this->iterable->rewind(); 82 $this->refillPending(); 83 } catch (\Throwable $e) { 84 /** 85 * @psalm-suppress NullReference 86 * @phpstan-ignore-next-line 87 */ 88 $this->aggregate->reject($e); 89 } catch (\Exception $e) { 90 /** 91 * @psalm-suppress NullReference 92 * @phpstan-ignore-next-line 93 */ 94 $this->aggregate->reject($e); 95 } 96 97 /** 98 * @psalm-suppress NullableReturnStatement 99 * @phpstan-ignore-next-line 100 */ 101 return $this->aggregate; 102 } 103 104 private function createPromise() 105 { 106 $this->mutex = false; 107 $this->aggregate = new Promise(function () { 108 if ($this->checkIfFinished()) { 109 return; 110 } 111 reset($this->pending); 112 // Consume a potentially fluctuating list of promises while 113 // ensuring that indexes are maintained (precluding array_shift). 114 while ($promise = current($this->pending)) { 115 next($this->pending); 116 $promise->wait(); 117 if (Is::settled($this->aggregate)) { 118 return; 119 } 120 } 121 }); 122 123 // Clear the references when the promise is resolved. 124 $clearFn = function () { 125 $this->iterable = $this->concurrency = $this->pending = null; 126 $this->onFulfilled = $this->onRejected = null; 127 $this->nextPendingIndex = 0; 128 }; 129 130 $this->aggregate->then($clearFn, $clearFn); 131 } 132 133 private function refillPending() 134 { 135 if (!$this->concurrency) { 136 // Add all pending promises. 137 while ($this->addPending() && $this->advanceIterator()); 138 return; 139 } 140 141 // Add only up to N pending promises. 142 $concurrency = is_callable($this->concurrency) 143 ? call_user_func($this->concurrency, count($this->pending)) 144 : $this->concurrency; 145 $concurrency = max($concurrency - count($this->pending), 0); 146 // Concurrency may be set to 0 to disallow new promises. 147 if (!$concurrency) { 148 return; 149 } 150 // Add the first pending promise. 151 $this->addPending(); 152 // Note this is special handling for concurrency=1 so that we do 153 // not advance the iterator after adding the first promise. This 154 // helps work around issues with generators that might not have the 155 // next value to yield until promise callbacks are called. 156 while (--$concurrency 157 && $this->advanceIterator() 158 && $this->addPending()); 159 } 160 161 private function addPending() 162 { 163 if (!$this->iterable || !$this->iterable->valid()) { 164 return false; 165 } 166 167 $promise = Create::promiseFor($this->iterable->current()); 168 $key = $this->iterable->key(); 169 170 // Iterable keys may not be unique, so we use a counter to 171 // guarantee uniqueness 172 $idx = $this->nextPendingIndex++; 173 174 $this->pending[$idx] = $promise->then( 175 function ($value) use ($idx, $key) { 176 if ($this->onFulfilled) { 177 call_user_func( 178 $this->onFulfilled, 179 $value, 180 $key, 181 $this->aggregate 182 ); 183 } 184 $this->step($idx); 185 }, 186 function ($reason) use ($idx, $key) { 187 if ($this->onRejected) { 188 call_user_func( 189 $this->onRejected, 190 $reason, 191 $key, 192 $this->aggregate 193 ); 194 } 195 $this->step($idx); 196 } 197 ); 198 199 return true; 200 } 201 202 private function advanceIterator() 203 { 204 // Place a lock on the iterator so that we ensure to not recurse, 205 // preventing fatal generator errors. 206 if ($this->mutex) { 207 return false; 208 } 209 210 $this->mutex = true; 211 212 try { 213 $this->iterable->next(); 214 $this->mutex = false; 215 return true; 216 } catch (\Throwable $e) { 217 $this->aggregate->reject($e); 218 $this->mutex = false; 219 return false; 220 } catch (\Exception $e) { 221 $this->aggregate->reject($e); 222 $this->mutex = false; 223 return false; 224 } 225 } 226 227 private function step($idx) 228 { 229 // If the promise was already resolved, then ignore this step. 230 if (Is::settled($this->aggregate)) { 231 return; 232 } 233 234 unset($this->pending[$idx]); 235 236 // Only refill pending promises if we are not locked, preventing the 237 // EachPromise to recursively invoke the provided iterator, which 238 // cause a fatal error: "Cannot resume an already running generator" 239 if ($this->advanceIterator() && !$this->checkIfFinished()) { 240 // Add more pending promises if possible. 241 $this->refillPending(); 242 } 243 } 244 245 private function checkIfFinished() 246 { 247 if (!$this->pending && !$this->iterable->valid()) { 248 // Resolve the promise if there's nothing left to do. 249 $this->aggregate->resolve(null); 250 return true; 251 } 252 253 return false; 254 } 255} 256