<?php

namespace Elastica;

use Elastica\Exception\ClientException;
use Elastica\Exception\ConnectionException;
use Elastica\Exception\ResponseException;
use Elastica\Query\AbstractQuery;
use Elastica\Script\AbstractScript;
use Elastica\Script\Script;

/**
 * Builds and sends a request to the Elasticsearch reindex endpoint, using one
 * index as the "source" and another index as the "dest" of the request body.
 *
 * Options are stored as params (see the constants below). When the request is
 * run they are split between the "source" part, the "dest" part, the top level
 * of the body and the endpoint's own (whitelisted) request parameters.
 */
class Reindex extends Param
{
    public const VERSION_TYPE = 'version_type';
    public const VERSION_TYPE_INTERNAL = 'internal';
    public const VERSION_TYPE_EXTERNAL = 'external';
    public const OPERATION_TYPE = 'op_type';
    public const OPERATION_TYPE_CREATE = 'create';
    public const CONFLICTS = 'conflicts';
    public const CONFLICTS_PROCEED = 'proceed';
    public const SIZE = 'size';
    public const QUERY = 'query';
    public const SORT = 'sort';
    public const SCRIPT = 'script';
    public const SOURCE = '_source';
    public const REMOTE = 'remote';
    public const SLICE = 'slice';
    public const REFRESH = 'refresh';
    public const REFRESH_TRUE = 'true';
    public const REFRESH_FALSE = 'false';
    public const REFRESH_WAIT_FOR = 'wait_for';
    public const WAIT_FOR_COMPLETION = 'wait_for_completion';

    /**
     * @deprecated since version 7.2.0, use a boolean as parameter instead.
     */
    public const WAIT_FOR_COMPLETION_FALSE = 'false';
    public const WAIT_FOR_ACTIVE_SHARDS = 'wait_for_active_shards';
    public const TIMEOUT = 'timeout';
    public const SCROLL = 'scroll';
    public const REQUESTS_PER_SECOND = 'requests_per_second';
    public const PIPELINE = 'pipeline';
    public const SLICES = 'slices';
    public const SLICES_AUTO = 'auto';

    /**
     * Index whose name is sent as the "source" index; its client sends the request.
     *
     * @var Index
     */
    protected $_oldIndex;

    /**
     * Index whose name is sent as the "dest" index.
     *
     * @var Index
     */
    protected $_newIndex;

    /**
     * Response of the most recent run() call, null until run() has been called.
     *
     * @var Response|null
     */
    protected $_lastResponse;

    /**
     * @param Index $oldIndex index used as the source of the reindex request
     * @param Index $newIndex index used as the destination of the reindex request
     * @param array $params   initial options, keyed by the constants of this class
     */
    public function __construct(Index $oldIndex, Index $newIndex, array $params = [])
    {
        $this->_oldIndex = $oldIndex;
        $this->_newIndex = $newIndex;

        $this->setParams($params);
    }

    /**
     * Sends the reindex request through the client of the old index and
     * remembers the response so that getTaskId() can inspect it later.
     *
     * @throws ClientException
     * @throws ConnectionException
     * @throws ResponseException
     */
    public function run(): Response
    {
        $body = $this->_getBody($this->_oldIndex, $this->_newIndex, $this->getParams());

        $reindexEndpoint = new \Elasticsearch\Endpoints\Reindex();
        // Only the params the endpoint itself whitelists are passed as endpoint
        // params; everything else is either part of the body or ignored.
        $params = \array_intersect_key($this->getParams(), \array_fill_keys($reindexEndpoint->getParamWhitelist(), null));
        $reindexEndpoint->setParams($params);
        $reindexEndpoint->setBody($body);

        $this->_lastResponse = $this->_oldIndex->getClient()->requestEndpoint($reindexEndpoint);

        return $this->_lastResponse;
    }

    /**
     * Sets the "wait_for_completion" param.
     *
     * Booleans are converted to the strings 'true' / 'false'. Any other value
     * is stored unchanged, but triggers a deprecation notice.
     *
     * @param bool $value
     */
    public function setWaitForCompletion($value): void
    {
        if (\is_bool($value)) {
            $value = $value ? 'true' : 'false';
        } else {
            \trigger_deprecation('ruflin/elastica', '7.2.0', 'Passing anything else than a boolean as 1st argument to "%s()" is deprecated, pass a boolean instead. It will be removed in 8.0.', __METHOD__);
        }

        $this->setParam(self::WAIT_FOR_COMPLETION, $value);
    }

    /**
     * Sets the "wait_for_active_shards" param; the value is stored as given.
     */
    public function setWaitForActiveShards($value): void
    {
        $this->setParam(self::WAIT_FOR_ACTIVE_SHARDS, $value);
    }

    /**
     * Sets the "timeout" param; the value is stored as given.
     */
    public function setTimeout($value): void
    {
        $this->setParam(self::TIMEOUT, $value);
    }

