1<?php
2
3namespace Elastica;
4
5use Elastica\Query\AbstractQuery;
6use Elastica\Script\AbstractScript;
7use Elastica\Script\Script;
8
/**
 * Builds and sends a reindex request that copies documents from one index
 * into another.
 *
 * Options are stored as params (see Param::setParams()/setParam()). When the
 * request is built, each param is routed by its key: some go into the
 * "source" part of the body, some into the "dest" part, some into the body
 * root, and the rest are offered to the endpoint as request parameters
 * (filtered by the endpoint's own whitelist).
 */
class Reindex extends Param
{
    // Keys (and well-known values) placed in the "dest" part of the body.
    const VERSION_TYPE = 'version_type';
    const VERSION_TYPE_INTERNAL = 'internal';
    const VERSION_TYPE_EXTERNAL = 'external';
    const OPERATION_TYPE = 'op_type';
    const OPERATION_TYPE_CREATE = 'create';

    // Keys (and well-known values) placed at the root of the body.
    const CONFLICTS = 'conflicts';
    const CONFLICTS_PROCEED = 'proceed';
    const SIZE = 'size';
    const SCRIPT = 'script';

    // Keys placed in the "source" part of the body.
    const TYPE = 'type';
    const QUERY = 'query';
    const SORT = 'sort';
    const SOURCE = '_source';
    const REMOTE = 'remote';
    const SLICE = 'slice';

    // Keys that are not copied into the body by this class; run() hands them
    // to the endpoint when the endpoint's param whitelist contains them.
    const REFRESH = 'refresh';
    const WAIT_FOR_COMPLETION = 'wait_for_completion';
    const WAIT_FOR_COMPLETION_FALSE = 'false';
    const WAIT_FOR_ACTIVE_SHARDS = 'wait_for_active_shards';
    const TIMEOUT = 'timeout';
    const SCROLL = 'scroll';
    const REQUESTS_PER_SECOND = 'requests_per_second';

    /**
     * Index the documents are read from; its client is used to send the request.
     *
     * @var Index
     */
    protected $_oldIndex;

    /**
     * Index the documents are written to.
     *
     * @var Index
     */
    protected $_newIndex;

    /**
     * Not read or written by this class; kept for subclasses.
     *
     * @var array
     */
    protected $_options;

    /**
     * Response of the most recent run() call, or null before the first call.
     *
     * @var Response|null
     */
    protected $_lastResponse;

    /**
     * @param Index $oldIndex index to read documents from
     * @param Index $newIndex index to write documents to
     * @param array $params   initial options, keyed by the constants of this class
     */
    public function __construct(Index $oldIndex, Index $newIndex, array $params = [])
    {
        $this->_oldIndex = $oldIndex;
        $this->_newIndex = $newIndex;

        $this->setParams($params);
    }

    /**
     * Builds the request from the current params, sends it through the
     * client of the old index and remembers the response for getTaskId().
     *
     * @return Response the response returned by the client
     */
    public function run(): Response
    {
        $body = $this->_getBody($this->_oldIndex, $this->_newIndex, $this->getParams());

        $reindexEndpoint = new \Elasticsearch\Endpoints\Reindex();

        // Only forward params the endpoint declares in its whitelist;
        // everything else either lives in the body or is dropped.
        $params = \array_intersect_key($this->getParams(), \array_fill_keys($reindexEndpoint->getParamWhitelist(), null));
        $reindexEndpoint->setParams($params);
        $reindexEndpoint->setBody($body);

        $this->_lastResponse = $this->_oldIndex->getClient()->requestEndpoint($reindexEndpoint);

        return $this->_lastResponse;
    }

    /**
     * Assembles the complete request body: "source", "dest", the root-level
     * options found in $params, and the script (if one is set).
     *
     * Note: the script is taken from this instance's params, not from $params.
     *
     * @param Index $oldIndex index to read documents from
     * @param Index $newIndex index to write documents to
     * @param array $params   options to distribute over the body
     *
     * @return array request body
     */
    protected function _getBody(Index $oldIndex, Index $newIndex, array $params): array
    {
        $body = \array_merge([
            'source' => $this->_getSourcePartBody($oldIndex, $params),
            'dest' => $this->_getDestPartBody($newIndex, $params),
        ], $this->_resolveBodyOptions($params));

        $body = $this->_setBodyScript($body);

        return $body;
    }

    /**
     * Builds the "source" part of the body: the index name plus any source
     * options from $params, with query and type values normalised.
     *
     * @param Index $index  index to read documents from
     * @param array $params options to pick source settings from
     *
     * @return array "source" part of the body
     */
    protected function _getSourcePartBody(Index $index, array $params): array
    {
        $sourceBody = \array_merge([
            'index' => $index->getName(),
        ], $this->_resolveSourceOptions($params));

        $sourceBody = $this->_setSourceQuery($sourceBody);
        $sourceBody = $this->_setSourceType($sourceBody);

        return $sourceBody;
    }

    /**
     * Builds the "dest" part of the body: the index name plus any dest
     * options from $params.
     *
     * @param Index $index  index to write documents to
     * @param array $params options to pick dest settings from
     *
     * @return array "dest" part of the body
     */
    protected function _getDestPartBody(Index $index, array $params): array
    {
        $destBody = \array_merge([
            'index' => $index->getName(),
        ], $this->_resolveDestOptions($params));

        return $destBody;
    }

