1<?php
2
3namespace Elastica;
4
5use Elastica\Bulk\Action;
6use Elastica\Bulk\Action\AbstractDocument as AbstractDocumentAction;
7use Elastica\Bulk\Response as BulkResponse;
8use Elastica\Bulk\ResponseSet;
9use Elastica\Exception\Bulk\ResponseException as BulkResponseException;
10use Elastica\Exception\ClientException;
11use Elastica\Exception\ConnectionException;
12use Elastica\Exception\InvalidException;
13use Elastica\Exception\RequestEntityTooLargeException;
14use Elastica\Exception\ResponseException;
15use Elastica\Script\AbstractScript;
16
17class Bulk
18{
19    public const DELIMITER = "\n";
20
21    /**
22     * @var Client
23     */
24    protected $_client;
25
26    /**
27     * @var Action[]
28     */
29    protected $_actions = [];
30
31    /**
32     * @var string|null
33     */
34    protected $_index;
35
36    /**
37     * @var array request parameters to the bulk api
38     */
39    protected $_requestParams = [];
40
41    public function __construct(Client $client)
42    {
43        $this->_client = $client;
44    }
45
46    public function __toString(): string
47    {
48        $data = '';
49
50        foreach ($this->getActions() as $action) {
51            $data .= (string) $action;
52        }
53
54        return $data;
55    }
56
57    /**
58     * @param Index|string $index
59     *
60     * @return $this
61     */
62    public function setIndex($index): self
63    {
64        if ($index instanceof Index) {
65            $index = $index->getName();
66        }
67
68        $this->_index = (string) $index;
69
70        return $this;
71    }
72
73    /**
74     * @return string|null
75     */
76    public function getIndex()
77    {
78        return $this->_index;
79    }
80
81    public function hasIndex(): bool
82    {
83        return null !== $this->getIndex() && '' !== $this->getIndex();
84    }
85
86    public function getPath(): string
87    {
88        $path = '';
89        if ($this->hasIndex()) {
90            $path .= $this->getIndex().'/';
91        }
92        $path .= '_bulk';
93
94        return $path;
95    }
96
97    /**
98     * @return $this
99     */
100    public function addAction(Action $action): self
101    {
102        $this->_actions[] = $action;
103
104        return $this;
105    }
106
107    /**
108     * @param Action[] $actions
109     *
110     * @return $this
111     */
112    public function addActions(array $actions): self
113    {
114        foreach ($actions as $action) {
115            $this->addAction($action);
116        }
117
118        return $this;
119    }
120
121    /**
122     * @return Action[]
123     */
124    public function getActions(): array
125    {
126        return $this->_actions;
127    }
128
129    /**
130     * @return $this
131     */
132    public function addDocument(Document $document, ?string $opType = null): self
133    {
134        $action = AbstractDocumentAction::create($document, $opType);
135
136        return $this->addAction($action);
137    }
138
139    /**
140     * @param Document[] $documents
141     *
142     * @return $this
143     */
144    public function addDocuments(array $documents, ?string $opType = null): self
145    {
146        foreach ($documents as $document) {
147            $this->addDocument($document, $opType);
148        }
149
150        return $this;
151    }
152
153    /**
154     * @return $this
155     */
156    public function addScript(AbstractScript $script, ?string $opType = null): self
157    {
158        $action = AbstractDocumentAction::create($script, $opType);
159
160        return $this->addAction($action);
161    }
162
163    /**
164     * @param AbstractScript[] $scripts
165     * @param string|null      $opType
166     *
167     * @return $this
168     */
169    public function addScripts(array $scripts, $opType = null): self
170    {
171        foreach ($scripts as $script) {
172            $this->addScript($script, $opType);
173        }
174
175        return $this;
176    }
177
178    /**
179     * @param AbstractScript|array|Document $data
180     *
181     * @return $this
182     */
183    public function addData($data, ?string $opType = null)
184    {
185        if (!\is_array($data)) {
186            $data = [$data];
187        }
188
189        foreach ($data as $actionData) {
190            if ($actionData instanceof AbstractScript) {
191                $this->addScript($actionData, $opType);
192            } elseif ($actionData instanceof Document) {
193                $this->addDocument($actionData, $opType);
194            } else {
195                throw new \InvalidArgumentException('Data should be a Document, a Script or an array containing Documents and/or Scripts');
196            }
197        }
198
199        return $this;
200    }
201
202    /**
203     * @throws InvalidException
204     *
205     * @return $this
206     */
207    public function addRawData(array $data): self
208    {
209        foreach ($data as $row) {
210            if (\is_array($row)) {
211                $opType = \key($row);
212                $metadata = \reset($row);
213                if (Action::isValidOpType($opType)) {
214                    // add previous action
215                    if (isset($action)) {
216                        $this->addAction($action);
217                    }
218                    $action = new Action($opType, $metadata);
219                } elseif (isset($action)) {
220                    $action->setSource($row);
221                    $this->addAction($action);
222                    $action = null;
223                } else {
224                    throw new InvalidException('Invalid bulk data, source must follow action metadata');
225                }
226            } else {
227                throw new InvalidException('Invalid bulk data, should be array of array, Document or Bulk/Action');
228            }
229        }
230
231        // add last action if available
232        if (isset($action)) {
233            $this->addAction($action);
234        }
235
236        return $this;
237    }
238
239    /**
240     * Set a url parameter on the request bulk request.
241     *
242     * @param string $name  name of the parameter
243     * @param mixed  $value value of the parameter
244     *
245     * @return $this
246     */
247    public function setRequestParam(string $name, $value): self
248    {
249        $this->_requestParams[$name] = $value;
250
251        return $this;
252    }
253
254    /**
255     * Set the amount of time that the request will wait the shards to come on line.
256     * Requires Elasticsearch version >= 0.90.8.
257     *
258     * @param string $time timeout in Elasticsearch time format
259     *
260     * @return $this
261     */
262    public function setShardTimeout(string $time): self
263    {
264        return $this->setRequestParam('timeout', $time);
265    }
266
267    /**
268     * @deprecated since version 7.1.3, use the "__toString()" method or cast to string instead.
269     */
270    public function toString(): string
271    {
272        \trigger_deprecation('ruflin/elastica', '7.1.3', 'The "%s()" method is deprecated, use "__toString()" or cast to string instead. It will be removed in 8.0.', __METHOD__);
273
274        return (string) $this;
275    }
276
277    public function toArray(): array
278    {
279        $data = [];
280        foreach ($this->getActions() as $action) {
281            foreach ($action->toArray() as $row) {
282                $data[] = $row;
283            }
284        }
285
286        return $data;
287    }
288
289    /**
290     * @throws ClientException
291     * @throws ConnectionException
292     * @throws ResponseException
293     * @throws BulkResponseException
294     * @throws InvalidException
295     */
296    public function send(): ResponseSet
297    {
298        $response = $this->_client->request($this->getPath(), Request::POST, (string) $this, $this->_requestParams, Request::NDJSON_CONTENT_TYPE);
299
300        return $this->_processResponse($response);
301    }
302
303    /**
304     * @throws BulkResponseException
305     * @throws InvalidException
306     */
307    protected function _processResponse(Response $response): ResponseSet
308    {
309        switch ($response->getStatus()) {
310            case 413: throw new RequestEntityTooLargeException();
311        }
312
313        $responseData = $response->getData();
314
315        $actions = $this->getActions();
316
317        $bulkResponses = [];
318
319        if (isset($responseData['items']) && \is_array($responseData['items'])) {
320            foreach ($responseData['items'] as $key => $item) {
321                if (!isset($actions[$key])) {
322                    throw new InvalidException('No response found for action #'.$key);
323                }
324
325                $action = $actions[$key];
326
327                $opType = \key($item);
328                $bulkResponseData = \reset($item);
329
330                if ($action instanceof AbstractDocumentAction) {
331                    $data = $action->getData();
332                    if ($data instanceof Document && $data->isAutoPopulate()
333                        || $this->_client->getConfigValue(['document', 'autoPopulate'], false)
334                    ) {
335                        if (!$data->hasId() && isset($bulkResponseData['_id'])) {
336                            $data->setId($bulkResponseData['_id']);
337                        }
338                        $data->setVersionParams($bulkResponseData);
339                    }
340                }
341
342                $bulkResponses[] = new BulkResponse($bulkResponseData, $action, $opType);
343            }
344        }
345
346        $bulkResponseSet = new ResponseSet($response, $bulkResponses);
347
348        if ($bulkResponseSet->hasError()) {
349            throw new BulkResponseException($bulkResponseSet);
350        }
351
352        return $bulkResponseSet;
353    }
354}
355