1<?php 2namespace GuzzleHttp\Stream; 3 4/** 5 * Represents an asynchronous read-only stream that supports a drain event and 6 * pumping data from a source stream. 7 * 8 * The AsyncReadStream can be used as a completely asynchronous stream, meaning 9 * the data you can read from the stream will immediately return only 10 * the data that is currently buffered. 11 * 12 * AsyncReadStream can also be used in a "blocking" manner if a "pump" function 13 * is provided. When a caller requests more bytes than are available in the 14 * buffer, then the pump function is used to block until the requested number 15 * of bytes are available or the remote source stream has errored, closed, or 16 * timed-out. This behavior isn't strictly "blocking" because the pump function 17 * can send other transfers while waiting on the desired buffer size to be 18 * ready for reading (e.g., continue to tick an event loop). 19 * 20 * @unstable This class is subject to change. 21 */ 22class AsyncReadStream implements StreamInterface 23{ 24 use StreamDecoratorTrait; 25 26 /** @var callable|null Fn used to notify writers the buffer has drained */ 27 private $drain; 28 29 /** @var callable|null Fn used to block for more data */ 30 private $pump; 31 32 /** @var int|null Highwater mark of the underlying buffer */ 33 private $hwm; 34 35 /** @var bool Whether or not drain needs to be called at some point */ 36 private $needsDrain; 37 38 /** @var int The expected size of the remote source */ 39 private $size; 40 41 /** 42 * In order to utilize high water marks to tell writers to slow down, the 43 * provided stream must answer to the "hwm" stream metadata variable, 44 * providing the high water mark. If no "hwm" metadata value is available, 45 * then the "drain" functionality is not utilized. 46 * 47 * This class accepts an associative array of configuration options. 48 * 49 * - drain: (callable) Function to invoke when the stream has drained, 50 * meaning the buffer is now writable again because the size of the 51 * buffer is at an acceptable level (e.g., below the high water mark). 52 * The function accepts a single argument, the buffer stream object that 53 * has drained. 54 * - pump: (callable) A function that accepts the number of bytes to read 55 * from the source stream. This function will block until all of the data 56 * that was requested has been read, EOF of the source stream, or the 57 * source stream is closed. 58 * - size: (int) The expected size in bytes of the data that will be read 59 * (if known up-front). 60 * 61 * @param StreamInterface $buffer Buffer that contains the data that has 62 * been read by the event loop. 63 * @param array $config Associative array of options. 64 * 65 * @throws \InvalidArgumentException if the buffer is not readable and 66 * writable. 67 */ 68 public function __construct( 69 StreamInterface $buffer, 70 array $config = [] 71 ) { 72 if (!$buffer->isReadable() || !$buffer->isWritable()) { 73 throw new \InvalidArgumentException( 74 'Buffer must be readable and writable' 75 ); 76 } 77 78 if (isset($config['size'])) { 79 $this->size = $config['size']; 80 } 81 82 static $callables = ['pump', 'drain']; 83 foreach ($callables as $check) { 84 if (isset($config[$check])) { 85 if (!is_callable($config[$check])) { 86 throw new \InvalidArgumentException( 87 $check . ' must be callable' 88 ); 89 } 90 $this->{$check} = $config[$check]; 91 } 92 } 93 94 $this->hwm = $buffer->getMetadata('hwm'); 95 96 // Cannot drain when there's no high water mark. 97 if ($this->hwm === null) { 98 $this->drain = null; 99 } 100 101 $this->stream = $buffer; 102 } 103 104 /** 105 * Factory method used to create new async stream and an underlying buffer 106 * if no buffer is provided. 107 * 108 * This function accepts the same options as AsyncReadStream::__construct, 109 * but added the following key value pairs: 110 * 111 * - buffer: (StreamInterface) Buffer used to buffer data. If none is 112 * provided, a default buffer is created. 113 * - hwm: (int) High water mark to use if a buffer is created on your 114 * behalf. 115 * - max_buffer: (int) If provided, wraps the utilized buffer in a 116 * DroppingStream decorator to ensure that buffer does not exceed a given 117 * length. When exceeded, the stream will begin dropping data. Set the 118 * max_buffer to 0, to use a NullStream which does not store data. 119 * - write: (callable) A function that is invoked when data is written 120 * to the underlying buffer. The function accepts the buffer as the first 121 * argument, and the data being written as the second. The function MUST 122 * return the number of bytes that were written or false to let writers 123 * know to slow down. 124 * - drain: (callable) See constructor documentation. 125 * - pump: (callable) See constructor documentation. 126 * 127 * @param array $options Associative array of options. 128 * 129 * @return array Returns an array containing the buffer used to buffer 130 * data, followed by the ready to use AsyncReadStream object. 131 */ 132 public static function create(array $options = []) 133 { 134 $maxBuffer = isset($options['max_buffer']) 135 ? $options['max_buffer'] 136 : null; 137 138 if ($maxBuffer === 0) { 139 $buffer = new NullStream(); 140 } elseif (isset($options['buffer'])) { 141 $buffer = $options['buffer']; 142 } else { 143 $hwm = isset($options['hwm']) ? $options['hwm'] : 16384; 144 $buffer = new BufferStream($hwm); 145 } 146 147 if ($maxBuffer > 0) { 148 $buffer = new DroppingStream($buffer, $options['max_buffer']); 149 } 150 151 // Call the on_write callback if an on_write function was provided. 152 if (isset($options['write'])) { 153 $onWrite = $options['write']; 154 $buffer = FnStream::decorate($buffer, [ 155 'write' => function ($string) use ($buffer, $onWrite) { 156 $result = $buffer->write($string); 157 $onWrite($buffer, $string); 158 return $result; 159 } 160 ]); 161 } 162 163 return [$buffer, new self($buffer, $options)]; 164 } 165 166 public function getSize() 167 { 168 return $this->size; 169 } 170 171 public function isWritable() 172 { 173 return false; 174 } 175 176 public function write($string) 177 { 178 return false; 179 } 180 181 public function read($length) 182 { 183 if (!$this->needsDrain && $this->drain) { 184 $this->needsDrain = $this->stream->getSize() >= $this->hwm; 185 } 186 187 $result = $this->stream->read($length); 188 189 // If we need to drain, then drain when the buffer is empty. 190 if ($this->needsDrain && $this->stream->getSize() === 0) { 191 $this->needsDrain = false; 192 $drainFn = $this->drain; 193 $drainFn($this->stream); 194 } 195 196 $resultLen = strlen($result); 197 198 // If a pump was provided, the buffer is still open, and not enough 199 // data was given, then block until the data is provided. 200 if ($this->pump && $resultLen < $length) { 201 $pumpFn = $this->pump; 202 $result .= $pumpFn($length - $resultLen); 203 } 204 205 return $result; 206 } 207} 208