<?php

namespace Elastica;

use Elastica\Bulk\Action;
use Elastica\Bulk\Action\AbstractDocument as AbstractDocumentAction;
use Elastica\Bulk\Response as BulkResponse;
use Elastica\Bulk\ResponseSet;
use Elastica\Exception\Bulk\ResponseException;
use Elastica\Exception\Bulk\ResponseException as BulkResponseException;
use Elastica\Exception\InvalidException;
use Elastica\Script\AbstractScript;

/**
 * Collects bulk actions and sends them to the bulk endpoint in one request.
 *
 * Actions are kept in insertion order, serialized back to back by toString()
 * and POSTed as NDJSON to the path returned by getPath().
 */
class Bulk
{
    const DELIMITER = "\n";

    /**
     * @var Client
     */
    protected $_client;

    /**
     * @var Action[] actions in the order they were added
     */
    protected $_actions = [];

    /**
     * @var string|null default index name used to build the request path
     */
    protected $_index;

    /**
     * @var string|null default type name used to build the request path
     */
    protected $_type;

    /**
     * @var array request parameters to the bulk api
     */
    protected $_requestParams = [];

    /**
     * @param Client $client client used by send() to execute the request
     */
    public function __construct(Client $client)
    {
        $this->_client = $client;
    }

    /**
     * Sets the default index; an Index object is reduced to its name.
     *
     * @param string|Index $index
     *
     * @return $this
     */
    public function setIndex($index): self
    {
        if ($index instanceof Index) {
            $index = $index->getName();
        }

        $this->_index = (string) $index;

        return $this;
    }

    /**
     * @return string|null
     */
    public function getIndex()
    {
        return $this->_index;
    }

    /**
     * @return bool true when an index is set and is not the empty string
     */
    public function hasIndex(): bool
    {
        return null !== $this->getIndex() && '' !== $this->getIndex();
    }

    /**
     * Sets the default type. A Type object also sets the index it belongs to.
     *
     * @param string|Type $type
     *
     * @return $this
     */
    public function setType($type): self
    {
        if ($type instanceof Type) {
            $this->setIndex($type->getIndex()->getName());
            $type = $type->getName();
        }

        $this->_type = (string) $type;

        return $this;
    }

    /**
     * @return string|null
     */
    public function getType()
    {
        return $this->_type;
    }

    /**
     * @return bool true when a type is set and is not the empty string
     */
    public function hasType(): bool
    {
        return null !== $this->getType() && '' !== $this->getType();
    }

    /**
     * Builds the request path: "[index/[type/]]_bulk".
     *
     * The type is only included when an index is also set.
     *
     * @return string
     */
    public function getPath(): string
    {
        $path = '';
        if ($this->hasIndex()) {
            $path .= $this->getIndex().'/';
            if ($this->hasType()) {
                $path .= $this->getType().'/';
            }
        }
        $path .= '_bulk';

        return $path;
    }

    /**
     * @param Action $action
     *
     * @return $this
     */
    public function addAction(Action $action): self
    {
        $this->_actions[] = $action;

        return $this;
    }

    /**
     * @param Action[] $actions
     *
     * @return $this
     */
    public function addActions(array $actions): self
    {
        foreach ($actions as $action) {
            $this->addAction($action);
        }

        return $this;
    }

    /**
     * @return Action[]
     */
    public function getActions(): array
    {
        return $this->_actions;
    }

    /**
     * Wraps a document in a document action and appends it.
     *
     * @param Document    $document
     * @param string|null $opType   op type forwarded to AbstractDocumentAction::create()
     *
     * @return $this
     */
    public function addDocument(Document $document, string $opType = null): self
    {
        $action = AbstractDocumentAction::create($document, $opType);

        return $this->addAction($action);
    }

    /**
     * @param Document[]  $documents
     * @param string|null $opType    op type applied to every document
     *
     * @return $this
     */
    public function addDocuments(array $documents, string $opType = null): self
    {
        foreach ($documents as $document) {
            $this->addDocument($document, $opType);
        }

        return $this;
    }

    /**
     * Wraps a script in a document action and appends it.
     *
     * @param AbstractScript $script
     * @param string|null    $opType op type forwarded to AbstractDocumentAction::create()
     *
     * @return $this
     */
    public function addScript(AbstractScript $script, string $opType = null): self
    {
        $action = AbstractDocumentAction::create($script, $opType);

        return $this->addAction($action);
    }

    /**
     * @param AbstractScript[] $scripts
     * @param string|null      $opType  op type applied to every script
     *
     * @return $this
     */
    public function addScripts(array $scripts, $opType = null): self
    {
        foreach ($scripts as $script) {
            $this->addScript($script, $opType);
        }

        return $this;
    }

    /**
     * Adds a script, a document, or an array mixing both.
     *
     * @param \Elastica\Script\AbstractScript|\Elastica\Document|array $data
     * @param string|null                                              $opType
     *
     * @throws \InvalidArgumentException when an element is neither a Document nor a Script
     *
     * @return $this
     */
    public function addData($data, string $opType = null)
    {
        if (!\is_array($data)) {
            $data = [$data];
        }

        foreach ($data as $actionData) {
            if ($actionData instanceof AbstractScript) {
                $this->addScript($actionData, $opType);
            } elseif ($actionData instanceof Document) {
                $this->addDocument($actionData, $opType);
            } else {
                throw new \InvalidArgumentException('Data should be a Document, a Script or an array containing Documents and/or Scripts');
            }
        }

        return $this;
    }

