1<?php
2
3namespace Elastica;
4
5use Elastica\Exception\ClientException;
6use Elastica\Exception\ConnectionException;
7use Elastica\Exception\ResponseException;
8use Elastica\Query\AbstractQuery;
9use Elastica\Script\AbstractScript;
10use Elastica\Script\Script;
11
12class Reindex extends Param
13{
14    public const VERSION_TYPE = 'version_type';
15    public const VERSION_TYPE_INTERNAL = 'internal';
16    public const VERSION_TYPE_EXTERNAL = 'external';
17    public const OPERATION_TYPE = 'op_type';
18    public const OPERATION_TYPE_CREATE = 'create';
19    public const CONFLICTS = 'conflicts';
20    public const CONFLICTS_PROCEED = 'proceed';
21    public const SIZE = 'size';
22    public const QUERY = 'query';
23    public const SORT = 'sort';
24    public const SCRIPT = 'script';
25    public const SOURCE = '_source';
26    public const REMOTE = 'remote';
27    public const SLICE = 'slice';
28    public const REFRESH = 'refresh';
29    public const REFRESH_TRUE = 'true';
30    public const REFRESH_FALSE = 'false';
31    public const REFRESH_WAIT_FOR = 'wait_for';
32    public const WAIT_FOR_COMPLETION = 'wait_for_completion';
33
34    /**
35     * @deprecated since version 7.2.0, use a boolean as parameter instead.
36     */
37    public const WAIT_FOR_COMPLETION_FALSE = 'false';
38    public const WAIT_FOR_ACTIVE_SHARDS = 'wait_for_active_shards';
39    public const TIMEOUT = 'timeout';
40    public const SCROLL = 'scroll';
41    public const REQUESTS_PER_SECOND = 'requests_per_second';
42    public const PIPELINE = 'pipeline';
43    public const SLICES = 'slices';
44    public const SLICES_AUTO = 'auto';
45
46    /**
47     * @var Index
48     */
49    protected $_oldIndex;
50
51    /**
52     * @var Index
53     */
54    protected $_newIndex;
55
56    /**
57     * @var Response|null
58     */
59    protected $_lastResponse;
60
61    public function __construct(Index $oldIndex, Index $newIndex, array $params = [])
62    {
63        $this->_oldIndex = $oldIndex;
64        $this->_newIndex = $newIndex;
65
66        $this->setParams($params);
67    }
68
69    /**
70     * @throws ClientException
71     * @throws ConnectionException
72     * @throws ResponseException
73     */
74    public function run(): Response
75    {
76        $body = $this->_getBody($this->_oldIndex, $this->_newIndex, $this->getParams());
77
78        $reindexEndpoint = new \Elasticsearch\Endpoints\Reindex();
79        $params = \array_intersect_key($this->getParams(), \array_fill_keys($reindexEndpoint->getParamWhitelist(), null));
80        $reindexEndpoint->setParams($params);
81        $reindexEndpoint->setBody($body);
82
83        $this->_lastResponse = $this->_oldIndex->getClient()->requestEndpoint($reindexEndpoint);
84
85        return $this->_lastResponse;
86    }
87
88    /**
89     * @param bool $value
90     */
91    public function setWaitForCompletion($value): void
92    {
93        if (\is_bool($value)) {
94            $value = $value ? 'true' : 'false';
95        } else {
96            \trigger_deprecation('ruflin/elastica', '7.2.0', 'Passing anything else than a boolean as 1st argument to "%s()" is deprecated, pass a boolean instead. It will be removed in 8.0.', __METHOD__);
97        }
98
99        $this->setParam(self::WAIT_FOR_COMPLETION, $value);
100    }
101
102    public function setWaitForActiveShards($value): void
103    {
104        $this->setParam(self::WAIT_FOR_ACTIVE_SHARDS, $value);
105    }
106
107    public function setTimeout($value): void
108    {
109        $this->setParam(self::TIMEOUT, $value);
110    }
111
112    public function setScroll($value): void
113    {
114        $this->setParam(self::SCROLL, $value);
115    }
116
117    public function setRequestsPerSecond($value): void
118    {
119        $this->setParam(self::REQUESTS_PER_SECOND, $value);
120    }
121
122    public function setScript(Script $script): void
123    {
124        $this->setParam(self::SCRIPT, $script);
125    }
126
127    public function setQuery(AbstractQuery $query): void
128    {
129        $this->setParam(self::QUERY, $query);
130    }
131
132    public function setPipeline(Pipeline $pipeline): void
133    {
134        $this->setParam(self::PIPELINE, $pipeline);
135    }
136
137    public function setRefresh(string $value): void
138    {
139        $this->setParam(self::REFRESH, $value);
140    }
141
142    public function getTaskId()
143    {
144        $taskId = null;
145        if ($this->_lastResponse instanceof Response) {
146            $taskId = $this->_lastResponse->getData()['task'] ?: null;
147        }
148
149        return $taskId;
150    }
151
152    protected function _getBody(Index $oldIndex, Index $newIndex, array $params): array
153    {
154        $body = \array_merge([
155            'source' => $this->_getSourcePartBody($oldIndex, $params),
156            'dest' => $this->_getDestPartBody($newIndex, $params),
157        ], $this->_resolveBodyOptions($params));
158
159        return $this->_setBodyScript($body);
160    }
161
162    protected function _getSourcePartBody(Index $index, array $params): array
163    {
164        $sourceBody = \array_merge([
165            'index' => $index->getName(),
166        ], $this->_resolveSourceOptions($params));
167
168        return $this->_setSourceQuery($sourceBody);
169    }
170
171    protected function _getDestPartBody(Index $index, array $params): array
172    {
173        $destBody = \array_merge([
174            'index' => $index->getName(),
175        ], $this->_resolveDestOptions($params));
176
177        // Resolves the pipeline name
178        $pipeline = $destBody[self::PIPELINE] ?? null;
179        if ($pipeline instanceof Pipeline) {
180            $destBody[self::PIPELINE] = $pipeline->getId();
181        }
182
183        return $destBody;
184    }
185
186    private function _resolveSourceOptions(array $params): array
187    {
188        return \array_intersect_key($params, [
189            self::QUERY => null,
190            self::SORT => null,
191            self::SOURCE => null,
192            self::REMOTE => null,
193            self::SLICE => null,
194        ]);
195    }
196
197    private function _resolveDestOptions(array $params): array
198    {
199        return \array_intersect_key($params, [
200            self::VERSION_TYPE => null,
201            self::OPERATION_TYPE => null,
202            self::PIPELINE => null,
203        ]);
204    }
205
206    private function _resolveBodyOptions(array $params): array
207    {
208        return \array_intersect_key($params, [
209            self::SIZE => null,
210            self::CONFLICTS => null,
211        ]);
212    }
213
214    private function _setSourceQuery(array $sourceBody): array
215    {
216        if (isset($sourceBody[self::QUERY]) && $sourceBody[self::QUERY] instanceof AbstractQuery) {
217            $sourceBody[self::QUERY] = $sourceBody[self::QUERY]->toArray();
218        }
219
220        return $sourceBody;
221    }
222
223    private function _setBodyScript(array $body): array
224    {
225        if (!$this->hasParam(self::SCRIPT)) {
226            return $body;
227        }
228
229        $script = $this->getParam(self::SCRIPT);
230
231        if ($script instanceof AbstractScript) {
232            $body = \array_merge($body, $script->toArray());
233        } else {
234            $body[self::SCRIPT] = $script;
235        }
236
237        return $body;
238    }
239}
240