    /**
     * Sets the "scroll" param; the value is stored as given.
     */
    public function setScroll($value): void
    {
        $this->setParam(self::SCROLL, $value);
    }

    /**
     * Sets the "requests_per_second" param; the value is stored as given.
     */
    public function setRequestsPerSecond($value): void
    {
        $this->setParam(self::REQUESTS_PER_SECOND, $value);
    }

    /**
     * Sets the script that is merged into the request body when it is built.
     */
    public function setScript(Script $script): void
    {
        $this->setParam(self::SCRIPT, $script);
    }

    /**
     * Sets the query that is placed into the "source" part of the request body.
     */
    public function setQuery(AbstractQuery $query): void
    {
        $this->setParam(self::QUERY, $query);
    }

    /**
     * Sets the pipeline; its id is placed into the "dest" part of the request body.
     */
    public function setPipeline(Pipeline $pipeline): void
    {
        $this->setParam(self::PIPELINE, $pipeline);
    }

    /**
     * Sets the "refresh" param (see the REFRESH_* constants).
     */
    public function setRefresh(string $value): void
    {
        $this->setParam(self::REFRESH, $value);
    }

    /**
     * Returns the "task" entry of the last response.
     *
     * @return string|null the task entry, or null when run() has not been called
     *                     yet, or the last response has no (or an empty) "task" entry
     */
    public function getTaskId()
    {
        if (!$this->_lastResponse instanceof Response) {
            return null;
        }

        // The "task" key is not guaranteed to be present (for instance when the
        // request waited for completion), so it is read with "??" to avoid an
        // undefined-key notice/warning. The trailing "?:" keeps mapping falsy
        // values to null.
        return ($this->_lastResponse->getData()['task'] ?? null) ?: null;
    }

    /**
     * Builds the complete request body: the "source" and "dest" parts, the
     * top-level body options and, if one was set, the script.
     */
    protected function _getBody(Index $oldIndex, Index $newIndex, array $params): array
    {
        $body = \array_merge([
            'source' => $this->_getSourcePartBody($oldIndex, $params),
            'dest' => $this->_getDestPartBody($newIndex, $params),
        ], $this->_resolveBodyOptions($params));

        return $this->_setBodyScript($body);
    }

    /**
     * Builds the "source" part of the body: the index name plus the
     * source-level options found in $params.
     */
    protected function _getSourcePartBody(Index $index, array $params): array
    {
        $sourceBody = \array_merge([
            'index' => $index->getName(),
        ], $this->_resolveSourceOptions($params));

        return $this->_setSourceQuery($sourceBody);
    }

    /**
     * Builds the "dest" part of the body: the index name plus the
     * destination-level options found in $params.
     */
    protected function _getDestPartBody(Index $index, array $params): array
    {
        $destBody = \array_merge([
            'index' => $index->getName(),
        ], $this->_resolveDestOptions($params));

        // Resolves the pipeline name
        $pipeline = $destBody[self::PIPELINE] ?? null;
        if ($pipeline instanceof Pipeline) {
            $destBody[self::PIPELINE] = $pipeline->getId();
        }

        return $destBody;
    }

    /**
     * Keeps only the params that belong to the "source" part of the body.
     */
    private function _resolveSourceOptions(array $params): array
    {
        return \array_intersect_key($params, [
            self::QUERY => null,
            self::SORT => null,
            self::SOURCE => null,
            self::REMOTE => null,
            self::SLICE => null,
        ]);
    }

    /**
     * Keeps only the params that belong to the "dest" part of the body.
     */
    private function _resolveDestOptions(array $params): array
    {
        return \array_intersect_key($params, [
            self::VERSION_TYPE => null,
            self::OPERATION_TYPE => null,
            self::PIPELINE => null,
        ]);
    }

    /**
     * Keeps only the params that belong to the top level of the body.
     */
    private function _resolveBodyOptions(array $params): array
    {
        return \array_intersect_key($params, [
            self::SIZE => null,
            self::CONFLICTS => null,
        ]);
    }

    /**
     * Converts an AbstractQuery stored under "query" into its array form;
     * any other value is left as it is.
     */
    private function _setSourceQuery(array $sourceBody): array
    {
        if (isset($sourceBody[self::QUERY]) && $sourceBody[self::QUERY] instanceof AbstractQuery) {
            $sourceBody[self::QUERY] = $sourceBody[self::QUERY]->toArray();
        }

        return $sourceBody;
    }

    /**
     * Adds the script param, if any, to the body.
     *
     * An AbstractScript is merged in through its toArray() result; any other
     * value is stored as-is under the "script" key.
     */
    private function _setBodyScript(array $body): array
    {
        if (!$this->hasParam(self::SCRIPT)) {
            return $body;
        }

        $script = $this->getParam(self::SCRIPT);

        if ($script instanceof AbstractScript) {
            $body = \array_merge($body, $script->toArray());
        } else {
            $body[self::SCRIPT] = $script;
        }

        return $body;
    }
}