<?php

declare(strict_types=1);

namespace Elasticsearch\ConnectionPool;

use Elasticsearch\Common\Exceptions\Curl\OperationTimeoutException;
use Elasticsearch\Common\Exceptions\NoNodesAvailableException;
use Elasticsearch\ConnectionPool\Selectors\SelectorInterface;
use Elasticsearch\Connections\Connection;
use Elasticsearch\Connections\ConnectionInterface;
use Elasticsearch\Connections\ConnectionFactoryInterface;

/**
 * Connection pool that rebuilds its connection list from a node's sniff response.
 *
 * A sniff runs when the sniffing interval has elapsed, when scheduleCheck() has
 * been called, or when it is forced because no usable connection was found.
 */
class SniffingConnectionPool extends AbstractConnectionPool implements ConnectionPoolInterface
{
    /**
     * Number of seconds added to time() to schedule the next sniff.
     *
     * @var int
     */
    private $sniffingInterval = 300;

    /**
     * Unix timestamp until which non-forced sniffs are skipped.
     * A value of -1 makes the next call to sniff() run immediately.
     *
     * @var int
     */
    private $nextSniff = -1;

    /**
     * {@inheritdoc}
     *
     * Additionally reads the optional 'sniffingInterval' pool parameter and
     * schedules the first sniff one interval from now.
     */
    public function __construct($connections, SelectorInterface $selector, ConnectionFactoryInterface $factory, $connectionPoolParams)
    {
        parent::__construct($connections, $selector, $factory, $connectionPoolParams);

        $this->setConnectionPoolParams($connectionPoolParams);
        $this->nextSniff = time() + $this->sniffingInterval;
    }

    /**
     * Returns a connection that reports alive or answers a ping.
     *
     * The selector is asked at most count($this->connections) times. If none of
     * the selected connections is usable, the method retries once with a forced
     * sniff before giving up.
     *
     * @param bool $force Whether to sniff regardless of the schedule.
     *
     * @throws NoNodesAvailableException When no usable connection is found on the forced attempt.
     */
    public function nextConnection(bool $force = false): ConnectionInterface
    {
        $this->sniff($force);

        $size = count($this->connections);
        while ($size--) {
            /** @var Connection $connection */
            $connection = $this->selector->select($this->connections);
            if ($connection->isAlive() === true || $connection->ping() === true) {
                return $connection;
            }
        }

        if ($force === true) {
            throw new NoNodesAvailableException("No alive nodes found in your cluster");
        }

        // Nothing usable on the scheduled pass: retry exactly once with a forced sniff.
        return $this->nextConnection(true);
    }

    /**
     * Resets the schedule so that the next call to sniff() is not skipped.
     */
    public function scheduleCheck(): void
    {
        $this->nextSniff = -1;
    }

    /**
     * Tries to refresh the connection list, stopping at the first successful sniff.
     *
     * Does nothing when the sniff is not forced and the scheduled time has not
     * passed yet. A non-forced sniff falls back to the seed connections when no
     * pooled connection yields a node list; a forced sniff does not.
     *
     * @param bool $force Whether to sniff regardless of the schedule.
     */
    private function sniff(bool $force = false): void
    {
        if ($force === false && $this->nextSniff >= time()) {
            return;
        }

        $total = count($this->connections);

        while ($total--) {
            /** @var Connection $connection */
            $connection = $this->selector->select($this->connections);

            // NOTE(review): with $force === false this skips connections that report
            // alive and sniffs the others; with $force === true it is the reverse.
            // Confirm this is the intended selection.
            if ($connection->isAlive() xor $force) {
                continue;
            }

            if ($this->sniffConnection($connection) === true) {
                return;
            }
        }

        if ($force === true) {
            return;
        }

        foreach ($this->seedConnections as $connection) {
            if ($this->sniffConnection($connection) === true) {
                return;
            }
        }
    }

    /**
     * Sniffs a single connection and, on success, replaces the pool's connections.
     *
     * @param Connection $connection Connection to sniff.
     *
     * @return bool True when at least one node was parsed and the pool was rebuilt;
     *              false on an operation timeout or when no node could be parsed.
     */
    private function sniffConnection(Connection $connection): bool
    {
        try {
            $response = $connection->sniff();
        } catch (OperationTimeoutException $exception) {
            return false;
        }

        $nodes = $this->parseClusterState($connection->getTransportSchema(), $response);

        if (count($nodes) === 0) {
            return false;
        }

        $this->connections = [];

        foreach ($nodes as $node) {
            $nodeDetails = [
                'host' => $node['host'],
                'port' => $node['port'],
            ];
            $this->connections[] = $this->connectionFactory->create($nodeDetails);
        }

        $this->nextSniff = time() + $this->sniffingInterval;

        return true;
    }

    /**
     * Extracts host/port pairs from the 'http.publish_address' of each node.
     *
     * Nodes without an 'http.publish_address' entry, or whose address does not
     * match the pattern, are ignored. A response without a 'nodes' entry yields
     * an empty list.
     *
     * @param string $transportSchema Currently unused; kept for signature compatibility.
     * @param mixed  $nodeInfo        Sniff response; expected to carry a 'nodes' list.
     *
     * @return array<int, array{host: string, port: int}>
     */
    private function parseClusterState(string $transportSchema, $nodeInfo): array
    {
        // NOTE(review): the pattern is unanchored and stops the host at the first ':',
        // so bracketed IPv6 or "hostname/ip:port" addresses would presumably not be
        // split as intended; verify against the publish_address formats in use.
        $pattern = '/([^:]*):([0-9]+)/';
        $hosts = [];

        foreach ($nodeInfo['nodes'] ?? [] as $node) {
            if (isset($node['http']['publish_address']) === false) {
                continue;
            }

            if (preg_match($pattern, $node['http']['publish_address'], $match) === 1) {
                $hosts[] = [
                    'host' => $match[1],
                    'port' => (int) $match[2],
                ];
            }
        }

        return $hosts;
    }

    /**
     * Applies the pool parameters this class understands.
     *
     * @param array $connectionPoolParams May contain 'sniffingInterval', which
     *                                    replaces the default interval.
     */
    private function setConnectionPoolParams(array $connectionPoolParams): void
    {
        if (isset($connectionPoolParams['sniffingInterval']) === true) {
            $this->sniffingInterval = $connectionPoolParams['sniffingInterval'];
        }
    }
}