1<?php 2 3namespace Elastica; 4 5use Elastica\Exception\ClientException; 6use Elastica\Exception\ConnectionException; 7use Elastica\Exception\InvalidException; 8use Elastica\Exception\ResponseException; 9use Elastica\Processor\AbstractProcessor; 10use Elasticsearch\Endpoints\AbstractEndpoint; 11use Elasticsearch\Endpoints\Ingest\DeletePipeline; 12use Elasticsearch\Endpoints\Ingest\GetPipeline; 13use Elasticsearch\Endpoints\Ingest\Pipeline\Delete; 14use Elasticsearch\Endpoints\Ingest\Pipeline\Get; 15use Elasticsearch\Endpoints\Ingest\Pipeline\Put; 16use Elasticsearch\Endpoints\Ingest\PutPipeline; 17 18/** 19 * Elastica Pipeline object. 20 * 21 * Handles Pipeline management & definition. 22 * 23 * @author Federico Panini <fpanini@gmail.com> 24 * 25 * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest-processors.html 26 */ 27class Pipeline extends Param 28{ 29 /** 30 * @var string 31 */ 32 protected $id; 33 34 /** 35 * @var Client Client object 36 */ 37 protected $_client; 38 39 /** 40 * @var AbstractProcessor[] 41 * @phpstan-var array{processors?: AbstractProcessor[]} 42 */ 43 protected $_processors = []; 44 45 public function __construct(Client $client) 46 { 47 $this->_client = $client; 48 } 49 50 /** 51 * Create a Pipeline. 52 * 53 * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/put-pipeline-api.html 54 */ 55 public function create(): Response 56 { 57 if (empty($this->id)) { 58 throw new InvalidException('You should set a valid pipeline id'); 59 } 60 61 if (empty($this->_params['description'])) { 62 throw new InvalidException('You should set a valid processor description.'); 63 } 64 65 if (empty($this->_processors['processors'])) { 66 throw new InvalidException('You should set a valid processor of type Elastica\Processor\AbstractProcessor.'); 67 } 68 69 // TODO: Use only PutPipeline when dropping support for elasticsearch/elasticsearch 7.x 70 $endpoint = \class_exists(PutPipeline::class) ? new PutPipeline() : new Put(); 71 $endpoint->setId($this->id); 72 $endpoint->setBody($this->toArray()); 73 74 return $this->requestEndpoint($endpoint); 75 } 76 77 /** 78 * Get a Pipeline Object. 79 * 80 * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/get-pipeline-api.html 81 */ 82 public function getPipeline(string $id): Response 83 { 84 // TODO: Use only GetPipeline when dropping support for elasticsearch/elasticsearch 7.x 85 $endpoint = \class_exists(GetPipeline::class) ? new GetPipeline() : new Get(); 86 $endpoint->setId($id); 87 88 return $this->requestEndpoint($endpoint); 89 } 90 91 /** 92 * Delete a Pipeline. 93 * 94 * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/delete-pipeline-api.html 95 */ 96 public function deletePipeline(string $id): Response 97 { 98 // TODO: Use only DeletePipeline when dropping support for elasticsearch/elasticsearch 7.x 99 $endpoint = \class_exists(DeletePipeline::class) ? new DeletePipeline() : new Delete(); 100 $endpoint->setId($id); 101 102 return $this->requestEndpoint($endpoint); 103 } 104 105 /** 106 * Sets query as raw array. Will overwrite all already set arguments. 107 * 108 * @param array $processors array 109 */ 110 public function setRawProcessors(array $processors): self 111 { 112 $this->_processors = $processors; 113 114 return $this; 115 } 116 117 public function addProcessor(AbstractProcessor $processor): self 118 { 119 if (!$this->_processors) { 120 $this->_processors['processors'] = $processor->toArray(); 121 $this->_params['processors'] = []; 122 } else { 123 $this->_processors['processors'] = \array_merge($this->_processors['processors'], $processor->toArray()); 124 } 125 126 return $this; 127 } 128 129 public function setId(string $id): self 130 { 131 $this->id = $id; 132 133 return $this; 134 } 135 136 public function getId(): ?string 137 { 138 return $this->id; 139 } 140 141 /** 142 * @param AbstractProcessor[] $processors 143 */ 144 public function setProcessors(array $processors): self 145 { 146 $this->setParam('processors', [$processors]); 147 148 return $this; 149 } 150 151 public function setDescription(string $description): self 152 { 153 $this->setParam('description', $description); 154 155 return $this; 156 } 157 158 /** 159 * Converts the params to an array. A default implementation exist to create 160 * the an array out of the class name (last part of the class name) 161 * and the params. 162 */ 163 public function toArray(): array 164 { 165 $this->_params['processors'] = [$this->_processors['processors']]; 166 167 return $this->getParams(); 168 } 169 170 public function getClient(): Client 171 { 172 return $this->_client; 173 } 174 175 /** 176 * Makes calls to the elasticsearch server with usage official client Endpoint based on this index. 177 * 178 * @throws ClientException 179 * @throws ConnectionException 180 * @throws ResponseException 181 */ 182 public function requestEndpoint(AbstractEndpoint $endpoint): Response 183 { 184 $cloned = clone $endpoint; 185 186 return $this->getClient()->requestEndpoint($cloned); 187 } 188} 189