    /**
     * Returns the entries of $params that belong in the "source" part.
     *
     * @param array $params all options
     *
     * @return array subset of $params
     */
    private function _resolveSourceOptions(array $params): array
    {
        return \array_intersect_key($params, [
            self::TYPE => null,
            self::QUERY => null,
            self::SORT => null,
            self::SOURCE => null,
            self::REMOTE => null,
            self::SLICE => null,
        ]);
    }

    /**
     * Returns the entries of $params that belong in the "dest" part.
     *
     * @param array $params all options
     *
     * @return array subset of $params
     */
    private function _resolveDestOptions(array $params): array
    {
        return \array_intersect_key($params, [
            self::VERSION_TYPE => null,
            self::OPERATION_TYPE => null,
        ]);
    }

    /**
     * Returns the entries of $params that belong at the root of the body.
     *
     * @param array $params all options
     *
     * @return array subset of $params
     */
    private function _resolveBodyOptions(array $params): array
    {
        return \array_intersect_key($params, [
            self::SIZE => null,
            self::CONFLICTS => null,
        ]);
    }

    /**
     * Converts a query object in the source part to its array form.
     * Any other value (or no query at all) is left untouched.
     *
     * @param array $sourceBody "source" part of the body
     *
     * @return array "source" part with the query converted
     */
    private function _setSourceQuery(array $sourceBody): array
    {
        if (isset($sourceBody[self::QUERY]) && $sourceBody[self::QUERY] instanceof AbstractQuery) {
            $sourceBody[self::QUERY] = $sourceBody[self::QUERY]->toArray();
        }

        return $sourceBody;
    }

    /**
     * Normalises the "type" entry of the source part to an array in which
     * Type objects are replaced by their names. Non-Type entries are kept
     * as given. Nothing happens when no type is set.
     *
     * @param array $sourceBody "source" part of the body
     *
     * @return array "source" part with the type entry normalised
     */
    private function _setSourceType(array $sourceBody): array
    {
        if (!isset($sourceBody[self::TYPE])) {
            return $sourceBody;
        }

        // A single type may be given on its own; wrap it so both cases share one path.
        $types = \is_array($sourceBody[self::TYPE]) ? $sourceBody[self::TYPE] : [$sourceBody[self::TYPE]];

        foreach ($types as $key => $type) {
            if ($type instanceof Type) {
                $types[$key] = $type->getName();
            }
        }

        $sourceBody[self::TYPE] = $types;

        return $sourceBody;
    }

    /**
     * Adds the script stored in this instance's params to the body.
     *
     * A script object contributes whatever its toArray() returns, merged into
     * the body root (presumably a single "script" key; confirm against
     * AbstractScript). Any other value is stored under the "script" key as is.
     *
     * @param array $body request body built so far
     *
     * @return array request body, with the script added when one is set
     */
    private function _setBodyScript(array $body): array
    {
        if (!$this->hasParam(self::SCRIPT)) {
            return $body;
        }

        $script = $this->getParam(self::SCRIPT);

        if ($script instanceof AbstractScript) {
            $body = \array_merge($body, $script->toArray());
        } else {
            $body[self::SCRIPT] = $script;
        }

        return $body;
    }

    /**
     * Stores the wait_for_completion option. Booleans are stored as the
     * strings 'true' / 'false'; any other value is stored unchanged.
     *
     * @param bool|string $value
     */
    public function setWaitForCompletion($value)
    {
        if (\is_bool($value)) {
            $value = $value ? 'true' : 'false';
        }

        $this->setParam(self::WAIT_FOR_COMPLETION, $value);
    }

    /**
     * Stores the wait_for_active_shards option.
     *
     * @param mixed $value stored unchanged
     */
    public function setWaitForActiveShards($value)
    {
        $this->setParam(self::WAIT_FOR_ACTIVE_SHARDS, $value);
    }

    /**
     * Stores the timeout option.
     *
     * @param mixed $value stored unchanged
     */
    public function setTimeout($value)
    {
        $this->setParam(self::TIMEOUT, $value);
    }

    /**
     * Stores the scroll option.
     *
     * @param mixed $value stored unchanged
     */
    public function setScroll($value)
    {
        $this->setParam(self::SCROLL, $value);
    }

    /**
     * Stores the requests_per_second option.
     *
     * @param mixed $value stored unchanged
     */
    public function setRequestsPerSecond($value)
    {
        $this->setParam(self::REQUESTS_PER_SECOND, $value);
    }

    /**
     * Stores the script that run() adds to the request body.
     *
     * @param Script $script
     */
    public function setScript(Script $script)
    {
        $this->setParam(self::SCRIPT, $script);
    }

    /**
     * Returns the "task" entry of the last response's data.
     *
     * The server presumably only includes it when the request was started
     * with wait_for_completion=false; confirm against the reindex API docs.
     *
     * @return mixed|null the task entry, or null when run() has not been
     *                    called yet or the entry is missing or empty
     */
    public function getTaskId()
    {
        if (!$this->_lastResponse instanceof Response) {
            return null;
        }

        $data = $this->_lastResponse->getData();

        // empty() tolerates a missing key, so a response without "task"
        // yields null instead of triggering an undefined-index error.
        return empty($data['task']) ? null : $data['task'];
    }
}
221