1<?php 2 3declare(strict_types=1); 4 5namespace GuzzleHttp\Promise; 6 7/** 8 * Represents a promise that iterates over many promises and invokes 9 * side-effect functions in the process. 10 * 11 * @final 12 */ 13class EachPromise implements PromisorInterface 14{ 15 private $pending = []; 16 17 private $nextPendingIndex = 0; 18 19 /** @var \Iterator|null */ 20 private $iterable; 21 22 /** @var callable|int|null */ 23 private $concurrency; 24 25 /** @var callable|null */ 26 private $onFulfilled; 27 28 /** @var callable|null */ 29 private $onRejected; 30 31 /** @var Promise|null */ 32 private $aggregate; 33 34 /** @var bool|null */ 35 private $mutex; 36 37 /** 38 * Configuration hash can include the following key value pairs: 39 * 40 * - fulfilled: (callable) Invoked when a promise fulfills. The function 41 * is invoked with three arguments: the fulfillment value, the index 42 * position from the iterable list of the promise, and the aggregate 43 * promise that manages all of the promises. The aggregate promise may 44 * be resolved from within the callback to short-circuit the promise. 45 * - rejected: (callable) Invoked when a promise is rejected. The 46 * function is invoked with three arguments: the rejection reason, the 47 * index position from the iterable list of the promise, and the 48 * aggregate promise that manages all of the promises. The aggregate 49 * promise may be resolved from within the callback to short-circuit 50 * the promise. 51 * - concurrency: (integer) Pass this configuration option to limit the 52 * allowed number of outstanding concurrently executing promises, 53 * creating a capped pool of promises. There is no limit by default. 54 * 55 * @param mixed $iterable Promises or values to iterate. 56 * @param array $config Configuration options 57 */ 58 public function __construct($iterable, array $config = []) 59 { 60 $this->iterable = Create::iterFor($iterable); 61 62 if (isset($config['concurrency'])) { 63 $this->concurrency = $config['concurrency']; 64 } 65 66 if (isset($config['fulfilled'])) { 67 $this->onFulfilled = $config['fulfilled']; 68 } 69 70 if (isset($config['rejected'])) { 71 $this->onRejected = $config['rejected']; 72 } 73 } 74 75 /** @psalm-suppress InvalidNullableReturnType */ 76 public function promise(): PromiseInterface 77 { 78 if ($this->aggregate) { 79 return $this->aggregate; 80 } 81 82 try { 83 $this->createPromise(); 84 /** @psalm-assert Promise $this->aggregate */ 85 $this->iterable->rewind(); 86 $this->refillPending(); 87 } catch (\Throwable $e) { 88 $this->aggregate->reject($e); 89 } 90 91 /** 92 * @psalm-suppress NullableReturnStatement 93 */ 94 return $this->aggregate; 95 } 96 97 private function createPromise(): void 98 { 99 $this->mutex = false; 100 $this->aggregate = new Promise(function (): void { 101 if ($this->checkIfFinished()) { 102 return; 103 } 104 reset($this->pending); 105 // Consume a potentially fluctuating list of promises while 106 // ensuring that indexes are maintained (precluding array_shift). 107 while ($promise = current($this->pending)) { 108 next($this->pending); 109 $promise->wait(); 110 if (Is::settled($this->aggregate)) { 111 return; 112 } 113 } 114 }); 115 116 // Clear the references when the promise is resolved. 117 $clearFn = function (): void { 118 $this->iterable = $this->concurrency = $this->pending = null; 119 $this->onFulfilled = $this->onRejected = null; 120 $this->nextPendingIndex = 0; 121 }; 122 123 $this->aggregate->then($clearFn, $clearFn); 124 } 125 126 private function refillPending(): void 127 { 128 if (!$this->concurrency) { 129 // Add all pending promises. 130 while ($this->addPending() && $this->advanceIterator()) { 131 } 132 133 return; 134 } 135 136 // Add only up to N pending promises. 137 $concurrency = is_callable($this->concurrency) 138 ? ($this->concurrency)(count($this->pending)) 139 : $this->concurrency; 140 $concurrency = max($concurrency - count($this->pending), 0); 141 // Concurrency may be set to 0 to disallow new promises. 142 if (!$concurrency) { 143 return; 144 } 145 // Add the first pending promise. 146 $this->addPending(); 147 // Note this is special handling for concurrency=1 so that we do 148 // not advance the iterator after adding the first promise. This 149 // helps work around issues with generators that might not have the 150 // next value to yield until promise callbacks are called. 151 while (--$concurrency 152 && $this->advanceIterator() 153 && $this->addPending()) { 154 } 155 } 156 157 private function addPending(): bool 158 { 159 if (!$this->iterable || !$this->iterable->valid()) { 160 return false; 161 } 162 163 $promise = Create::promiseFor($this->iterable->current()); 164 $key = $this->iterable->key(); 165 166 // Iterable keys may not be unique, so we use a counter to 167 // guarantee uniqueness 168 $idx = $this->nextPendingIndex++; 169 170 $this->pending[$idx] = $promise->then( 171 function ($value) use ($idx, $key): void { 172 if ($this->onFulfilled) { 173 ($this->onFulfilled)( 174 $value, 175 $key, 176 $this->aggregate 177 ); 178 } 179 $this->step($idx); 180 }, 181 function ($reason) use ($idx, $key): void { 182 if ($this->onRejected) { 183 ($this->onRejected)( 184 $reason, 185 $key, 186 $this->aggregate 187 ); 188 } 189 $this->step($idx); 190 } 191 ); 192 193 return true; 194 } 195 196 private function advanceIterator(): bool 197 { 198 // Place a lock on the iterator so that we ensure to not recurse, 199 // preventing fatal generator errors. 200 if ($this->mutex) { 201 return false; 202 } 203 204 $this->mutex = true; 205 206 try { 207 $this->iterable->next(); 208 $this->mutex = false; 209 210 return true; 211 } catch (\Throwable $e) { 212 $this->aggregate->reject($e); 213 $this->mutex = false; 214 215 return false; 216 } 217 } 218 219 private function step(int $idx): void 220 { 221 // If the promise was already resolved, then ignore this step. 222 if (Is::settled($this->aggregate)) { 223 return; 224 } 225 226 unset($this->pending[$idx]); 227 228 // Only refill pending promises if we are not locked, preventing the 229 // EachPromise to recursively invoke the provided iterator, which 230 // cause a fatal error: "Cannot resume an already running generator" 231 if ($this->advanceIterator() && !$this->checkIfFinished()) { 232 // Add more pending promises if possible. 233 $this->refillPending(); 234 } 235 } 236 237 private function checkIfFinished(): bool 238 { 239 if (!$this->pending && !$this->iterable->valid()) { 240 // Resolve the promise if there's nothing left to do. 241 $this->aggregate->resolve(null); 242 243 return true; 244 } 245 246 return false; 247 } 248} 249