<?php

namespace Elastica;

use Elastica\Bulk\Action;
use Elastica\Bulk\Action\AbstractDocument as AbstractDocumentAction;
use Elastica\Bulk\Response as BulkResponse;
use Elastica\Bulk\ResponseSet;
use Elastica\Exception\Bulk\ResponseException as BulkResponseException;
use Elastica\Exception\ClientException;
use Elastica\Exception\ConnectionException;
use Elastica\Exception\InvalidException;
use Elastica\Exception\RequestEntityTooLargeException;
use Elastica\Exception\ResponseException;
use Elastica\Script\AbstractScript;

/**
 * Collects bulk actions and sends them in a single request to the "_bulk" endpoint.
 *
 * Actions are kept in insertion order; the response items are matched back to
 * the actions by position (see _processResponse()).
 */
class Bulk
{
    /**
     * Line delimiter constant. It is not referenced inside this class;
     * presumably it is used by the action serialization elsewhere.
     */
    public const DELIMITER = "\n";

    /**
     * @var Client
     */
    protected $_client;

    /**
     * @var Action[]
     */
    protected $_actions = [];

    /**
     * @var string|null
     */
    protected $_index;

    /**
     * @var array request parameters to the bulk api
     */
    protected $_requestParams = [];

    /**
     * @param Client $client client used by send() to perform the request
     */
    public function __construct(Client $client)
    {
        $this->_client = $client;
    }

    /**
     * Builds the request body by concatenating the string form of every action.
     */
    public function __toString(): string
    {
        $data = '';

        foreach ($this->getActions() as $action) {
            $data .= (string) $action;
        }

        return $data;
    }

    /**
     * Sets the index used as path prefix of the bulk request.
     *
     * @param Index|string $index an Index object (its name is used) or an index name
     *
     * @return $this
     */
    public function setIndex($index): self
    {
        if ($index instanceof Index) {
            $index = $index->getName();
        }

        $this->_index = (string) $index;

        return $this;
    }

    /**
     * @return string|null the index name, or null when setIndex() was never called
     */
    public function getIndex()
    {
        return $this->_index;
    }

    /**
     * Tells whether a non-empty index name has been set.
     */
    public function hasIndex(): bool
    {
        $index = $this->getIndex();

        return null !== $index && '' !== $index;
    }

    /**
     * Returns the request path: "<index>/_bulk" when an index is set, "_bulk" otherwise.
     */
    public function getPath(): string
    {
        $path = '';
        if ($this->hasIndex()) {
            $path .= $this->getIndex().'/';
        }
        $path .= '_bulk';

        return $path;
    }

    /**
     * Appends a single action to the bulk.
     *
     * @return $this
     */
    public function addAction(Action $action): self
    {
        $this->_actions[] = $action;

        return $this;
    }

    /**
     * Appends several actions, preserving their order.
     *
     * @param Action[] $actions
     *
     * @return $this
     */
    public function addActions(array $actions): self
    {
        foreach ($actions as $action) {
            $this->addAction($action);
        }

        return $this;
    }

    /**
     * @return Action[] the actions in the order they were added
     */
    public function getActions(): array
    {
        return $this->_actions;
    }

    /**
     * Wraps a document in an action created by AbstractDocumentAction::create() and appends it.
     *
     * @param string|null $opType operation type forwarded to AbstractDocumentAction::create()
     *
     * @return $this
     */
    public function addDocument(Document $document, ?string $opType = null): self
    {
        $action = AbstractDocumentAction::create($document, $opType);

        return $this->addAction($action);
    }

    /**
     * Appends one action per document.
     *
     * @param Document[] $documents
     *
     * @return $this
     */
    public function addDocuments(array $documents, ?string $opType = null): self
    {
        foreach ($documents as $document) {
            $this->addDocument($document, $opType);
        }

        return $this;
    }

    /**
     * Wraps a script in an action created by AbstractDocumentAction::create() and appends it.
     *
     * @param string|null $opType operation type forwarded to AbstractDocumentAction::create()
     *
     * @return $this
     */
    public function addScript(AbstractScript $script, ?string $opType = null): self
    {
        $action = AbstractDocumentAction::create($script, $opType);

        return $this->addAction($action);
    }

    /**
     * Appends one action per script.
     *
     * @param AbstractScript[] $scripts
     *
     * @return $this
     */
    public function addScripts(array $scripts, ?string $opType = null): self
    {
        foreach ($scripts as $script) {
            $this->addScript($script, $opType);
        }

        return $this;
    }

    /**
     * Appends documents and/or scripts, given either as a single object or as an array of them.
     *
     * @param AbstractScript|array|Document $data
     *
     * @throws \InvalidArgumentException if an element is neither a Document nor a Script
     *
     * @return $this
     */
    public function addData($data, ?string $opType = null)
    {
        if (!\is_array($data)) {
            $data = [$data];
        }

        foreach ($data as $actionData) {
            if ($actionData instanceof AbstractScript) {
                $this->addScript($actionData, $opType);
            } elseif ($actionData instanceof Document) {
                $this->addDocument($actionData, $opType);
            } else {
                throw new \InvalidArgumentException('Data should be a Document, a Script or an array containing Documents and/or Scripts');
            }
        }

        return $this;
    }

