1<?php
2namespace GuzzleHttp\Stream;
3
4/**
5 * Represents an asynchronous read-only stream that supports a drain event and
6 * pumping data from a source stream.
7 *
8 * The AsyncReadStream can be used as a completely asynchronous stream, meaning
9 * the data you can read from the stream will immediately return only
10 * the data that is currently buffered.
11 *
12 * AsyncReadStream can also be used in a "blocking" manner if a "pump" function
13 * is provided. When a caller requests more bytes than are available in the
14 * buffer, then the pump function is used to block until the requested number
15 * of bytes are available or the remote source stream has errored, closed, or
16 * timed-out. This behavior isn't strictly "blocking" because the pump function
17 * can send other transfers while waiting on the desired buffer size to be
18 * ready for reading (e.g., continue to tick an event loop).
19 *
20 * @unstable This class is subject to change.
21 */
class AsyncReadStream implements StreamInterface
{
    use StreamDecoratorTrait;

    /**
     * @var callable|null Callback invoked with the buffer once it has been
     *                    emptied after reaching the high water mark.
     */
    private $drain;

    /**
     * @var callable|null Callback asked for the missing number of bytes when
     *                    the buffer cannot satisfy a read on its own.
     */
    private $pump;

    /** @var int|null Value of the buffer's "hwm" metadata entry, if any */
    private $hwm;

    /** @var bool True while a drain notification is still owed to writers */
    private $needsDrain;

    /** @var int Size reported by getSize(), taken from the "size" option */
    private $size;

    /**
     * Wraps a readable and writable buffer in a read-only asynchronous view.
     *
     * The high water mark is read from the buffer's "hwm" metadata value.
     * When the buffer reports no such value, any supplied "drain" callback is
     * discarded because there is nothing to compare the buffer size against.
     *
     * Supported configuration keys:
     *
     * - drain: (callable) Called with the buffer stream as its only argument
     *   once a buffer that had reached the high water mark has been read
     *   until it is empty.
     * - pump: (callable) Called with the number of bytes still missing when a
     *   read returns less than requested; whatever it returns is appended to
     *   the data that was read from the buffer.
     * - size: (int) The expected size in bytes of the data that will be read
     *   (if known up-front). Returned as-is by getSize().
     *
     * @param StreamInterface $buffer Buffer holding the data that has been
     *                                received so far.
     * @param array           $config Associative array of options.
     *
     * @throws \InvalidArgumentException if the buffer is not both readable
     *                                   and writable, or if "pump" or "drain"
     *                                   is set but is not callable.
     */
    public function __construct(StreamInterface $buffer, array $config = [])
    {
        $usable = $buffer->isReadable() && $buffer->isWritable();
        if (!$usable) {
            throw new \InvalidArgumentException(
                'Buffer must be readable and writable'
            );
        }

        if (isset($config['size'])) {
            $this->size = $config['size'];
        }

        // Each of these options is stored on the property of the same name.
        foreach (['pump', 'drain'] as $option) {
            if (!isset($config[$option])) {
                continue;
            }

            $callback = $config[$option];
            if (!is_callable($callback)) {
                throw new \InvalidArgumentException(
                    $option . ' must be callable'
                );
            }

            $this->{$option} = $callback;
        }

        $highWaterMark = $buffer->getMetadata('hwm');
        $this->hwm = $highWaterMark;

        // Without a high water mark there is no threshold to drain against.
        if (null === $highWaterMark) {
            $this->drain = null;
        }

        $this->stream = $buffer;
    }

    /**
     * Builds an AsyncReadStream together with the buffer that feeds it.
     *
     * Every option is also forwarded to the constructor, so "drain", "pump"
     * and "size" behave as documented there. In addition this factory
     * understands:
     *
     * - buffer: (StreamInterface) Buffer to use. Ignored when max_buffer is
     *   exactly 0. When absent, a BufferStream is created.
     * - hwm: (int) High water mark passed to the BufferStream that is created
     *   when no buffer is supplied. Defaults to 16384.
     * - max_buffer: (int) When greater than 0, the buffer is wrapped in a
     *   DroppingStream constructed with this value. When exactly 0, a
     *   NullStream is used as the buffer.
     * - write: (callable) Invoked after each write to the buffer with the
     *   (undecorated) buffer as the first argument and the written data as
     *   the second. Writers receive the result of the buffer's own write()
     *   call; the value returned by this callback is not used.
     *
     * @param array $options Associative array of options.
     *
     * @return array Two elements: the buffer that writers should write to,
     *               followed by the AsyncReadStream that reads from it.
     */
    public static function create(array $options = [])
    {
        $maxBuffer = null;
        if (isset($options['max_buffer'])) {
            $maxBuffer = $options['max_buffer'];
        }

        if ($maxBuffer === 0) {
            $buffer = new NullStream();
        } elseif (isset($options['buffer'])) {
            $buffer = $options['buffer'];
        } else {
            $buffer = new BufferStream(
                isset($options['hwm']) ? $options['hwm'] : 16384
            );
        }

        if ($maxBuffer > 0) {
            $buffer = new DroppingStream($buffer, $maxBuffer);
        }

        // Notify the "write" callback after every write when one was given.
        if (isset($options['write'])) {
            $listener = $options['write'];
            $target = $buffer;
            $buffer = FnStream::decorate($target, [
                'write' => function ($data) use ($target, $listener) {
                    $written = $target->write($data);
                    $listener($target, $data);

                    return $written;
                }
            ]);
        }

        $stream = new self($buffer, $options);

        return [$buffer, $stream];
    }

    /**
     * Returns the size supplied through the "size" option.
     *
     * @return int|null Null when no size was configured.
     */
    public function getSize()
    {
        return $this->size;
    }

    /**
     * This stream is read-only.
     *
     * @return bool Always false.
     */
    public function isWritable()
    {
        return false;
    }

    /**
     * Writing is not supported; the data is ignored.
     *
     * @param string $string Data that will not be written.
     *
     * @return bool Always false.
     */
    public function write($string)
    {
        return false;
    }

    /**
     * Reads up to $length bytes from the buffer.
     *
     * If a drain callback is active and the buffer had reached the high water
     * mark, the callback fires once the buffer has been read until empty. If
     * a pump callback is set and the buffer returned fewer bytes than
     * requested, the pump is asked for the remainder and its result is
     * appended to the data read from the buffer.
     *
     * @param int $length Maximum number of bytes to read.
     *
     * @return string Data read from the buffer, plus any pumped data.
     */
    public function read($length)
    {
        $buffer = $this->stream;

        // Record that a drain is owed as soon as the buffer is at or above
        // the high water mark; the flag stays set until the drain fires.
        if ($this->drain && !$this->needsDrain) {
            $this->needsDrain = $buffer->getSize() >= $this->hwm;
        }

        $data = $buffer->read($length);

        // A pending drain is delivered only once the buffer is empty.
        if ($this->needsDrain && $buffer->getSize() === 0) {
            $this->needsDrain = false;
            $notify = $this->drain;
            $notify($buffer);
        }

        $received = strlen($data);

        // Short read with a pump available: ask it for the missing bytes.
        if ($this->pump && $length > $received) {
            $fetch = $this->pump;
            $data .= $fetch($length - $received);
        }

        return $data;
    }
}
208