1<?php
2/**
3 * Elasticsearch PHP client
4 *
5 * @link      https://github.com/elastic/elasticsearch-php/
6 * @copyright Copyright (c) Elasticsearch B.V (https://www.elastic.co)
7 * @license   http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0
8 * @license   https://www.gnu.org/licenses/lgpl-2.1.html GNU Lesser General Public License, Version 2.1
9 *
10 * Licensed to Elasticsearch B.V under one or more agreements.
11 * Elasticsearch B.V licenses this file to you under the Apache 2.0 License or
12 * the GNU Lesser General Public License, Version 2.1, at your option.
13 * See the LICENSE file in the project root for more information.
14 */
15
16
17declare(strict_types = 1);
18
19namespace Elasticsearch\ConnectionPool;
20
21use Elasticsearch\Common\Exceptions\NoNodesAvailableException;
22use Elasticsearch\ConnectionPool\Selectors\SelectorInterface;
23use Elasticsearch\Connections\Connection;
24use Elasticsearch\Connections\ConnectionInterface;
25use Elasticsearch\Connections\ConnectionFactoryInterface;
26
27class StaticConnectionPool extends AbstractConnectionPool implements ConnectionPoolInterface
28{
29    /**
30     * @var int
31     */
32    private $pingTimeout    = 60;
33
34    /**
35     * @var int
36     */
37    private $maxPingTimeout = 3600;
38
39    /**
40     * {@inheritdoc}
41     */
42    public function __construct($connections, SelectorInterface $selector, ConnectionFactoryInterface $factory, $connectionPoolParams)
43    {
44        parent::__construct($connections, $selector, $factory, $connectionPoolParams);
45        $this->scheduleCheck();
46    }
47
48    public function nextConnection(bool $force = false): ConnectionInterface
49    {
50        $skipped = [];
51
52        $total = count($this->connections);
53        while ($total--) {
54            /**
55 * @var Connection $connection
56*/
57            $connection = $this->selector->select($this->connections);
58            if ($connection->isAlive() === true) {
59                return $connection;
60            }
61
62            if ($this->readyToRevive($connection) === true) {
63                if ($connection->ping() === true) {
64                    return $connection;
65                }
66            } else {
67                $skipped[] = $connection;
68            }
69        }
70
71        // All "alive" nodes failed, force pings on "dead" nodes
72        foreach ($skipped as $connection) {
73            if ($connection->ping() === true) {
74                return $connection;
75            }
76        }
77
78        throw new NoNodesAvailableException("No alive nodes found in your cluster");
79    }
80
81    public function scheduleCheck(): void
82    {
83        foreach ($this->connections as $connection) {
84            $connection->markDead();
85        }
86    }
87
88    private function readyToRevive(Connection $connection): bool
89    {
90        $timeout = min(
91            $this->pingTimeout * pow(2, $connection->getPingFailures()),
92            $this->maxPingTimeout
93        );
94
95        if ($connection->getLastPing() + $timeout < time()) {
96            return true;
97        } else {
98            return false;
99        }
100    }
101}
102