    /**
     * Adds actions from raw bulk rows.
     *
     * A row whose first key is a valid op type starts a new action, and that key's
     * value becomes the metadata. Any other row is treated as the source of the
     * pending action and completes it. An action without a following source row is
     * added as-is.
     *
     * @param array $data list of rows, each an array
     *
     * @throws InvalidException when a row is not an array, or a source row has no preceding action
     *
     * @return $this
     */
    public function addRawData(array $data): self
    {
        foreach ($data as $row) {
            if (\is_array($row)) {
                $opType = \key($row);
                $metadata = \reset($row);
                if (Action::isValidOpType($opType)) {
                    // add previous action
                    if (isset($action)) {
                        $this->addAction($action);
                    }
                    $action = new Action($opType, $metadata);
                } elseif (isset($action)) {
                    $action->setSource($row);
                    $this->addAction($action);
                    $action = null;
                } else {
                    throw new InvalidException('Invalid bulk data, source must follow action metadata');
                }
            } else {
                throw new InvalidException('Invalid bulk data, should be array of array, Document or Bulk/Action');
            }
        }

        // add last action if available
        if (isset($action)) {
            $this->addAction($action);
        }

        return $this;
    }

    /**
     * Set a URL parameter on the bulk request.
     *
     * @param string $name  name of the parameter
     * @param mixed  $value value of the parameter
     *
     * @return $this
     */
    public function setRequestParam(string $name, $value): self
    {
        $this->_requestParams[$name] = $value;

        return $this;
    }

    /**
     * Set the amount of time that the request will wait for the shards to come online.
     * Requires Elasticsearch version >= 0.90.8.
     *
     * @param string $time timeout in Elasticsearch time format
     *
     * @return $this
     */
    public function setShardTimeout(string $time): self
    {
        return $this->setRequestParam('timeout', $time);
    }

    /**
     * @return string
     */
    public function __toString(): string
    {
        return $this->toString();
    }

    /**
     * Concatenates the string form of every action, in insertion order.
     *
     * @return string
     */
    public function toString(): string
    {
        $data = '';
        foreach ($this->getActions() as $action) {
            $data .= $action->toString();
        }

        return $data;
    }

    /**
     * Flattens the rows of every action's toArray() into a single list.
     *
     * @return array
     */
    public function toArray(): array
    {
        $data = [];
        foreach ($this->getActions() as $action) {
            foreach ($action->toArray() as $row) {
                $data[] = $row;
            }
        }

        return $data;
    }

    /**
     * POSTs the serialized actions as NDJSON and processes the response.
     *
     * @throws ResponseException when any item in the response reports an error
     * @throws InvalidException  when the response has more items than actions were sent
     *
     * @return ResponseSet
     */
    public function send(): ResponseSet
    {
        $path = $this->getPath();
        $data = $this->toString();

        $response = $this->_client->request($path, Request::POST, $data, $this->_requestParams, Request::NDJSON_CONTENT_TYPE);

        return $this->_processResponse($response);
    }

    /**
     * Pairs each response item with the action at the same position.
     *
     * Optionally copies the returned _id/_version back onto the action data, and
     * wraps everything in a ResponseSet.
     *
     * @param Response $response
     *
     * @throws ResponseException
     * @throws InvalidException
     *
     * @return ResponseSet
     */
    protected function _processResponse(Response $response): ResponseSet
    {
        $responseData = $response->getData();

        $actions = $this->getActions();

        $bulkResponses = [];

        if (isset($responseData['items']) && \is_array($responseData['items'])) {
            // The client-wide setting does not change between items, so read it once.
            $globalAutoPopulate = $this->_client->getConfigValue(['document', 'autoPopulate'], false);

            foreach ($responseData['items'] as $key => $item) {
                if (!isset($actions[$key])) {
                    // The response contains an item at a position where no action was sent.
                    throw new InvalidException('No action found for response item #'.$key);
                }

                $action = $actions[$key];

                // Only the first entry of the item is read: its key is the op type,
                // its value the per-action result.
                $opType = \key($item);
                $bulkResponseData = \reset($item);

                if ($action instanceof AbstractDocumentAction) {
                    $data = $action->getData();
                    // Populate when the document opts in, or when the client enables it globally.
                    if (($data instanceof Document && $data->isAutoPopulate()) || $globalAutoPopulate) {
                        if (!$data->hasId() && isset($bulkResponseData['_id'])) {
                            $data->setId($bulkResponseData['_id']);
                        }
                        if (isset($bulkResponseData['_version'])) {
                            $data->setVersion($bulkResponseData['_version']);
                        }
                    }
                }

                $bulkResponses[] = new BulkResponse($bulkResponseData, $action, $opType);
            }
        }

        $bulkResponseSet = new ResponseSet($response, $bulkResponses);

        if ($bulkResponseSet->hasError()) {
            throw new BulkResponseException($bulkResponseSet);
        }

        return $bulkResponseSet;
    }
}