<?php

namespace Elastica;

use Elastica\Query\AbstractQuery;
use Elastica\Script\AbstractScript;
use Elastica\Script\Script;

/**
 * Builds and sends a reindex request that copies documents from one index
 * into another.
 *
 * Options are stored as params (see the class constants for the recognised
 * keys). When the request is run they are split three ways: keys that belong
 * to the "source" part of the body, keys that belong to the "dest" part, keys
 * that sit at the top level of the body, and - independently - keys accepted
 * by the endpoint's own parameter whitelist.
 */
class Reindex extends Param
{
    const VERSION_TYPE = 'version_type';
    const VERSION_TYPE_INTERNAL = 'internal';
    const VERSION_TYPE_EXTERNAL = 'external';
    const OPERATION_TYPE = 'op_type';
    const OPERATION_TYPE_CREATE = 'create';
    const CONFLICTS = 'conflicts';
    const CONFLICTS_PROCEED = 'proceed';
    const TYPE = 'type';
    const SIZE = 'size';
    const QUERY = 'query';
    const SORT = 'sort';
    const SCRIPT = 'script';
    const SOURCE = '_source';
    const REMOTE = 'remote';
    const SLICE = 'slice';
    const REFRESH = 'refresh';
    const WAIT_FOR_COMPLETION = 'wait_for_completion';
    const WAIT_FOR_COMPLETION_FALSE = 'false';
    const WAIT_FOR_ACTIVE_SHARDS = 'wait_for_active_shards';
    const TIMEOUT = 'timeout';
    const SCROLL = 'scroll';
    const REQUESTS_PER_SECOND = 'requests_per_second';

    /**
     * Index the documents are read from.
     *
     * @var Index
     */
    protected $_oldIndex;

    /**
     * Index the documents are written to.
     *
     * @var Index
     */
    protected $_newIndex;

    /**
     * Not read or written anywhere in this class; kept because it is
     * protected and subclasses may rely on it.
     *
     * @var array
     */
    protected $_options;

    /**
     * Response of the most recent run() call, or null if run() has not been
     * called yet.
     *
     * @var Response|null
     */
    protected $_lastResponse;

    /**
     * @param Index $oldIndex index to read documents from
     * @param Index $newIndex index to write documents to
     * @param array $params   initial options, keyed by the class constants
     */
    public function __construct(Index $oldIndex, Index $newIndex, array $params = [])
    {
        $this->_oldIndex = $oldIndex;
        $this->_newIndex = $newIndex;

        $this->setParams($params);
    }

    /**
     * Sends the reindex request through the old index's client and remembers
     * the response so getTaskId() can inspect it later.
     *
     * @return Response the response returned by the client
     */
    public function run(): Response
    {
        $allParams = $this->getParams();
        $body = $this->_getBody($this->_oldIndex, $this->_newIndex, $allParams);

        $reindexEndpoint = new \Elasticsearch\Endpoints\Reindex();

        // Only forward the params the endpoint itself declares as allowed;
        // everything else has either been placed in the body or is ignored.
        $whitelist = \array_fill_keys($reindexEndpoint->getParamWhitelist(), null);
        $reindexEndpoint->setParams(\array_intersect_key($allParams, $whitelist));
        $reindexEndpoint->setBody($body);

        $this->_lastResponse = $this->_oldIndex->getClient()->requestEndpoint($reindexEndpoint);

        return $this->_lastResponse;
    }

    /**
     * Assembles the complete request body: "source" part, "dest" part,
     * top-level options and, if configured, the script.
     *
     * Note that the script is taken from this object's own params, not from
     * the $params argument.
     *
     * @param Index $oldIndex index to read documents from
     * @param Index $newIndex index to write documents to
     * @param array $params   options to distribute over the body
     *
     * @return array the request body
     */
    protected function _getBody(Index $oldIndex, Index $newIndex, array $params): array
    {
        $body = \array_merge([
            'source' => $this->_getSourcePartBody($oldIndex, $params),
            'dest' => $this->_getDestPartBody($newIndex, $params),
        ], $this->_resolveBodyOptions($params));

        return $this->_setBodyScript($body);
    }

    /**
     * Builds the "source" part of the body: the index name plus every
     * source-level option present in $params, with query and type values
     * normalised.
     *
     * @param Index $index  index to read documents from
     * @param array $params options to pick the source-level keys from
     *
     * @return array the "source" part of the request body
     */
    protected function _getSourcePartBody(Index $index, array $params): array
    {
        $sourceBody = \array_merge([
            'index' => $index->getName(),
        ], $this->_resolveSourceOptions($params));

        $sourceBody = $this->_setSourceQuery($sourceBody);

        return $this->_setSourceType($sourceBody);
    }

    /**
     * Builds the "dest" part of the body: the index name plus every
     * dest-level option present in $params.
     *
     * @param Index $index  index to write documents to
     * @param array $params options to pick the dest-level keys from
     *
     * @return array the "dest" part of the request body
     */
    protected function _getDestPartBody(Index $index, array $params): array
    {
        return \array_merge([
            'index' => $index->getName(),
        ], $this->_resolveDestOptions($params));
    }

