1<?php
2/**
3 * Elasticsearch PHP client
4 *
5 * @link      https://github.com/elastic/elasticsearch-php/
6 * @copyright Copyright (c) Elasticsearch B.V (https://www.elastic.co)
7 * @license   http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0
8 * @license   https://www.gnu.org/licenses/lgpl-2.1.html GNU Lesser General Public License, Version 2.1
9 *
10 * Licensed to Elasticsearch B.V under one or more agreements.
11 * Elasticsearch B.V licenses this file to you under the Apache 2.0 License or
12 * the GNU Lesser General Public License, Version 2.1, at your option.
13 * See the LICENSE file in the project root for more information.
14 */
15
16
17declare(strict_types = 1);
18
19namespace Elasticsearch\ConnectionPool;
20
21use Elasticsearch\Common\Exceptions\Curl\OperationTimeoutException;
22use Elasticsearch\Common\Exceptions\NoNodesAvailableException;
23use Elasticsearch\ConnectionPool\Selectors\SelectorInterface;
24use Elasticsearch\Connections\Connection;
25use Elasticsearch\Connections\ConnectionInterface;
26use Elasticsearch\Connections\ConnectionFactoryInterface;
27
/**
 * Connection pool that periodically "sniffs" a node for the current list of
 * cluster nodes and rebuilds its connection list from the response.
 *
 * A sniff is attempted at most once per sniffing interval unless it is forced
 * or a check has been scheduled through scheduleCheck().
 */
class SniffingConnectionPool extends AbstractConnectionPool implements ConnectionPoolInterface
{
    /**
     * Number of seconds added to the current time to compute the next scheduled sniff.
     *
     * @var int
     */
    private $sniffingInterval = 300;

    /**
     * Unix timestamp until which scheduled sniffs are skipped.
     * A value of -1 is always in the past, so the next sniff() call proceeds.
     *
     * @var int
     */
    private $nextSniff = -1;

    /**
     * {@inheritdoc}
     *
     * In addition to the parent initialisation, reads the optional
     * 'sniffingInterval' pool parameter and schedules the first sniff one
     * interval from now.
     */
    public function __construct($connections, SelectorInterface $selector, ConnectionFactoryInterface $factory, $connectionPoolParams)
    {
        parent::__construct($connections, $selector, $factory, $connectionPoolParams);

        $this->setConnectionPoolParams($connectionPoolParams);
        $this->nextSniff = time() + $this->sniffingInterval;
    }

    /**
     * Returns a connection that reports itself alive or answers a ping.
     *
     * A sniff is attempted first. Then the selector is asked for a connection
     * up to count($this->connections) times; the first one that is alive, or
     * that responds to ping(), is returned. If none qualifies the method
     * retries once with $force = true and throws when that attempt fails too.
     *
     * @param bool $force Whether to sniff regardless of the schedule; also marks the final attempt.
     *
     * @return ConnectionInterface
     *
     * @throws NoNodesAvailableException When the forced attempt finds no usable connection.
     */
    public function nextConnection(bool $force = false): ConnectionInterface
    {
        $this->sniff($force);

        $size = count($this->connections);
        while ($size--) {
            /** @var Connection $connection */
            $connection = $this->selector->select($this->connections);
            if ($connection->isAlive() === true || $connection->ping() === true) {
                return $connection;
            }
        }

        if ($force === true) {
            throw new NoNodesAvailableException("No alive nodes found in your cluster");
        }

        return $this->nextConnection(true);
    }

    /**
     * Resets the sniff schedule so that the next sniff() call is not skipped.
     */
    public function scheduleCheck(): void
    {
        $this->nextSniff = -1;
    }

