1<?php
2
3namespace Elastica;
4
5use Elastica\Exception\ClientException;
6use Elastica\Exception\ConnectionException;
7use Elastica\Exception\InvalidException;
8use Elastica\Exception\ResponseException;
9use Elastica\Processor\AbstractProcessor;
10use Elasticsearch\Endpoints\AbstractEndpoint;
11use Elasticsearch\Endpoints\Ingest\DeletePipeline;
12use Elasticsearch\Endpoints\Ingest\GetPipeline;
13use Elasticsearch\Endpoints\Ingest\Pipeline\Delete;
14use Elasticsearch\Endpoints\Ingest\Pipeline\Get;
15use Elasticsearch\Endpoints\Ingest\Pipeline\Put;
16use Elasticsearch\Endpoints\Ingest\PutPipeline;
17
18/**
19 * Elastica Pipeline object.
20 *
21 * Handles Pipeline management & definition.
22 *
23 * @author   Federico Panini <fpanini@gmail.com>
24 *
25 * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest-processors.html
26 */
27class Pipeline extends Param
28{
29    /**
30     * @var string
31     */
32    protected $id;
33
34    /**
35     * @var Client Client object
36     */
37    protected $_client;
38
39    /**
40     * @var AbstractProcessor[]
41     * @phpstan-var array{processors?: AbstractProcessor[]}
42     */
43    protected $_processors = [];
44
45    public function __construct(Client $client)
46    {
47        $this->_client = $client;
48    }
49
50    /**
51     * Create a Pipeline.
52     *
53     * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/put-pipeline-api.html
54     */
55    public function create(): Response
56    {
57        if (empty($this->id)) {
58            throw new InvalidException('You should set a valid pipeline id');
59        }
60
61        if (empty($this->_params['description'])) {
62            throw new InvalidException('You should set a valid processor description.');
63        }
64
65        if (empty($this->_processors['processors'])) {
66            throw new InvalidException('You should set a valid processor of type Elastica\Processor\AbstractProcessor.');
67        }
68
69        // TODO: Use only PutPipeline when dropping support for elasticsearch/elasticsearch 7.x
70        $endpoint = \class_exists(PutPipeline::class) ? new PutPipeline() : new Put();
71        $endpoint->setId($this->id);
72        $endpoint->setBody($this->toArray());
73
74        return $this->requestEndpoint($endpoint);
75    }
76
77    /**
78     * Get a Pipeline Object.
79     *
80     * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/get-pipeline-api.html
81     */
82    public function getPipeline(string $id): Response
83    {
84        // TODO: Use only GetPipeline when dropping support for elasticsearch/elasticsearch 7.x
85        $endpoint = \class_exists(GetPipeline::class) ? new GetPipeline() : new Get();
86        $endpoint->setId($id);
87
88        return $this->requestEndpoint($endpoint);
89    }
90
91    /**
92     * Delete a Pipeline.
93     *
94     * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/delete-pipeline-api.html
95     */
96    public function deletePipeline(string $id): Response
97    {
98        // TODO: Use only DeletePipeline when dropping support for elasticsearch/elasticsearch 7.x
99        $endpoint = \class_exists(DeletePipeline::class) ? new DeletePipeline() : new Delete();
100        $endpoint->setId($id);
101
102        return $this->requestEndpoint($endpoint);
103    }
104
105    /**
106     * Sets query as raw array. Will overwrite all already set arguments.
107     *
108     * @param array $processors array
109     */
110    public function setRawProcessors(array $processors): self
111    {
112        $this->_processors = $processors;
113
114        return $this;
115    }
116
117    public function addProcessor(AbstractProcessor $processor): self
118    {
119        if (!$this->_processors) {
120            $this->_processors['processors'] = $processor->toArray();
121            $this->_params['processors'] = [];
122        } else {
123            $this->_processors['processors'] = \array_merge($this->_processors['processors'], $processor->toArray());
124        }
125
126        return $this;
127    }
128
129    public function setId(string $id): self
130    {
131        $this->id = $id;
132
133        return $this;
134    }
135
136    public function getId(): ?string
137    {
138        return $this->id;
139    }
140
141    /**
142     * @param AbstractProcessor[] $processors
143     */
144    public function setProcessors(array $processors): self
145    {
146        $this->setParam('processors', [$processors]);
147
148        return $this;
149    }
150
151    public function setDescription(string $description): self
152    {
153        $this->setParam('description', $description);
154
155        return $this;
156    }
157
158    /**
159     * Converts the params to an array. A default implementation exist to create
160     * the an array out of the class name (last part of the class name)
161     * and the params.
162     */
163    public function toArray(): array
164    {
165        $this->_params['processors'] = [$this->_processors['processors']];
166
167        return $this->getParams();
168    }
169
170    public function getClient(): Client
171    {
172        return $this->_client;
173    }
174
175    /**
176     * Makes calls to the elasticsearch server with usage official client Endpoint based on this index.
177     *
178     * @throws ClientException
179     * @throws ConnectionException
180     * @throws ResponseException
181     */
182    public function requestEndpoint(AbstractEndpoint $endpoint): Response
183    {
184        $cloned = clone $endpoint;
185
186        return $this->getClient()->requestEndpoint($cloned);
187    }
188}
189