1<?php
2
3declare(strict_types = 1);
4
5namespace Elasticsearch\Tests\ConnectionPool;
6
7use Elasticsearch\ConnectionPool\Selectors\RoundRobinSelector;
8use Elasticsearch\ConnectionPool\SniffingConnectionPool;
9use Elasticsearch\Connections\Connection;
10use Elasticsearch\Connections\ConnectionFactory;
11use Mockery as m;
12use Elasticsearch\Common\Exceptions\Curl\OperationTimeoutException;
13
14/**
15 * Class SniffingConnectionPoolTest
16 *
17 * @category   Tests
18 * @package    Elasticsearch
19 * @subpackage Tests/SniffingConnectionPoolTest
20 * @author     Zachary Tong <zachary.tong@elasticsearch.com>
21 * @license    http://www.apache.org/licenses/LICENSE-2.0 Apache2
22 * @link       http://elasticsearch.org
23 */
24class SniffingConnectionPoolTest extends \PHPUnit\Framework\TestCase
25{
26    protected function setUp()
27    {
28        static::markTestSkipped("All of Sniffing unit tests use outdated cluster state format, need to redo");
29    }
30
31
32    public function tearDown()
33    {
34        m::close();
35    }
36
37    public function testAddOneHostThenGetConnection()
38    {
39        $mockConnection = m::mock(Connection::class)
40            ->shouldReceive('ping')
41            ->andReturn(true)
42            ->getMock()
43            ->shouldReceive('isAlive')
44            ->andReturn(true)
45            ->getMock();
46
47        /**
48 * @var \Elasticsearch\Connections\Connection[]&\Mockery\MockInterface[] $connections
49*/
50        $connections = [$mockConnection];
51
52        $selector = m::mock(RoundRobinSelector::class)
53            ->shouldReceive('select')
54            ->andReturn($connections[0])
55            ->getMock();
56
57        $connectionFactory = m::mock(ConnectionFactory::class);
58
59        $connectionPoolParams = ['randomizeHosts' => false];
60        $connectionPool = new SniffingConnectionPool($connections, $selector, $connectionFactory, $connectionPoolParams);
61
62        $retConnection = $connectionPool->nextConnection();
63
64        $this->assertSame($mockConnection, $retConnection);
65    }
66
67    public function testAddOneHostAndTriggerSniff()
68    {
69        $clusterState = json_decode('{"ok":true,"cluster_name":"elasticsearch_zach","nodes":{"Bl2ihSr7TcuUHxhu1GA_YQ":{"name":"Vesta","transport_address":"inet[/192.168.1.119:9300]","hostname":"zach-ThinkPad-W530","version":"0.90.5","http_address":"inet[/192.168.1.119:9200]"}}}', true);
70
71        $mockConnection = m::mock(Connection::class)
72            ->shouldReceive('ping')->andReturn(true)->getMock()
73            ->shouldReceive('isAlive')->andReturn(true)->getMock()
74            ->shouldReceive('getTransportSchema')->once()->andReturn('http')->getMock()
75            ->shouldReceive('sniff')->once()->andReturn($clusterState)->getMock();
76
77        /**
78 * @var \Elasticsearch\Connections\Connection[]&\Mockery\MockInterface[] $connections
79*/
80        $connections = [$mockConnection];
81        $mockNewConnection = m::mock(Connection::class)
82            ->shouldReceive('isAlive')->andReturn(true)->getMock();
83
84        $selector = m::mock(RoundRobinSelector::class)
85            ->shouldReceive('select')->twice()
86            ->andReturn($mockNewConnection)
87            ->getMock();
88
89        $connectionFactory = m::mock(ConnectionFactory::class)
90            ->shouldReceive('create')->with(['host' => '192.168.1.119', 'port' => 9200])->andReturn($mockNewConnection)->getMock();
91
92        $connectionPoolParams = [
93            'randomizeHosts' => false,
94            'sniffingInterval'  => -1
95        ];
96        $connectionPool = new SniffingConnectionPool($connections, $selector, $connectionFactory, $connectionPoolParams);
97
98        $retConnection = $connectionPool->nextConnection();
99
100        $this->assertSame($mockNewConnection, $retConnection);
101    }
102
103    public function testAddOneHostAndForceNext()
104    {
105        $clusterState = json_decode('{"ok":true,"cluster_name":"elasticsearch_zach","nodes":{"Bl2ihSr7TcuUHxhu1GA_YQ":{"name":"Vesta","transport_address":"inet[/192.168.1.119:9300]","hostname":"zach-ThinkPad-W530","version":"0.90.5","http_address":"inet[/192.168.1.119:9200]"}}}', true);
106
107        $mockConnection = m::mock(Connection::class)
108            ->shouldReceive('ping')->andReturn(true)->getMock()
109            ->shouldReceive('isAlive')->andReturn(true)->getMock()
110            ->shouldReceive('getTransportSchema')->once()->andReturn('http')->getMock()
111            ->shouldReceive('sniff')->once()->andReturn($clusterState)->getMock();
112
113        /**
114 * @var \Elasticsearch\Connections\Connection[]&\Mockery\MockInterface[] $connections
115*/
116        $connections = [$mockConnection];
117        $mockNewConnection = m::mock(Connection::class)
118            ->shouldReceive('isAlive')->andReturn(true)->getMock();
119
120        $selector = m::mock(RoundRobinSelector::class)
121            ->shouldReceive('select')->once()->andReturn($mockConnection)->getMock()
122            ->shouldReceive('select')->once()->andReturn($mockNewConnection)->getMock();
123
124        $connectionFactory = m::mock(ConnectionFactory::class)
125            ->shouldReceive('create')->with(['host' => '192.168.1.119', 'port' => 9200])->andReturn($mockNewConnection)->getMock();
126
127        $connectionPoolParams = [
128            'randomizeHosts' => false
129        ];
130        $connectionPool = new SniffingConnectionPool($connections, $selector, $connectionFactory, $connectionPoolParams);
131
132        $retConnection = $connectionPool->nextConnection(true);
133
134        $this->assertSame($mockNewConnection, $retConnection);
135    }
136
137    public function testAddTenNodesThenGetConnection()
138    {
139        $connections = [];
140
141        foreach (range(1, 10) as $index) {
142            $mockConnection = m::mock(Connection::class)
143                ->shouldReceive('ping')
144                ->andReturn(true)
145                ->getMock()
146                ->shouldReceive('isAlive')
147                ->andReturn(true)
148                ->getMock();
149
150            $connections[] = $mockConnection;
151        }
152
153        $selector = m::mock(RoundRobinSelector::class)
154            ->shouldReceive('select')
155            ->andReturn($connections[0])
156            ->getMock();
157
158        $connectionFactory = m::mock(ConnectionFactory::class);
159
160        $connectionPoolParams = ['randomizeHosts' => false];
161        $connectionPool = new SniffingConnectionPool($connections, $selector, $connectionFactory, $connectionPoolParams);
162
163        $retConnection = $connectionPool->nextConnection();
164
165        $this->assertSame($connections[0], $retConnection);
166    }
167
168    public function testAddTenNodesTimeoutAllButLast()
169    {
170        $connections = [];
171
172        foreach (range(1, 9) as $index) {
173            $mockConnection = m::mock(Connection::class)
174                ->shouldReceive('ping')
175                ->andReturn(false)
176                ->getMock()
177                ->shouldReceive('isAlive')
178                ->andReturn(false)
179                ->getMock();
180
181            $connections[] = $mockConnection;
182        }
183
184        $mockConnection = m::mock(Connection::class)
185            ->shouldReceive('ping')
186            ->andReturn(true)
187            ->getMock()
188            ->shouldReceive('isAlive')
189            ->andReturn(true)
190            ->getMock();
191
192        $connections[] = $mockConnection;
193
194        $selector = m::mock(RoundRobinSelector::class)
195            ->shouldReceive('select')
196            ->andReturnValues($connections)
197            ->getMock();
198
199        $connectionFactory = m::mock(ConnectionFactory::class);
200
201        $connectionPoolParams = ['randomizeHosts' => false];
202        $connectionPool = new SniffingConnectionPool($connections, $selector, $connectionFactory, $connectionPoolParams);
203
204        $retConnection = $connectionPool->nextConnection();
205
206        $this->assertSame($connections[9], $retConnection);
207    }
208
209    public function testAddTenNodesAllTimeout()
210    {
211        $connections = [];
212
213        foreach (range(1, 10) as $index) {
214            $mockConnection = m::mock(Connection::class)
215                ->shouldReceive('ping')
216                ->andReturn(false)
217                ->getMock()
218                ->shouldReceive('isAlive')
219                ->andReturn(false)
220                ->getMock();
221
222            $connections[] = $mockConnection;
223        }
224
225        $selector = m::mock(RoundRobinSelector::class)
226            ->shouldReceive('select')
227            ->andReturnValues($connections)
228            ->getMock();
229
230        $connectionFactory = m::mock(ConnectionFactory::class);
231
232        $connectionPoolParams = ['randomizeHosts' => false];
233        $connectionPool = new SniffingConnectionPool($connections, $selector, $connectionFactory, $connectionPoolParams);
234
235        $this->expectException(\Elasticsearch\Common\Exceptions\NoNodesAvailableException::class);
236        $this->expectExceptionMessage('No alive nodes found in your cluster');
237
238        $retConnection = $connectionPool->nextConnection();
239    }
240
241    public function testAddOneHostSniffTwo()
242    {
243        $clusterState = json_decode('{"ok":true,"cluster_name":"elasticsearch_zach","nodes":{"node1":{"name":"Vesta","transport_address":"inet[/192.168.1.119:9300]","hostname":"zach-ThinkPad-W530","version":"0.90.5","http_address":"inet[/192.168.1.119:9200]"}, "node2":{"name":"Vesta","transport_address":"inet[/192.168.1.119:9301]","hostname":"zach-ThinkPad-W530","version":"0.90.5","http_address":"inet[/192.168.1.119:9201]"}}}', true);
244
245        $mockConnection = m::mock(Connection::class)
246            ->shouldReceive('ping')->andReturn(true)->getMock()
247            ->shouldReceive('isAlive')->andReturn(true)->getMock()
248            ->shouldReceive('getTransportSchema')->twice()->andReturn('http')->getMock()
249            ->shouldReceive('sniff')->twice()->andReturn($clusterState)->getMock();
250
251        /**
252 * @var \Elasticsearch\Connections\Connection[]&\Mockery\MockInterface[] $connections
253*/
254        $connections = [$mockConnection];
255
256        $newConnections = [];
257        $newConnections[] = m::mock(Connection::class)
258            ->shouldReceive('isAlive')->andReturn(true)->getMock();
259
260        $newConnections[] = m::mock(Connection::class)
261            ->shouldReceive('isAlive')->andReturn(true)->getMock();
262
263        $selector = m::mock(RoundRobinSelector::class)
264            ->shouldReceive('select')
265            ->andReturnValues(
266                [        //selects provided node first, then the new cluster list
267                            $mockConnection,
268                            $newConnections[0],
269                            $newConnections[1]
270                        ]
271            )
272            ->getMock();
273
274        $connectionFactory = m::mock(ConnectionFactory::class)
275            ->shouldReceive('create')->with(['host' => '192.168.1.119', 'port' => 9200])->andReturn($newConnections[0])->getMock()
276            ->shouldReceive('create')->with(['host' => '192.168.1.119', 'port' => 9201])->andReturn($newConnections[1])->getMock();
277
278        $connectionPoolParams = [
279            'randomizeHosts' => false,
280            'sniffingInterval'  => -1
281        ];
282        $connectionPool = new SniffingConnectionPool($connections, $selector, $connectionFactory, $connectionPoolParams);
283
284        $retConnection = $connectionPool->nextConnection();
285        $this->assertSame($newConnections[0], $retConnection);
286
287        $retConnection = $connectionPool->nextConnection();
288        $this->assertSame($newConnections[1], $retConnection);
289    }
290
291    public function testAddSeedSniffTwoTimeoutTwo()
292    {
293        $clusterState = json_decode('{"ok":true,"cluster_name":"elasticsearch_zach","nodes":{"node1":{"name":"Vesta","transport_address":"inet[/192.168.1.119:9300]","hostname":"zach-ThinkPad-W530","version":"0.90.5","http_address":"inet[/192.168.1.119:9200]"}, "node2":{"name":"Vesta","transport_address":"inet[/192.168.1.119:9301]","hostname":"zach-ThinkPad-W530","version":"0.90.5","http_address":"inet[/192.168.1.119:9201]"}}}', true);
294
295        $mockConnection = m::mock(Connection::class)
296            ->shouldReceive('ping')->andReturn(true)->getMock()
297            ->shouldReceive('isAlive')->andReturn(true)->getMock()
298            ->shouldReceive('getTransportSchema')->once()->andReturn('http')->getMock()
299            ->shouldReceive('sniff')->once()->andReturn($clusterState)->getMock();
300
301        /**
302         * @var \Elasticsearch\Connections\Connection[]&\Mockery\MockInterface[] $connections
303         */
304        $connections = [$mockConnection];
305
306        $newConnections = [];
307        $newConnections[] = m::mock(Connection::class)
308            ->shouldReceive('isAlive')->andReturn(false)->getMock()
309            ->shouldReceive('ping')->andReturn(false)->getMock();
310
311        $newConnections[] = m::mock(Connection::class)
312            ->shouldReceive('isAlive')->andReturn(false)->getMock()
313            ->shouldReceive('ping')->andReturn(false)->getMock();
314
315        $selector = m::mock(RoundRobinSelector::class)
316            ->shouldReceive('select')
317            ->andReturnValues(
318                [        //selects provided node first, then the new cluster list
319                        $mockConnection,
320                        $newConnections[0],
321                        $newConnections[1]
322                        ]
323            )
324            ->getMock();
325
326        $connectionFactory = m::mock(ConnectionFactory::class)
327            ->shouldReceive('create')->with(['host' => '192.168.1.119', 'port' => 9200])->andReturn($newConnections[0])->getMock()
328            ->shouldReceive('create')->with(['host' => '192.168.1.119', 'port' => 9201])->andReturn($newConnections[1])->getMock();
329
330        $connectionPoolParams = [
331            'randomizeHosts' => false,
332            'sniffingInterval'  => -1
333        ];
334        $connectionPool = new SniffingConnectionPool($connections, $selector, $connectionFactory, $connectionPoolParams);
335
336        $this->expectException(\Elasticsearch\Common\Exceptions\NoNodesAvailableException::class);
337        $this->expectExceptionMessage('No alive nodes found in your cluster');
338
339        $retConnection = $connectionPool->nextConnection();
340    }
341
342    public function testTenTimeoutNineSniffTenthAddTwoAlive()
343    {
344        $clusterState = json_decode('{"ok":true,"cluster_name":"elasticsearch_zach","nodes":{"node1":{"name":"Vesta","transport_address":"inet[/192.168.1.119:9300]","hostname":"zach-ThinkPad-W530","version":"0.90.5","http_address":"inet[/192.168.1.119:9200]"}, "node2":{"name":"Vesta","transport_address":"inet[/192.168.1.119:9301]","hostname":"zach-ThinkPad-W530","version":"0.90.5","http_address":"inet[/192.168.1.119:9201]"}}}', true);
345
346        $connections = [];
347
348        foreach (range(1, 10) as $index) {
349            $mockConnection = m::mock(Connection::class)
350                ->shouldReceive('ping')->andReturn(false)->getMock()
351                ->shouldReceive('isAlive')->andReturn(true)->getMock()
352                ->shouldReceive('sniff')->andThrow(OperationTimeoutException::class)->getMock();
353
354            $connections[] = $mockConnection;
355        }
356
357        $mockConnection = m::mock(Connection::class)
358            ->shouldReceive('ping')->andReturn(true)->getMock()
359            ->shouldReceive('isAlive')->andReturn(true)->getMock()
360            ->shouldReceive('sniff')->andReturn($clusterState)->getMock()
361            ->shouldReceive('getTransportSchema')->twice()->andReturn('http')->getMock();
362
363        $connections[] = $mockConnection;
364
365        $newConnections = $connections;
366        $newConnections[] = m::mock(Connection::class)
367            ->shouldReceive('isAlive')->andReturn(true)->getMock()
368            ->shouldReceive('ping')->andReturn(true)->getMock();
369
370        $newConnections[] = m::mock(Connection::class)
371            ->shouldReceive('isAlive')->andReturn(true)->getMock()
372            ->shouldReceive('ping')->andReturn(true)->getMock();
373
374        $selector = m::mock(RoundRobinSelector::class)
375            ->shouldReceive('select')
376            ->andReturnValues($newConnections)
377            ->getMock();
378
379        $connectionFactory = m::mock(ConnectionFactory::class)
380            ->shouldReceive('create')->with(['host' => '192.168.1.119', 'port' => 9200])->andReturn($newConnections[10])->getMock()
381            ->shouldReceive('create')->with(['host' => '192.168.1.119', 'port' => 9201])->andReturn($newConnections[11])->getMock();
382
383        $connectionPoolParams = [
384            'randomizeHosts' => false,
385            'sniffingInterval'  => -1
386        ];
387        $connectionPool = new SniffingConnectionPool($connections, $selector, $connectionFactory, $connectionPoolParams);
388
389        $retConnection = $connectionPool->nextConnection();
390        $this->assertSame($newConnections[11], $retConnection);
391
392        $retConnection = $connectionPool->nextConnection();
393        $this->assertSame($newConnections[12], $retConnection);
394    }
395
396    public function testTenTimeoutNineSniffTenthAddTwoDeadTimeoutEveryone()
397    {
398        $clusterState = json_decode('{"ok":true,"cluster_name":"elasticsearch_zach","nodes":{"node1":{"name":"Vesta","transport_address":"inet[/192.168.1.119:9300]","hostname":"zach-ThinkPad-W530","version":"0.90.5","http_address":"inet[/192.168.1.119:9200]"}, "node2":{"name":"Vesta","transport_address":"inet[/192.168.1.119:9301]","hostname":"zach-ThinkPad-W530","version":"0.90.5","http_address":"inet[/192.168.1.119:9201]"}}}', true);
399
400        $connections = [];
401
402        foreach (range(1, 10) as $index) {
403            $mockConnection = m::mock(Connection::class)
404                ->shouldReceive('ping')->andReturn(false)->getMock()
405                ->shouldReceive('isAlive')->andReturn(true)->getMock()
406                ->shouldReceive('sniff')->andThrow(OperationTimeoutException::class)->getMock();
407
408            $connections[] = $mockConnection;
409        }
410
411        $mockConnection = m::mock(Connection::class)
412            ->shouldReceive('ping')->andReturn(true)->getMock()
413            ->shouldReceive('isAlive')->andReturn(true)->getMock()
414            ->shouldReceive('sniff')->andReturn($clusterState)->getMock()
415            ->shouldReceive('getTransportSchema')->once()->andReturn('http')->getMock()
416            ->shouldReceive('sniff')->andThrow(OperationTimeoutException::class)->getMock();
417
418        $connections[] = $mockConnection;
419
420        $newConnections = $connections;
421        $newConnections[] = m::mock(Connection::class)
422            ->shouldReceive('isAlive')->andReturn(false)->getMock()
423            ->shouldReceive('ping')->andReturn(false)->getMock()
424            ->shouldReceive('sniff')->andThrow(OperationTimeoutException::class)->getMock();
425
426        $newConnections[] = m::mock(Connection::class)
427            ->shouldReceive('isAlive')->andReturn(false)->getMock()
428            ->shouldReceive('ping')->andReturn(false)->getMock()
429            ->shouldReceive('sniff')->andThrow(OperationTimeoutException::class)->getMock();
430
431        $selector = m::mock(RoundRobinSelector::class)
432            ->shouldReceive('select')
433            ->andReturnValues($newConnections)
434            ->getMock();
435
436        $connectionFactory = m::mock(ConnectionFactory::class)
437            ->shouldReceive('create')->with(['host' => '192.168.1.119', 'port' => 9200])->andReturn($newConnections[10])->getMock()
438            ->shouldReceive('create')->with(['host' => '192.168.1.119', 'port' => 9201])->andReturn($newConnections[11])->getMock();
439
440        $connectionPoolParams = [
441            'randomizeHosts' => false,
442            'sniffingInterval'  => -1
443        ];
444        $connectionPool = new SniffingConnectionPool($connections, $selector, $connectionFactory, $connectionPoolParams);
445
446        $this->expectException(\Elasticsearch\Common\Exceptions\NoNodesAvailableException::class);
447        $this->expectExceptionMessage('No alive nodes found in your cluster');
448
449        $retConnection = $connectionPool->nextConnection();
450    }
451}
452