    /**
     * Tries to refresh the connection list from one of the known connections.
     *
     * Does nothing when not forced and the next scheduled sniff is not yet due.
     * Otherwise candidates are taken from the selector and the first successful
     * sniffConnection() call ends the method. A non-forced sniff that found no
     * usable candidate falls back to the seed connections.
     *
     * @param bool $force Whether to ignore the schedule.
     */
    private function sniff(bool $force = false): void
    {
        if ($force === false && $this->nextSniff >= time()) {
            return;
        }

        $total = count($this->connections);

        while ($total--) {
            /** @var Connection $connection */
            $connection = $this->selector->select($this->connections);

            // As written: a non-forced sniff skips connections that report alive,
            // a forced sniff skips connections that do not report alive.
            // NOTE(review): skipping alive connections on a scheduled sniff looks
            // surprising - confirm this is intended before changing it.
            if ($connection->isAlive() xor $force) {
                continue;
            }

            if ($this->sniffConnection($connection) === true) {
                return;
            }
        }

        if ($force === true) {
            return;
        }

        foreach ($this->seedConnections as $connection) {
            if ($this->sniffConnection($connection) === true) {
                return;
            }
        }
    }

    /**
     * Sniffs a single connection and, on success, replaces the pool's connections.
     *
     * @param Connection $connection Connection to query for the node list.
     *
     * @return bool True when at least one node was parsed and the pool was rebuilt;
     *              false on an operation timeout or when no node could be parsed.
     */
    private function sniffConnection(Connection $connection): bool
    {
        try {
            $response = $connection->sniff();
        } catch (OperationTimeoutException $exception) {
            return false;
        }

        $nodes = $this->parseClusterState($connection->getTransportSchema(), $response);

        if (count($nodes) === 0) {
            return false;
        }

        // Build the replacement list first so the current pool is left intact
        // if the factory throws part-way through.
        $connections = [];

        foreach ($nodes as $node) {
            $connections[] = $this->connectionFactory->create(
                [
                    'host' => $node['host'],
                    'port' => $node['port'],
                ]
            );
        }

        $this->connections = $connections;
        $this->nextSniff   = time() + $this->sniffingInterval;

        return true;
    }

    /**
     * Extracts host/port pairs from the 'http.publish_address' of every node in a sniff response.
     *
     * Accepted address forms are "address:port", "hostname/address:port" (the
     * hostname is preferred when present) and bracketed IPv6 such as
     * "[::1]:9200" (the brackets are kept as reported). Nodes without a
     * parsable string publish address are skipped, and a response without a
     * 'nodes' array yields an empty result.
     *
     * @param string $transportSchema Currently unused; kept so the existing call site stays valid.
     * @param mixed  $nodeInfo        Sniff response; expected to be an array with a 'nodes' entry.
     *
     * @return array List of ['host' => string, 'port' => int] entries.
     */
    private function parseClusterState(string $transportSchema, $nodeInfo): array
    {
        // Optional "hostname/" prefix, then the address, then ":port" anchored at the end
        // so that the colons inside an IPv6 literal are not mistaken for the port separator.
        $pattern = '~^(?:([^/]*)/)?(.+):([0-9]+)$~';
        $hosts   = [];

        if (is_array($nodeInfo) === false || isset($nodeInfo['nodes']) === false || is_array($nodeInfo['nodes']) === false) {
            return $hosts;
        }

        foreach ($nodeInfo['nodes'] as $node) {
            if (isset($node['http']['publish_address']) === false) {
                continue;
            }

            $address = $node['http']['publish_address'];

            // preg_match() would raise a TypeError for non-string input under strict_types.
            if (is_string($address) === false || preg_match($pattern, $address, $match) !== 1) {
                continue;
            }

            $hosts[] = [
                'host' => $match[1] !== '' ? $match[1] : $match[2],
                'port' => (int) $match[3],
            ];
        }

        return $hosts;
    }

    /**
     * Applies the pool parameters this class understands.
     *
     * @param array $connectionPoolParams May contain 'sniffingInterval', which overrides the default interval.
     */
    private function setConnectionPoolParams(array $connectionPoolParams): void
    {
        if (isset($connectionPoolParams['sniffingInterval']) === true) {
            $this->sniffingInterval = $connectionPoolParams['sniffingInterval'];
        }
    }
}
168