1<?php
2
3namespace Elastica;
4
5use Elastica\Bulk\Action;
6use Elastica\Bulk\Action\AbstractDocument as AbstractDocumentAction;
7use Elastica\Bulk\Response as BulkResponse;
8use Elastica\Bulk\ResponseSet;
9use Elastica\Exception\Bulk\ResponseException;
10use Elastica\Exception\Bulk\ResponseException as BulkResponseException;
11use Elastica\Exception\InvalidException;
12use Elastica\Script\AbstractScript;
13
/**
 * Collects bulk actions (documents, scripts or raw rows) and sends them to
 * the `_bulk` endpoint through the configured client.
 *
 * An optional default index and type can be set; they become part of the
 * request path built by getPath().
 */
class Bulk
{
    const DELIMITER = "\n";

    /**
     * @var Client client used to send the bulk request
     */
    protected $_client;

    /**
     * @var Action[] actions queued for the next send(), in insertion order
     */
    protected $_actions = [];

    /**
     * @var string|null default index used to build the request path
     */
    protected $_index;

    /**
     * @var string|null default type used to build the request path
     */
    protected $_type;

    /**
     * @var array request parameters to the bulk api
     */
    protected $_requestParams = [];

    /**
     * @param Client $client client used to send the bulk request
     */
    public function __construct(Client $client)
    {
        $this->_client = $client;
    }

    /**
     * Sets the default index of the bulk request.
     *
     * @param string|Index $index index name, or an Index object whose name is used
     *
     * @return $this
     */
    public function setIndex($index): self
    {
        if ($index instanceof Index) {
            $index = $index->getName();
        }

        $this->_index = (string) $index;

        return $this;
    }

    /**
     * @return string|null the default index, or null if none was set
     */
    public function getIndex()
    {
        return $this->_index;
    }

    /**
     * Tells whether a non-empty default index is set.
     *
     * @return bool
     */
    public function hasIndex(): bool
    {
        $index = $this->getIndex();

        return null !== $index && '' !== $index;
    }

    /**
     * Sets the default type of the bulk request.
     *
     * When a Type object is given, the default index is also replaced by the
     * name of the index that type belongs to.
     *
     * @param string|Type $type type name or Type object
     *
     * @return $this
     */
    public function setType($type): self
    {
        if ($type instanceof Type) {
            $this->setIndex($type->getIndex()->getName());
            $type = $type->getName();
        }

        $this->_type = (string) $type;

        return $this;
    }

    /**
     * @return string|null the default type, or null if none was set
     */
    public function getType()
    {
        return $this->_type;
    }

    /**
     * Tells whether a non-empty default type is set.
     *
     * @return bool
     */
    public function hasType(): bool
    {
        $type = $this->getType();

        return null !== $type && '' !== $type;
    }

    /**
     * Builds the request path: `[index/[type/]]_bulk`.
     *
     * The type segment is only added when an index is set as well.
     *
     * @return string
     */
    public function getPath(): string
    {
        $path = '';
        if ($this->hasIndex()) {
            $path .= $this->getIndex().'/';
            if ($this->hasType()) {
                $path .= $this->getType().'/';
            }
        }
        $path .= '_bulk';

        return $path;
    }

    /**
     * Appends a single action to the queue.
     *
     * @param Action $action
     *
     * @return $this
     */
    public function addAction(Action $action): self
    {
        $this->_actions[] = $action;

        return $this;
    }

    /**
     * Appends several actions to the queue, in the given order.
     *
     * @param Action[] $actions
     *
     * @return $this
     */
    public function addActions(array $actions): self
    {
        foreach ($actions as $action) {
            $this->addAction($action);
        }

        return $this;
    }

    /**
     * @return Action[] the queued actions, in insertion order
     */
    public function getActions(): array
    {
        return $this->_actions;
    }

    /**
     * Queues an action created from the given document.
     *
     * @param Document    $document
     * @param string|null $opType   operation type handed to the action factory
     *
     * @return $this
     */
    public function addDocument(Document $document, string $opType = null): self
    {
        $action = AbstractDocumentAction::create($document, $opType);

        return $this->addAction($action);
    }

    /**
     * Queues one action per given document.
     *
     * @param Document[]  $documents
     * @param string|null $opType    operation type applied to every document
     *
     * @return $this
     */
    public function addDocuments(array $documents, string $opType = null): self
    {
        foreach ($documents as $document) {
            $this->addDocument($document, $opType);
        }

        return $this;
    }

    /**
     * Queues an action created from the given script.
     *
     * @param AbstractScript $script
     * @param string|null    $opType operation type handed to the action factory
     *
     * @return $this
     */
    public function addScript(AbstractScript $script, string $opType = null): self
    {
        $action = AbstractDocumentAction::create($script, $opType);

        return $this->addAction($action);
    }

    /**
     * Queues one action per given script.
     *
     * @param AbstractScript[] $scripts
     * @param string|null      $opType  operation type applied to every script
     *
     * @return $this
     */
    public function addScripts(array $scripts, $opType = null): self
    {
        foreach ($scripts as $script) {
            $this->addScript($script, $opType);
        }

        return $this;
    }

    /**
     * Queues documents and/or scripts, given either as one object or as an array.
     *
     * @param \Elastica\Script\AbstractScript|\Elastica\Document|array $data
     * @param string|null                                              $opType operation type applied to every entry
     *
     * @throws \InvalidArgumentException if an entry is neither a Document nor a Script
     *
     * @return $this
     */
    public function addData($data, string $opType = null)
    {
        if (!\is_array($data)) {
            $data = [$data];
        }

        foreach ($data as $actionData) {
            if ($actionData instanceof AbstractScript) {
                $this->addScript($actionData, $opType);
            } elseif ($actionData instanceof Document) {
                $this->addDocument($actionData, $opType);
            } else {
                throw new \InvalidArgumentException('Data should be a Document, a Script or an array containing Documents and/or Scripts');
            }
        }

        return $this;
    }

