1 <?php
2 
3 namespace Elastica;
4 
5 use Elastica\Exception\ClientException;
6 use Elastica\Exception\ConnectionException;
7 use Elastica\Exception\InvalidException;
8 use Elastica\Exception\ResponseException;
9 use Elastica\Processor\AbstractProcessor;
10 use Elasticsearch\Endpoints\AbstractEndpoint;
11 use Elasticsearch\Endpoints\Ingest\DeletePipeline;
12 use Elasticsearch\Endpoints\Ingest\GetPipeline;
13 use Elasticsearch\Endpoints\Ingest\Pipeline\Delete;
14 use Elasticsearch\Endpoints\Ingest\Pipeline\Get;
15 use Elasticsearch\Endpoints\Ingest\Pipeline\Put;
16 use Elasticsearch\Endpoints\Ingest\PutPipeline;
17 
18 /**
19  * Elastica Pipeline object.
20  *
21  * Handles Pipeline management & definition.
22  *
23  * @author   Federico Panini <fpanini@gmail.com>
24  *
25  * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest-processors.html
26  */
27 class Pipeline extends Param
28 {
29     /**
30      * @var string
31      */
32     protected $id;
33 
34     /**
35      * @var Client Client object
36      */
37     protected $_client;
38 
39     /**
40      * @var AbstractProcessor[]
41      * @phpstan-var array{processors?: AbstractProcessor[]}
42      */
43     protected $_processors = [];
44 
45     public function __construct(Client $client)
46     {
47         $this->_client = $client;
48     }
49 
50     /**
51      * Create a Pipeline.
52      *
53      * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/put-pipeline-api.html
54      */
55     public function create(): Response
56     {
57         if (empty($this->id)) {
58             throw new InvalidException('You should set a valid pipeline id');
59         }
60 
61         if (empty($this->_params['description'])) {
62             throw new InvalidException('You should set a valid processor description.');
63         }
64 
65         if (empty($this->_processors['processors'])) {
66             throw new InvalidException('You should set a valid processor of type Elastica\Processor\AbstractProcessor.');
67         }
68 
69         // TODO: Use only PutPipeline when dropping support for elasticsearch/elasticsearch 7.x
70         $endpoint = \class_exists(PutPipeline::class) ? new PutPipeline() : new Put();
71         $endpoint->setId($this->id);
72         $endpoint->setBody($this->toArray());
73 
74         return $this->requestEndpoint($endpoint);
75     }
76 
77     /**
78      * Get a Pipeline Object.
79      *
80      * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/get-pipeline-api.html
81      */
82     public function getPipeline(string $id): Response
83     {
84         // TODO: Use only GetPipeline when dropping support for elasticsearch/elasticsearch 7.x
85         $endpoint = \class_exists(GetPipeline::class) ? new GetPipeline() : new Get();
86         $endpoint->setId($id);
87 
88         return $this->requestEndpoint($endpoint);
89     }
90 
91     /**
92      * Delete a Pipeline.
93      *
94      * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/delete-pipeline-api.html
95      */
96     public function deletePipeline(string $id): Response
97     {
98         // TODO: Use only DeletePipeline when dropping support for elasticsearch/elasticsearch 7.x
99         $endpoint = \class_exists(DeletePipeline::class) ? new DeletePipeline() : new Delete();
100         $endpoint->setId($id);
101 
102         return $this->requestEndpoint($endpoint);
103     }
104 
105     /**
106      * Sets query as raw array. Will overwrite all already set arguments.
107      *
108      * @param array $processors array
109      */
110     public function setRawProcessors(array $processors): self
111     {
112         $this->_processors = $processors;
113 
114         return $this;
115     }
116 
117     public function addProcessor(AbstractProcessor $processor): self
118     {
119         if (!$this->_processors) {
120             $this->_processors['processors'] = $processor->toArray();
121             $this->_params['processors'] = [];
122         } else {
123             $this->_processors['processors'] = \array_merge($this->_processors['processors'], $processor->toArray());
124         }
125 
126         return $this;
127     }
128 
129     public function setId(string $id): self
130     {
131         $this->id = $id;
132 
133         return $this;
134     }
135 
136     public function getId(): ?string
137     {
138         return $this->id;
139     }
140 
141     /**
142      * @param AbstractProcessor[] $processors
143      */
144     public function setProcessors(array $processors): self
145     {
146         $this->setParam('processors', [$processors]);
147 
148         return $this;
149     }
150 
151     public function setDescription(string $description): self
152     {
153         $this->setParam('description', $description);
154 
155         return $this;
156     }
157 
158     /**
159      * Converts the params to an array. A default implementation exist to create
160      * the an array out of the class name (last part of the class name)
161      * and the params.
162      */
163     public function toArray(): array
164     {
165         $this->_params['processors'] = [$this->_processors['processors']];
166 
167         return $this->getParams();
168     }
169 
170     public function getClient(): Client
171     {
172         return $this->_client;
173     }
174 
175     /**
176      * Makes calls to the elasticsearch server with usage official client Endpoint based on this index.
177      *
178      * @throws ClientException
179      * @throws ConnectionException
180      * @throws ResponseException
181      */
182     public function requestEndpoint(AbstractEndpoint $endpoint): Response
183     {
184         $cloned = clone $endpoint;
185 
186         return $this->getClient()->requestEndpoint($cloned);
187     }
188 }
189