isReadable() || !$buffer->isWritable()) {
            throw new \InvalidArgumentException(
                'Buffer must be readable and writable'
            );
        }

        // An explicitly configured size is what getSize() reports later.
        if (isset($config['size'])) {
            $this->size = $config['size'];
        }

        // Optional callbacks. Each one, when present, must be callable and is
        // stored on the property that shares its name ($this->pump,
        // $this->drain).
        static $callables = ['pump', 'drain'];
        foreach ($callables as $check) {
            if (isset($config[$check])) {
                if (!is_callable($config[$check])) {
                    throw new \InvalidArgumentException(
                        $check . ' must be callable'
                    );
                }
                $this->{$check} = $config[$check];
            }
        }

        // The high water mark is read from the buffer's "hwm" metadata entry.
        $this->hwm = $buffer->getMetadata('hwm');

        // Cannot drain when there's no high water mark, so any configured
        // drain callback is discarded in that case.
        if ($this->hwm === null) {
            $this->drain = null;
        }

        $this->stream = $buffer;
    }

    /**
     * Factory method used to create a new async stream and, if no buffer is
     * provided, the underlying buffer as well.
     *
     * This function accepts the same options as AsyncReadStream::__construct
     * (the whole $options array is forwarded to the constructor), plus the
     * following key value pairs:
     *
     * - buffer: (StreamInterface) Buffer used to buffer data. If none is
     *   provided, a default BufferStream is created. Not used when max_buffer
     *   is the integer 0.
     * - hwm: (int) High water mark to use if a buffer is created on your
     *   behalf. Defaults to 16384.
     * - max_buffer: (int) If greater than 0, wraps the utilized buffer in a
     *   DroppingStream decorator to ensure that buffer does not exceed a given
     *   length. When exceeded, the stream will begin dropping data. Set
     *   max_buffer to the integer 0 (the comparison is strict) to use a
     *   NullStream which does not store data.
     * - write: (callable) A function that is invoked after data has been
     *   written to the underlying buffer. The function receives the buffer
     *   (as it was before the write decorator was applied) as the first
     *   argument, and the data that was written as the second. Its return
     *   value is not used: writers receive the result of the buffer's own
     *   write() call. This option is not validated with is_callable() here.
     * - drain: (callable) See constructor documentation.
     * - pump: (callable) See constructor documentation.
     *
     * @param array $options Associative array of options.
     *
     * @return array Returns an array containing the buffer used to buffer
     *               data (including any decorators applied here), followed by
     *               the ready to use AsyncReadStream object.
     */
    public static function create(array $options = [])
    {
        $maxBuffer = isset($options['max_buffer'])
            ? $options['max_buffer']
            : null;

        // Pick the base buffer: a NullStream for max_buffer === 0, otherwise
        // the caller's buffer, otherwise a fresh BufferStream.
        if ($maxBuffer === 0) {
            $buffer = new NullStream();
        } elseif (isset($options['buffer'])) {
            $buffer = $options['buffer'];
        } else {
            $hwm = isset($options['hwm']) ? $options['hwm'] : 16384;
            $buffer = new BufferStream($hwm);
        }

        // A positive max_buffer caps the buffer via DroppingStream. When
        // max_buffer is unset, $maxBuffer is null and this branch is skipped.
        if ($maxBuffer > 0) {
            $buffer = new DroppingStream($buffer, $options['max_buffer']);
        }

        // Invoke the "write" callback after each write if one was provided.
        if (isset($options['write'])) {
            $onWrite = $options['write'];
            $buffer = FnStream::decorate($buffer, [
                // $buffer is captured by value, so inside the closure it is
                // the undecorated buffer; the callback's return value is
                // ignored and the buffer's own write() result is returned.
                'write' => function ($string) use ($buffer, $onWrite) {
                    $result = $buffer->write($string);
                    $onWrite($buffer, $string);
                    return $result;
                }
            ]);
        }

        return [$buffer, new self($buffer, $options)];
    }

    /**
     * Returns the value stored in $this->size, which the constructor sets
     * from the "size" configuration option when that option is provided.
     *
     * @return mixed The configured size, or the property's initial value when
     *               no size was configured.
     */
    public function getSize()
    {
        return $this->size;
    }

    /**
     * This stream never reports itself as writable.
     *
     * @return bool Always false.
     */
    public function isWritable()
    {
        return false;
    }

    /**
     * Writing through this stream is not supported; the data is not used.
     *
     * @param string $string Ignored.
     *
     * @return bool Always false.
     */
    public function write($string)
    {
        return false;
    }

    /**
     * Reads up to $length bytes from the underlying buffer, invoking the
     * drain and pump callbacks when their conditions are met.
     *
     * @param int $length Number of bytes requested.
     *
     * @return string The data read from the buffer, with any data returned by
     *                the pump callback appended.
     */
    public function read($length)
    {
        // Before reading, remember whether the buffer had reached the high
        // water mark. Only evaluated when a drain callback is set and a drain
        // is not already pending.
        if (!$this->needsDrain && $this->drain) {
            $this->needsDrain = $this->stream->getSize() >= $this->hwm;
        }

        $result = $this->stream->read($length);

        // If we need to drain, then drain when the buffer is empty. The flag
        // is cleared before the callback runs, and the callback receives the
        // underlying buffer.
        if ($this->needsDrain && $this->stream->getSize() === 0) {
            $this->needsDrain = false;
            $drainFn = $this->drain;
            $drainFn($this->stream);
        }

        $resultLen = strlen($result);

        // If a pump was provided and fewer bytes than requested were read,
        // ask the pump for the missing number of bytes and append whatever it
        // returns.
        // NOTE(review): an earlier comment also mentioned the buffer still
        // being open, but no such check is performed here.
        if ($this->pump && $resultLen < $length) {
            $pumpFn = $this->pump;
            $result .= $pumpFn($length - $resultLen);
        }

        return $result;
    }
}