    /**
     * Queues actions described by raw bulk rows.
     *
     * Each row must be an array. A row whose first key is a valid operation
     * type starts a new action, using the first value as its metadata. Any
     * other row is taken as the source of the action started just before it.
     *
     * @param array $data list of rows (action metadata rows and source rows)
     *
     * @throws InvalidException if a row is not an array, or a source row has no pending action
     *
     * @return $this
     */
    public function addRawData(array $data): self
    {
        // Action whose metadata row was read but which is not queued yet,
        // because the next row may still provide its source.
        $action = null;

        foreach ($data as $row) {
            if (!\is_array($row)) {
                throw new InvalidException('Invalid bulk data, should be array of array, Document or Bulk/Action');
            }

            // reset() must run before key(): key() reads the array's internal
            // pointer, which may have been moved by the caller before the row
            // was handed over.
            $metadata = \reset($row);
            $opType = \key($row);

            if (Action::isValidOpType($opType)) {
                // A new metadata row: the pending action (if any) has no source.
                if (null !== $action) {
                    $this->addAction($action);
                }
                $action = new Action($opType, $metadata);
            } elseif (null !== $action) {
                $action->setSource($row);
                $this->addAction($action);
                $action = null;
            } else {
                throw new InvalidException('Invalid bulk data, source must follow action metadata');
            }
        }

        // Queue the last action when it was not followed by a source row.
        if (null !== $action) {
            $this->addAction($action);
        }

        return $this;
    }

    /**
     * Set a URL parameter on the bulk request.
     *
     * @param string $name  name of the parameter
     * @param mixed  $value value of the parameter
     *
     * @return $this
     */
    public function setRequestParam(string $name, $value): self
    {
        $this->_requestParams[$name] = $value;

        return $this;
    }

    /**
     * Set the amount of time that the request will wait for the shards to come online.
     * Requires Elasticsearch version >= 0.90.8.
     *
     * Stored as the `timeout` request parameter.
     *
     * @param string $time timeout in Elasticsearch time format
     *
     * @return $this
     */
    public function setShardTimeout(string $time): self
    {
        return $this->setRequestParam('timeout', $time);
    }

    /**
     * @return string same value as toString()
     */
    public function __toString(): string
    {
        return $this->toString();
    }

    /**
     * Concatenates the string form of every queued action, in order.
     *
     * @return string request body; empty string when no action is queued
     */
    public function toString(): string
    {
        $data = '';
        foreach ($this->getActions() as $action) {
            $data .= $action->toString();
        }

        return $data;
    }

    /**
     * Flattens the array form of every queued action into one list of rows.
     *
     * @return array
     */
    public function toArray(): array
    {
        $data = [];
        foreach ($this->getActions() as $action) {
            foreach ($action->toArray() as $row) {
                $data[] = $row;
            }
        }

        return $data;
    }

    /**
     * Posts the queued actions to the bulk endpoint and wraps the result.
     *
     * @throws ResponseException if the response set reports an error
     * @throws InvalidException  if a response item has no matching queued action
     *
     * @return ResponseSet
     */
    public function send(): ResponseSet
    {
        $path = $this->getPath();
        $data = $this->toString();

        $response = $this->_client->request($path, Request::POST, $data, $this->_requestParams, Request::NDJSON_CONTENT_TYPE);

        return $this->_processResponse($response);
    }

    /**
     * Pairs every item of the raw response with the queued action at the same
     * position and builds the resulting response set.
     *
     * For document actions whose data has auto-populate enabled (or when the
     * client config value `document.autoPopulate` is set), a missing id and the
     * version are copied from the response item onto the action data.
     *
     * @param Response $response
     *
     * @throws ResponseException if the response set reports an error
     * @throws InvalidException  if a response item has no matching queued action
     *
     * @return ResponseSet
     */
    protected function _processResponse(Response $response): ResponseSet
    {
        $responseData = $response->getData();

        $actions = $this->getActions();

        $bulkResponses = [];

        if (isset($responseData['items']) && \is_array($responseData['items'])) {
            foreach ($responseData['items'] as $key => $item) {
                if (!isset($actions[$key])) {
                    throw new InvalidException('No response found for action #'.$key);
                }

                $action = $actions[$key];

                // reset() before key(): key() depends on the internal array
                // pointer, so position it on the first element first.
                $bulkResponseData = \reset($item);
                $opType = \key($item);

                if ($action instanceof AbstractDocumentAction) {
                    $data = $action->getData();
                    $autoPopulate = ($data instanceof Document && $data->isAutoPopulate())
                        || $this->_client->getConfigValue(['document', 'autoPopulate'], false);

                    if ($autoPopulate) {
                        if (!$data->hasId() && isset($bulkResponseData['_id'])) {
                            $data->setId($bulkResponseData['_id']);
                        }
                        if (isset($bulkResponseData['_version'])) {
                            $data->setVersion($bulkResponseData['_version']);
                        }
                    }
                }

                $bulkResponses[] = new BulkResponse($bulkResponseData, $action, $opType);
            }
        }

        $bulkResponseSet = new ResponseSet($response, $bulkResponses);

        if ($bulkResponseSet->hasError()) {
            throw new BulkResponseException($bulkResponseSet);
        }

        return $bulkResponseSet;
    }
}
418