    /**
     * Keeps only the entries of $params that belong in the "source" part.
     *
     * @param array $params all options
     *
     * @return array the subset keyed by type, query, sort, _source, remote or slice
     */
    private function _resolveSourceOptions(array $params): array
    {
        return \array_intersect_key($params, [
            self::TYPE => null,
            self::QUERY => null,
            self::SORT => null,
            self::SOURCE => null,
            self::REMOTE => null,
            self::SLICE => null,
        ]);
    }

    /**
     * Keeps only the entries of $params that belong in the "dest" part.
     *
     * @param array $params all options
     *
     * @return array the subset keyed by version_type or op_type
     */
    private function _resolveDestOptions(array $params): array
    {
        return \array_intersect_key($params, [
            self::VERSION_TYPE => null,
            self::OPERATION_TYPE => null,
        ]);
    }

    /**
     * Keeps only the entries of $params that sit at the top level of the body.
     *
     * @param array $params all options
     *
     * @return array the subset keyed by size or conflicts
     */
    private function _resolveBodyOptions(array $params): array
    {
        return \array_intersect_key($params, [
            self::SIZE => null,
            self::CONFLICTS => null,
        ]);
    }

    /**
     * Converts a query object in the source body to its array form; any
     * other value (or a missing query) is left untouched.
     *
     * @param array $sourceBody the "source" part being built
     *
     * @return array the source body with the query normalised
     */
    private function _setSourceQuery(array $sourceBody): array
    {
        if (isset($sourceBody[self::QUERY]) && $sourceBody[self::QUERY] instanceof AbstractQuery) {
            $sourceBody[self::QUERY] = $sourceBody[self::QUERY]->toArray();
        }

        return $sourceBody;
    }

    /**
     * Normalises the "type" entry of the source body to an array in which
     * every Type object has been replaced by its name.
     *
     * A missing or null entry is left as it is.
     *
     * @param array $sourceBody the "source" part being built
     *
     * @return array the source body with the type entry normalised
     */
    private function _setSourceType(array $sourceBody): array
    {
        if (!isset($sourceBody[self::TYPE])) {
            return $sourceBody;
        }

        // A single value (string or Type object) is wrapped so the loop
        // below can treat both shapes the same way.
        if (!\is_array($sourceBody[self::TYPE])) {
            $sourceBody[self::TYPE] = [$sourceBody[self::TYPE]];
        }

        foreach ($sourceBody[self::TYPE] as $key => $type) {
            if ($type instanceof Type) {
                $sourceBody[self::TYPE][$key] = $type->getName();
            }
        }

        return $sourceBody;
    }

    /**
     * Adds the configured script, if any, to the body.
     *
     * A script object contributes whatever its toArray() returns (merged
     * into the body); any other value is stored as-is under the "script" key.
     *
     * @param array $body the request body being built
     *
     * @return array the body, with the script added when one is configured
     */
    private function _setBodyScript(array $body): array
    {
        if (!$this->hasParam(self::SCRIPT)) {
            return $body;
        }

        $script = $this->getParam(self::SCRIPT);

        if ($script instanceof AbstractScript) {
            return \array_merge($body, $script->toArray());
        }

        $body[self::SCRIPT] = $script;

        return $body;
    }

    /**
     * Sets the wait_for_completion param.
     *
     * @param bool|mixed $value booleans are stored as the strings 'true' /
     *                          'false'; any other value is stored unchanged
     */
    public function setWaitForCompletion($value)
    {
        if (\is_bool($value)) {
            $value = $value ? 'true' : 'false';
        }

        $this->setParam(self::WAIT_FOR_COMPLETION, $value);
    }

    /**
     * Sets the wait_for_active_shards param.
     *
     * @param mixed $value stored unchanged
     */
    public function setWaitForActiveShards($value)
    {
        $this->setParam(self::WAIT_FOR_ACTIVE_SHARDS, $value);
    }

    /**
     * Sets the timeout param.
     *
     * @param mixed $value stored unchanged
     */
    public function setTimeout($value)
    {
        $this->setParam(self::TIMEOUT, $value);
    }

    /**
     * Sets the scroll param.
     *
     * @param mixed $value stored unchanged
     */
    public function setScroll($value)
    {
        $this->setParam(self::SCROLL, $value);
    }

    /**
     * Sets the requests_per_second param.
     *
     * @param mixed $value stored unchanged
     */
    public function setRequestsPerSecond($value)
    {
        $this->setParam(self::REQUESTS_PER_SECOND, $value);
    }

    /**
     * Sets the script that is added to the request body.
     *
     * @param Script $script the script to store under the script param
     */
    public function setScript(Script $script)
    {
        $this->setParam(self::SCRIPT, $script);
    }

    /**
     * Returns the "task" entry of the last response.
     *
     * @return mixed|null the task value, or null when run() has not been
     *                    called, the response has no "task" entry, or the
     *                    entry is empty
     */
    public function getTaskId()
    {
        if (!$this->_lastResponse instanceof Response) {
            return null;
        }

        $data = $this->_lastResponse->getData();

        // empty() tolerates a missing key, so a response without a "task"
        // entry yields null instead of an undefined-index notice.
        return empty($data['task']) ? null : $data['task'];
    }
}