1<?php
2
3declare(strict_types = 1);
4
5namespace Elasticsearch\ConnectionPool;
6
7use Elasticsearch\Common\Exceptions\Curl\OperationTimeoutException;
8use Elasticsearch\Common\Exceptions\NoNodesAvailableException;
9use Elasticsearch\ConnectionPool\Selectors\SelectorInterface;
10use Elasticsearch\Connections\Connection;
11use Elasticsearch\Connections\ConnectionInterface;
12use Elasticsearch\Connections\ConnectionFactoryInterface;
13
/**
 * Connection pool that refreshes its list of connections by "sniffing":
 * asking a node for the cluster's node list and rebuilding the pool from
 * the HTTP publish addresses found in the response.
 *
 * A sniff is attempted on every call to nextConnection() once the sniffing
 * interval has elapsed, after scheduleCheck() has been called, or when a
 * forced retry is needed because no usable connection was found.
 */
class SniffingConnectionPool extends AbstractConnectionPool implements ConnectionPoolInterface
{
    /**
     * Number of seconds between two scheduled sniffs. Can be overridden with
     * the 'sniffingInterval' connection pool parameter.
     *
     * @var int
     */
    private $sniffingInterval = 300;

    /**
     * Unix timestamp until which non-forced sniffs are skipped.
     * -1 means the next call to nextConnection() will sniff.
     *
     * @var int
     */
    private $nextSniff = -1;

    /**
     * Builds the pool and schedules the first sniff one interval from now.
     *
     * @param mixed                      $connections          Forwarded untouched to the parent constructor.
     * @param SelectorInterface          $selector             Strategy used to pick a connection from the pool.
     * @param ConnectionFactoryInterface $factory              Factory used to create connections for sniffed nodes.
     * @param array                      $connectionPoolParams Pool options; only 'sniffingInterval' is read here.
     */
    public function __construct($connections, SelectorInterface $selector, ConnectionFactoryInterface $factory, $connectionPoolParams)
    {
        parent::__construct($connections, $selector, $factory, $connectionPoolParams);

        $this->setConnectionPoolParams($connectionPoolParams);
        $this->nextSniff = time() + $this->sniffingInterval;
    }

    /**
     * Returns a connection that is alive or answers a ping.
     *
     * The selector is asked for a connection at most count($this->connections)
     * times. If none of the selected connections is usable, the whole procedure
     * is retried exactly once with a forced sniff before giving up.
     *
     * @param bool $force Whether to sniff regardless of the sniffing schedule.
     *
     * @return ConnectionInterface
     *
     * @throws NoNodesAvailableException When no usable connection is found even after a forced sniff.
     */
    public function nextConnection(bool $force = false): ConnectionInterface
    {
        $this->sniff($force);

        $size = count($this->connections);
        while ($size--) {
            /** @var Connection $connection */
            $connection = $this->selector->select($this->connections);
            if ($connection->isAlive() === true || $connection->ping() === true) {
                return $connection;
            }
        }

        if ($force === true) {
            throw new NoNodesAvailableException("No alive nodes found in your cluster");
        }

        return $this->nextConnection(true);
    }

    /**
     * Makes the next call to nextConnection() sniff, whatever the schedule says.
     */
    public function scheduleCheck(): void
    {
        $this->nextSniff = -1;
    }

    /**
     * Tries to refresh the pool from one of the current connections and,
     * for non-forced sniffs only, falls back to the seed connections.
     *
     * @param bool $force Sniff even if the next scheduled sniff is not due yet.
     */
    private function sniff(bool $force = false): void
    {
        if ($force === false && $this->nextSniff >= time()) {
            return;
        }

        $total = count($this->connections);

        while ($total--) {
            /** @var Connection $connection */
            $connection = $this->selector->select($this->connections);

            // As written: a non-forced sniff skips connections reporting alive,
            // a forced sniff skips connections NOT reporting alive.
            // NOTE(review): confirm this asymmetry is intended.
            if ($connection->isAlive() xor $force) {
                continue;
            }

            if ($this->sniffConnection($connection) === true) {
                return;
            }
        }

        if ($force === true) {
            return;
        }

        foreach ($this->seedConnections as $connection) {
            if ($this->sniffConnection($connection) === true) {
                return;
            }
        }
    }