    /**
     * Builds actions from raw bulk rows.
     *
     * Each row must be an array. A row whose first key is a valid op type
     * (according to Action::isValidOpType()) starts a new action with the
     * first value as metadata; any other row is used as the source of the
     * action started by the preceding row.
     *
     * @param array $data list of metadata rows, each optionally followed by a source row
     *
     * @throws InvalidException if a row is not an array or a source row has no preceding action
     *
     * @return $this
     */
    public function addRawData(array $data): self
    {
        // Action whose metadata row was read but which has not been added yet.
        $action = null;

        foreach ($data as $row) {
            if (\is_array($row)) {
                // Rewind first: key() reports the *current* internal pointer, so
                // reading it before reset() could pair the wrong key with the
                // first value when the caller left the pointer advanced.
                $metadata = \reset($row);
                $opType = \key($row);
                if (Action::isValidOpType($opType)) {
                    // add previous action
                    if (null !== $action) {
                        $this->addAction($action);
                    }
                    $action = new Action($opType, $metadata);
                } elseif (null !== $action) {
                    $action->setSource($row);
                    $this->addAction($action);
                    $action = null;
                } else {
                    throw new InvalidException('Invalid bulk data, source must follow action metadata');
                }
            } else {
                throw new InvalidException('Invalid bulk data, should be array of array, Document or Bulk/Action');
            }
        }

        // add last action if available
        if (null !== $action) {
            $this->addAction($action);
        }

        return $this;
    }

    /**
     * Set a url parameter on the request bulk request.
     *
     * @param string $name  name of the parameter
     * @param mixed  $value value of the parameter
     *
     * @return $this
     */
    public function setRequestParam(string $name, $value): self
    {
        $this->_requestParams[$name] = $value;

        return $this;
    }

    /**
     * Set the amount of time that the request will wait the shards to come on line.
     * Requires Elasticsearch version >= 0.90.8.
     *
     * @param string $time timeout in Elasticsearch time format
     *
     * @return $this
     */
    public function setShardTimeout(string $time): self
    {
        return $this->setRequestParam('timeout', $time);
    }

    /**
     * @deprecated since version 7.1.3, use the "__toString()" method or cast to string instead.
     */
    public function toString(): string
    {
        \trigger_deprecation('ruflin/elastica', '7.1.3', 'The "%s()" method is deprecated, use "__toString()" or cast to string instead. It will be removed in 8.0.', __METHOD__);

        return (string) $this;
    }

    /**
     * Returns the rows of all actions flattened into a single list.
     */
    public function toArray(): array
    {
        $data = [];
        foreach ($this->getActions() as $action) {
            foreach ($action->toArray() as $row) {
                $data[] = $row;
            }
        }

        return $data;
    }

    /**
     * Sends the bulk as a POST request with NDJSON content type and processes the response.
     *
     * @throws ClientException
     * @throws ConnectionException
     * @throws ResponseException
     * @throws BulkResponseException
     * @throws InvalidException
     * @throws RequestEntityTooLargeException
     */
    public function send(): ResponseSet
    {
        $response = $this->_client->request($this->getPath(), Request::POST, (string) $this, $this->_requestParams, Request::NDJSON_CONTENT_TYPE);

        return $this->_processResponse($response);
    }

    /**
     * Converts the raw response into a ResponseSet, pairing each response item
     * with the action at the same position.
     *
     * @throws RequestEntityTooLargeException if the response status is 413
     * @throws BulkResponseException          if the resulting response set reports an error
     * @throws InvalidException               if a response item has no action at the same position
     */
    protected function _processResponse(Response $response): ResponseSet
    {
        if (413 === (int) $response->getStatus()) {
            throw new RequestEntityTooLargeException();
        }

        $responseData = $response->getData();

        $actions = $this->getActions();

        $bulkResponses = [];

        if (isset($responseData['items']) && \is_array($responseData['items'])) {
            foreach ($responseData['items'] as $key => $item) {
                if (!isset($actions[$key])) {
                    throw new InvalidException('No response found for action #'.$key);
                }

                $action = $actions[$key];

                // Each item is expected to be a single-entry map: op type => result data.
                // reset() before key() so both refer to the first entry.
                $bulkResponseData = \reset($item);
                $opType = \key($item);

                if ($action instanceof AbstractDocumentAction) {
                    $data = $action->getData();
                    // NOTE(review): when the client-level "autoPopulate" option is on,
                    // this branch is also entered for data that is not a Document
                    // (the instanceof check only guards the per-document flag).
                    // Confirm that every possible $data supports hasId(), setId()
                    // and setVersionParams().
                    if (($data instanceof Document && $data->isAutoPopulate())
                        || $this->_client->getConfigValue(['document', 'autoPopulate'], false)
                    ) {
                        if (!$data->hasId() && isset($bulkResponseData['_id'])) {
                            $data->setId($bulkResponseData['_id']);
                        }
                        $data->setVersionParams($bulkResponseData);
                    }
                }

                $bulkResponses[] = new BulkResponse($bulkResponseData, $action, $opType);
            }
        }

        $bulkResponseSet = new ResponseSet($response, $bulkResponses);

        if ($bulkResponseSet->hasError()) {
            throw new BulkResponseException($bulkResponseSet);
        }

        return $bulkResponseSet;
    }
}