<?php
namespace GuzzleHttp;

use GuzzleHttp\Promise\EachPromise;
use GuzzleHttp\Promise\PromiseInterface;
use GuzzleHttp\Promise\PromisorInterface;
use Psr\Http\Message\RequestInterface;

/**
 * Sends an iterator of requests concurrently using a capped pool size.
 *
 * The pool reads from the iterator lazily: each yielded value is turned into
 * a promise only when the underlying EachPromise asks for the next item.
 *
 * When a request is yielded, it is sent with the client's sendAsync() method
 * using the request options given under the "options" key of the
 * constructor's $config (an empty array when that key is absent).
 *
 * When a callable is yielded, it is invoked with that same options array as
 * its only argument and its return value is used as the promise for that
 * entry, so the callable MUST return a wait-able promise.
 */
class Pool implements PromisorInterface
{
    /** @var EachPromise Aggregate promise that drives the whole pool. */
    private $each;

    /**
     * @param ClientInterface $client   Client used to send the requests.
     * @param array|\Iterator $requests Requests or functions that return
     *                                  requests to send concurrently.
     * @param array           $config   Associative array of options
     *     - concurrency: (int) Maximum number of requests to send
     *       concurrently. Defaults to 25.
     *     - pool_size: (int) Legacy alias of "concurrency"; when set it
     *       overrides "concurrency".
     *     - options: Array of request options to apply to each request.
     *     - fulfilled: (callable) Function to invoke when a request completes.
     *     - rejected: (callable) Function to invoke when a request is rejected.
     *
     * Note that values yielded by $requests are validated lazily: an
     * \InvalidArgumentException for a value that is neither a request nor a
     * callable is thrown while the pool is being consumed, not by this
     * constructor.
     */
    public function __construct(
        ClientInterface $client,
        $requests,
        array $config = []
    ) {
        // Backwards compatibility: "pool_size" is the older name for
        // "concurrency" and wins when both are present.
        if (isset($config['pool_size'])) {
            $config['concurrency'] = $config['pool_size'];
        } elseif (!isset($config['concurrency'])) {
            $config['concurrency'] = 25;
        }

        // Request options are used by this class only, so they are removed
        // from $config before it is handed to EachPromise.
        if (isset($config['options'])) {
            $opts = $config['options'];
            unset($config['options']);
        } else {
            $opts = [];
        }

        $iterable = \GuzzleHttp\Promise\iter_for($requests);

        // Generator that maps every yielded request/callable to a promise
        // while preserving the original key. It does not use $this, hence
        // static.
        $requests = static function () use ($iterable, $client, $opts) {
            foreach ($iterable as $key => $rfn) {
                if ($rfn instanceof RequestInterface) {
                    yield $key => $client->sendAsync($rfn, $opts);
                } elseif (is_callable($rfn)) {
                    yield $key => $rfn($opts);
                } else {
                    throw new \InvalidArgumentException('Each value yielded by '
                        . 'the iterator must be a Psr\Http\Message\RequestInterface '
                        . 'or a callable that returns a promise that fulfills '
                        . 'with a Psr\Http\Message\ResponseInterface object.');
                }
            }
        };

        $this->each = new EachPromise($requests(), $config);
    }

    /**
     * Get the promise of the underlying EachPromise.
     *
     * @return PromiseInterface
     */
    public function promise()
    {
        return $this->each->promise();
    }

    /**
     * Sends multiple requests concurrently and returns an array of responses
     * and exceptions, sorted by the key each request was provided under.
     *
     * Any "fulfilled" / "rejected" callbacks supplied in $options are still
     * invoked (with the value and its key) before the value is recorded in
     * the returned array.
     *
     * IMPORTANT: This method keeps every request and response in memory, and
     * as such, is NOT recommended when sending a large number or an
     * indeterminate number of requests concurrently.
     *
     * @param ClientInterface $client   Client used to send the requests
     * @param array|\Iterator $requests Requests to send concurrently.
     * @param array           $options  Passes through the options available in
     *                                  {@see GuzzleHttp\Pool::__construct}
     *
     * @return array Returns an array containing the response or an exception
     *               for each request, keyed and ordered by request key.
     * @throws \InvalidArgumentException if a provided value is neither a
     *                                   request nor a callable.
     */
    public static function batch(
        ClientInterface $client,
        $requests,
        array $options = []
    ) {
        $res = [];
        self::cmpCallback($options, 'fulfilled', $res);
        self::cmpCallback($options, 'rejected', $res);
        $pool = new static($client, $requests, $options);
        $pool->promise()->wait();

        // Callbacks record results in completion order; sort by key so the
        // output follows the keys of the provided requests.
        ksort($res);

        return $res;
    }

    /**
     * Installs a callback under $options[$name] that stores every value it
     * receives into $results under the key it was called with.
     *
     * If $options[$name] already holds a callback, the installed wrapper
     * calls it first with the same value and key, then records the value.
     *
     * @param array  $options Pool options, modified in place.
     * @param string $name    Option key to install the callback under
     *                        ("fulfilled" or "rejected" in batch()).
     * @param array  $results Array that collects the values, by reference.
     *
     * @return void
     */
    private static function cmpCallback(array &$options, $name, array &$results)
    {
        if (!isset($options[$name])) {
            $options[$name] = function ($v, $k) use (&$results) {
                $results[$k] = $v;
            };
        } else {
            $currentFn = $options[$name];
            $options[$name] = function ($v, $k) use (&$results, $currentFn) {
                $currentFn($v, $k);
                $results[$k] = $v;
            };
        }
    }
}