    /**
     * Sniffs a single connection and, on success, replaces the pool with
     * connections to the nodes it reported.
     *
     * @param Connection $connection Connection to query for the node list.
     *
     * @return bool True when the pool was replaced; false on timeout or when
     *              the response contained no usable node address.
     */
    private function sniffConnection(Connection $connection): bool
    {
        try {
            $response = $connection->sniff();
        } catch (OperationTimeoutException $exception) {
            return false;
        }

        $nodes = $this->parseClusterState($connection->getTransportSchema(), $response);

        if (count($nodes) === 0) {
            return false;
        }

        // Build the replacement list first and swap it in only once complete,
        // so a failure while creating a connection cannot leave the pool
        // empty or half-populated.
        $freshConnections = [];

        foreach ($nodes as $node) {
            $freshConnections[] = $this->connectionFactory->create([
                'host' => $node['host'],
                'port' => $node['port'],
            ]);
        }

        $this->connections = $freshConnections;
        $this->nextSniff   = time() + $this->sniffingInterval;

        return true;
    }

    /**
     * Extracts host/port pairs from the 'http' => 'publish_address' entry of
     * every node listed under the 'nodes' key of a sniff response.
     *
     * Nodes without an HTTP publish address, or with an address that cannot
     * be parsed, are skipped.
     *
     * @param string $transportSchema Transport schema of the sniffed connection (currently unused).
     * @param mixed  $nodeInfo        Sniff response; expected to be array-accessible with a 'nodes' list.
     *
     * @return array List of ['host' => string, 'port' => int] entries.
     */
    private function parseClusterState(string $transportSchema, $nodeInfo): array
    {
        if (isset($nodeInfo['nodes']) === false || is_iterable($nodeInfo['nodes']) === false) {
            return [];
        }

        $hosts = [];

        foreach ($nodeInfo['nodes'] as $node) {
            if (isset($node['http']['publish_address']) === false
                || is_string($node['http']['publish_address']) === false
            ) {
                continue;
            }

            $host = $this->parsePublishAddress($node['http']['publish_address']);
            if ($host !== null) {
                $hosts[] = $host;
            }
        }

        return $hosts;
    }

    /**
     * Splits one publish address into host and port.
     *
     * Accepted forms:
     *  - "host:port"          e.g. "127.0.0.1:9200"
     *  - "[ipv6]:port"        e.g. "[::1]:9200" (brackets are kept in the host)
     *  - "hostname/addr:port" e.g. "es1.local/10.0.0.1:9200" (the hostname is used)
     *
     * The pattern is anchored and takes the port after the final colon, so
     * colons inside a bracketed IPv6 literal are never mistaken for the
     * host/port separator.
     * NOTE(review): brackets are preserved on the assumption that the host is
     * later embedded in a URL - confirm against the connection factory.
     *
     * @param string $address Raw publish address.
     *
     * @return array|null ['host' => string, 'port' => int], or null when the address is not recognised.
     */
    private function parsePublishAddress(string $address): ?array
    {
        $pattern = '/^(?:([^\/]*)\/)?(\[[^\]]+\]|[^:\/\[\]]+):([0-9]+)$/';

        if (preg_match($pattern, trim($address), $match) !== 1) {
            return null;
        }

        // $match[1] is the optional hostname before the slash; it is an empty
        // string when absent or empty, in which case the address part is used.
        $host = $match[1] !== '' ? $match[1] : $match[2];

        return [
            'host' => $host,
            'port' => (int) $match[3],
        ];
    }

    /**
     * Applies the pool options this class understands.
     *
     * @param array $connectionPoolParams Pool options; 'sniffingInterval' (seconds) is optional.
     */
    private function setConnectionPoolParams(array $connectionPoolParams): void
    {
        if (isset($connectionPoolParams['sniffingInterval']) === true) {
            $this->sniffingInterval = $connectionPoolParams['sniffingInterval'];
        }
    }
}
154