1<?php
2namespace GuzzleHttp\Tests\Stream;
3
4use GuzzleHttp\Stream\AsyncReadStream;
5use GuzzleHttp\Stream\BufferStream;
6use GuzzleHttp\Stream\FnStream;
7use GuzzleHttp\Stream\Stream;
8use InvalidArgumentException;
9use PHPUnit\Framework\TestCase;
10use ReflectionProperty;
11
12class AsyncReadStreamTest extends TestCase
13{
14    public function testValidatesReadableBuffer()
15    {
16        $this->expectException(InvalidArgumentException::class);
17        $this->expectErrorMessage('Buffer must be readable and writable');
18        new AsyncReadStream(FnStream::decorate(
19            Stream::factory(),
20            ['isReadable' => function () { return false; }]
21        ));
22    }
23
24    public function testValidatesWritableBuffer()
25    {
26        $this->expectException(InvalidArgumentException::class);
27        $this->expectErrorMessage('Buffer must be readable and writable');
28        new AsyncReadStream(FnStream::decorate(
29            Stream::factory(),
30            ['isWritable' => function () { return false; }]
31        ));
32    }
33
34    public function testValidatesHwmMetadata()
35    {
36        $a = new AsyncReadStream(Stream::factory(), [
37            'drain' => function() {}
38        ]);
39        $drain = new ReflectionProperty(AsyncReadStream::class, 'drain');
40        $drain->setAccessible(true);
41        $this->assertNull($drain->getValue($a));
42    }
43
44    public function testValidatesPumpIsCallable()
45    {
46        $this->expectException(InvalidArgumentException::class);
47        $this->expectErrorMessage('pump must be callable');
48        new AsyncReadStream(new BufferStream(), ['pump' => true]);
49    }
50
51    public function testValidatesDrainIsCallable()
52    {
53        $this->expectException(InvalidArgumentException::class);
54        $this->expectErrorMessage('drain must be callable');
55        new AsyncReadStream(new BufferStream(), ['drain' => true]);
56    }
57
58    public function testCanInitialize()
59    {
60        $buffer = new BufferStream();
61        $a = new AsyncReadStream($buffer, [
62            'size'  => 10,
63            'drain' => function () {},
64            'pump'  => function () {},
65        ]);
66        $size = new ReflectionProperty(AsyncReadStream::class, 'size');
67        $size->setAccessible(true);
68        $drain = new ReflectionProperty(AsyncReadStream::class, 'drain');
69        $drain->setAccessible(true);
70        $pump = new ReflectionProperty(AsyncReadStream::class, 'pump');
71        $pump->setAccessible(true);
72
73        $this->assertSame($buffer, $a->stream);
74        $this->assertTrue(is_callable($drain->getValue($a)));
75        $this->assertTrue(is_callable($pump->getValue($a)));
76        $this->assertTrue($a->isReadable());
77        $this->assertFalse($a->isSeekable());
78        $this->assertFalse($a->isWritable());
79        $this->assertFalse($a->write('foo'));
80        $this->assertEquals(10, $a->getSize());
81    }
82
83    public function testReadsFromBufferWithNoDrainOrPump()
84    {
85        $buffer = new BufferStream();
86        $a = new AsyncReadStream($buffer);
87        $buffer->write('foo');
88        $this->assertNull($a->getSize());
89        $this->assertEquals('foo', $a->read(10));
90        $this->assertEquals('', $a->read(10));
91    }
92
93    public function testCallsPumpForMoreDataWhenRequested()
94    {
95        $called = 0;
96        $buffer = new BufferStream();
97        $a = new AsyncReadStream($buffer, [
98            'pump' => function ($size) use (&$called) {
99                $called++;
100                return str_repeat('.', $size);
101            }
102        ]);
103        $buffer->write('foobar');
104        $this->assertEquals('foo', $a->read(3));
105        $this->assertEquals(0, $called);
106        $this->assertEquals('bar.....', $a->read(8));
107        $this->assertEquals(1, $called);
108        $this->assertEquals('..', $a->read(2));
109        $this->assertEquals(2, $called);
110    }
111
112    public function testCallsDrainWhenNeeded()
113    {
114        $called = 0;
115        $buffer = new BufferStream(5);
116        $a = new AsyncReadStream($buffer, [
117            'drain' => function (BufferStream $b) use (&$called, $buffer) {
118                $this->assertSame($b, $buffer);
119                $called++;
120            }
121        ]);
122
123        $buffer->write('foobar');
124        $this->assertEquals(6, $buffer->getSize());
125        $this->assertEquals(0, $called);
126
127        $needsDrain = new ReflectionProperty(AsyncReadStream::class, 'needsDrain');
128        $needsDrain->setAccessible(true);
129
130        $a->read(3);
131        $this->assertTrue($needsDrain->getValue($a));
132        $this->assertEquals(3, $buffer->getSize());
133        $this->assertEquals(0, $called);
134
135        $a->read(3);
136        $this->assertEquals(0, $buffer->getSize());
137        $this->assertFalse($needsDrain->getValue($a));
138        $this->assertEquals(1, $called);
139    }
140
141    public function testCreatesBufferWithNoConfig()
142    {
143        list($buffer, $async) = AsyncReadStream::create();
144        $this->assertInstanceOf('GuzzleHttp\Stream\BufferStream', $buffer);
145        $this->assertInstanceOf('GuzzleHttp\Stream\AsyncReadStream', $async);
146    }
147
148    public function testCreatesBufferWithSpecifiedBuffer()
149    {
150        $buf = new BufferStream();
151        list($buffer, $async) = AsyncReadStream::create(['buffer' => $buf]);
152        $this->assertSame($buf, $buffer);
153        $this->assertInstanceOf('GuzzleHttp\Stream\AsyncReadStream', $async);
154    }
155
156    public function testCreatesNullStream()
157    {
158        list($buffer, $async) = AsyncReadStream::create(['max_buffer' => 0]);
159        $this->assertInstanceOf('GuzzleHttp\Stream\NullStream', $buffer);
160        $this->assertInstanceOf('GuzzleHttp\Stream\AsyncReadStream', $async);
161    }
162
163    public function testCreatesDroppingStream()
164    {
165        list($buffer, $async) = AsyncReadStream::create(['max_buffer' => 5]);
166        $this->assertInstanceOf('GuzzleHttp\Stream\DroppingStream', $buffer);
167        $this->assertInstanceOf('GuzzleHttp\Stream\AsyncReadStream', $async);
168        $buffer->write('12345678910');
169        $this->assertEquals(5, $buffer->getSize());
170    }
171
172    public function testCreatesOnWriteStream()
173    {
174        $c = 0;
175        $b = new BufferStream();
176        list($buffer, $async) = AsyncReadStream::create([
177            'buffer' => $b,
178            'write'  => function (BufferStream $buf, $data) use (&$c, $b) {
179                $this->assertSame($buf, $b);
180                $this->assertEquals('foo', $data);
181                $c++;
182            },
183        ]);
184        $this->assertInstanceOf('GuzzleHttp\Stream\FnStream', $buffer);
185        $this->assertInstanceOf('GuzzleHttp\Stream\AsyncReadStream', $async);
186        $this->assertEquals(0, $c);
187        $this->assertEquals(3, $buffer->write('foo'));
188        $this->assertEquals(1, $c);
189        $this->assertEquals(3, $buffer->write('foo'));
190        $this->assertEquals(2, $c);
191        $this->assertEquals('foofoo', (string) $buffer);
192    }
193}
194