<?php
/**
 * Elasticsearch PHP client
 *
 * @link      https://github.com/elastic/elasticsearch-php/
 * @copyright Copyright (c) Elasticsearch B.V (https://www.elastic.co)
 * @license   http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0
 * @license   https://www.gnu.org/licenses/lgpl-2.1.html GNU Lesser General Public License, Version 2.1
 *
 * Licensed to Elasticsearch B.V under one or more agreements.
 * Elasticsearch B.V licenses this file to you under the Apache 2.0 License or
 * the GNU Lesser General Public License, Version 2.1, at your option.
 * See the LICENSE file in the project root for more information.
 */


declare(strict_types = 1);

namespace Elasticsearch\ConnectionPool;

use Elasticsearch\Common\Exceptions\Curl\OperationTimeoutException;
use Elasticsearch\Common\Exceptions\NoNodesAvailableException;
use Elasticsearch\ConnectionPool\Selectors\SelectorInterface;
use Elasticsearch\Connections\Connection;
use Elasticsearch\Connections\ConnectionInterface;
use Elasticsearch\Connections\ConnectionFactoryInterface;

/**
 * Connection pool that periodically asks a node for the list of cluster nodes
 * ("sniffing") and rebuilds its connection list from the answer.
 */
class SniffingConnectionPool extends AbstractConnectionPool implements ConnectionPoolInterface
{
    /**
     * Number of seconds added to time() to compute the next scheduled sniff.
     *
     * @var int
     */
    private $sniffingInterval = 300;

    /**
     * Unix timestamp after which the next regular sniff is due.
     * A value of -1 is always in the past, so it makes the next call sniff.
     *
     * @var int
     */
    private $nextSniff = -1;

    /**
     * Builds the pool and schedules the first sniff one interval from now.
     *
     * The optional 'sniffingInterval' key of $connectionPoolParams overrides
     * the default interval.
     *
     * {@inheritdoc}
     */
    public function __construct($connections, SelectorInterface $selector, ConnectionFactoryInterface $factory, $connectionPoolParams)
    {
        parent::__construct($connections, $selector, $factory, $connectionPoolParams);

        $this->setConnectionPoolParams($connectionPoolParams);
        $this->nextSniff = time() + $this->sniffingInterval;
    }

    /**
     * Returns a connection that is alive or answers a ping.
     *
     * A sniff is attempted first. Then the selector is asked for a connection
     * at most once per pooled connection. If none qualifies, the whole
     * procedure is retried exactly once with $force set to true.
     *
     * @param bool $force Whether to force a sniff; also marks the final attempt.
     *
     * @throws NoNodesAvailableException When the forced attempt finds no usable node.
     */
    public function nextConnection(bool $force = false): ConnectionInterface
    {
        $this->sniff($force);

        $size = count($this->connections);
        while ($size--) {
            /**
             * @var Connection $connection
             */
            $connection = $this->selector->select($this->connections);
            if ($connection->isAlive() === true || $connection->ping() === true) {
                return $connection;
            }
        }

        if ($force === true) {
            throw new NoNodesAvailableException("No alive nodes found in your cluster");
        }

        return $this->nextConnection(true);
    }

    /**
     * Marks the pool as due for sniffing on the next nextConnection() call.
     */
    public function scheduleCheck(): void
    {
        $this->nextSniff = -1;
    }

    /**
     * Refreshes the connection list from the cluster when a sniff is due.
     *
     * Does nothing when $force is false and the scheduled time has not passed.
     * Otherwise pooled connections are tried first; a non-forced sniff then
     * falls back to the seed connections.
     *
     * @param bool $force Sniff regardless of the schedule.
     */
    private function sniff(bool $force = false): void
    {
        if ($force === false && $this->nextSniff >= time()) {
            return;
        }

        $total = count($this->connections);

        while ($total--) {
            /**
             * @var Connection $connection
             */
            $connection = $this->selector->select($this->connections);

            // Non-forced: skip connections reporting alive.
            // Forced: skip connections reporting not alive.
            if ($connection->isAlive() xor $force) {
                continue;
            }

            if ($this->sniffConnection($connection) === true) {
                return;
            }
        }

        if ($force === true) {
            return;
        }

        foreach ($this->seedConnections as $connection) {
            if ($this->sniffConnection($connection) === true) {
                return;
            }
        }
    }

    /**
     * Sniffs a single connection and, on success, replaces the pool's
     * connections with one connection per discovered node.
     *
     * @param Connection $connection Connection used to query the cluster.
     *
     * @return bool True when at least one node was discovered and the pool was
     *              rebuilt; false on timeout or when no node could be parsed.
     */
    private function sniffConnection(Connection $connection): bool
    {
        try {
            $response = $connection->sniff();
        } catch (OperationTimeoutException $exception) {
            return false;
        }

        $nodes = $this->parseClusterState($connection->getTransportSchema(), $response);

        if (count($nodes) === 0) {
            return false;
        }

        $this->connections = [];

        foreach ($nodes as $node) {
            $nodeDetails = [
                'host' => $node['host'],
                'port' => $node['port'],
            ];
            $this->connections[] = $this->connectionFactory->create($nodeDetails);
        }

        $this->nextSniff = time() + $this->sniffingInterval;

        return true;
    }

    /**
     * Extracts host/port pairs from the 'http.publish_address' of every node
     * in a sniff response.
     *
     * Supported address forms:
     *  - "host:port"           e.g. "127.0.0.1:9200"
     *  - "[ipv6]:port"         e.g. "[::1]:9200" (brackets are kept in the host)
     *  - "hostname/ip:port"    e.g. "es1.local/10.0.0.5:9200" (hostname is used)
     *
     * Nodes without a parsable string publish address are skipped, as is a
     * response without a 'nodes' list.
     *
     * @param string $transportSchema Currently unused; kept so the call shape stays stable.
     * @param mixed  $nodeInfo        Decoded sniff response.
     *
     * @return array List of ['host' => string, 'port' => int] entries.
     */
    private function parseClusterState(string $transportSchema, $nodeInfo): array
    {
        // Greedy host + end anchor: split on the LAST colon so IPv6 literals
        // (which contain colons themselves) keep their full address.
        $pattern = '/^(.+):([0-9]+)$/';
        $hosts = [];

        $nodes = $nodeInfo['nodes'] ?? [];
        if (is_array($nodes) === false) {
            return $hosts;
        }

        foreach ($nodes as $node) {
            $address = $node['http']['publish_address'] ?? null;
            if (is_string($address) === false) {
                continue;
            }

            if (preg_match($pattern, trim($address), $match) !== 1) {
                continue;
            }

            $host = $match[1];
            if (strpos($host, '/') !== false) {
                // "hostname/ip" form: prefer the hostname, fall back to the
                // ip part when the hostname part is empty.
                [$name, $ip] = explode('/', $host, 2);
                $host = $name !== '' ? $name : $ip;
            }

            if ($host === '') {
                continue;
            }

            $hosts[] = [
                'host' => $host,
                'port' => (int) $match[2],
            ];
        }

        return $hosts;
    }

    /**
     * Applies pool parameters understood by this class.
     *
     * @param array $connectionPoolParams May contain 'sniffingInterval'.
     */
    private function setConnectionPoolParams(array $connectionPoolParams): void
    {
        if (isset($connectionPoolParams['sniffingInterval']) === true) {
            $this->sniffingInterval = $connectionPoolParams['sniffingInterval'];
        }
    }
}