1<?php
2
3namespace Elastica;
4
5use Elastica\Exception\InvalidException;
6use Elastica\Exception\NotImplementedException;
7use Elastica\Processor\AbstractProcessor;
8use Elasticsearch\Endpoints\AbstractEndpoint;
9use Elasticsearch\Endpoints\Ingest\Pipeline\Delete;
10use Elasticsearch\Endpoints\Ingest\Pipeline\Get;
11use Elasticsearch\Endpoints\Ingest\Pipeline\Put;
12
/**
 * Elastica Pipeline object.
 *
 * Handles ingest pipeline management & definition: collects a pipeline id,
 * a description and a set of processors, and issues the put/get/delete
 * pipeline requests through the Elastica client.
 *
 * @author   Federico Panini <fpanini@gmail.com>
 *
 * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest-processors.html
 */
class Pipeline extends Param
{
    /**
     * @var string name of the pipeline
     */
    protected $id;

    /**
     * Client Object.
     *
     * @var Client Client object
     */
    protected $_client;

    /**
     * Processors array.
     *
     * The serialized processors are kept under the 'processors' key; that is
     * the key read by create(), addProcessor() and toArray().
     *
     * @var array
     */
    protected $_processors = [];

    /**
     * Create a new Pipeline Object.
     *
     * @param Client $client Client used to send the pipeline requests
     */
    public function __construct(Client $client)
    {
        $this->_client = $client;
    }

    /**
     * Create a Pipeline.
     *
     * Sends the pipeline definition returned by toArray() under the id set
     * with setId().
     *
     * @throws InvalidException if the id, the description or the processors are missing
     *
     * @return Response
     *
     * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/put-pipeline-api.html
     */
    public function create()
    {
        // Explicit comparison instead of empty(): empty('0') is true, which
        // would wrongly reject the non-empty pipeline id "0".
        if (null === $this->id || '' === $this->id) {
            throw new InvalidException('You should set a valid pipeline id');
        }

        if (empty($this->_params['description'])) {
            throw new InvalidException('You should set a valid processor description.');
        }

        if (empty($this->_processors['processors'])) {
            throw new InvalidException('You should set a valid processor of type Elastica\Processor\AbstractProcessor.');
        }

        $endpoint = new Put();
        $endpoint->setID($this->id);
        $endpoint->setBody($this->toArray());

        return $this->requestEndpoint($endpoint);
    }

    /**
     * Get a Pipeline Object.
     *
     * @param string $id Pipeline name
     *
     * @return Response
     *
     * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/get-pipeline-api.html
     */
    public function getPipeline(string $id)
    {
        $endpoint = new Get();
        $endpoint->setID($id);

        return $this->requestEndpoint($endpoint);
    }

    /**
     * Delete a Pipeline.
     *
     * @param string $id Pipeline name
     *
     * @return Response
     *
     * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/delete-pipeline-api.html
     */
    public function deletePipeline(string $id)
    {
        $endpoint = new Delete();
        $endpoint->setID($id);

        return $this->requestEndpoint($endpoint);
    }

    /**
     * Simulate a pipeline. Not implemented: always throws.
     *
     * @todo implement simulate API
     *
     * @throws NotImplementedException always
     */
    public function simulate()
    {
        throw new NotImplementedException('simulate API on Pipeline not yet implemented.');
    }

    /**
     * Sets the processors as a raw array. Will overwrite all processors
     * that were already set.
     *
     * create() and toArray() read the 'processors' key of this array, so the
     * serialized processors are expected under that key.
     *
     * @param array $processors raw processors array
     *
     * @return $this
     */
    public function setRawProcessors(array $processors)
    {
        $this->_processors = $processors;

        return $this;
    }

    /**
     * Add a processor.
     *
     * The processor's array form is merged into the processors collected so
     * far. NOTE(review): array_merge() overwrites equal string keys, so if two
     * processors serialize under the same top-level key only the last one is
     * kept - confirm against AbstractProcessor::toArray().
     *
     * @param AbstractProcessor $processor
     *
     * @return $this
     */
    public function addProcessor(AbstractProcessor $processor)
    {
        $serialized = $processor->toArray();

        // Test the key that is actually merged into. Checking the whole
        // $_processors array would send a raw array without a 'processors'
        // key (see setRawProcessors()) into array_merge() with a null argument.
        if (empty($this->_processors['processors'])) {
            $this->_processors['processors'] = $serialized;
            $this->_params['processors'] = [];
        } else {
            $this->_processors['processors'] = \array_merge($this->_processors['processors'], $serialized);
        }

        return $this;
    }

    /**
     * Set pipeline id.
     *
     * @param string $id
     *
     * @return $this
     */
    public function setId(string $id)
    {
        $this->id = $id;

        // Fluent return, consistent with the other setters of this class.
        return $this;
    }

    /**
     * Sets the processors.
     *
     * NOTE(review): toArray() rebuilds the 'processors' param from the
     * processors registered through addProcessor()/setRawProcessors(), and
     * create() only checks those. Assuming setParam() stores into $_params,
     * a value set here is overwritten on serialization - confirm whether
     * that is intended.
     *
     * @param array $processors array of AbstractProcessor object
     *
     * @return $this
     */
    public function setProcessors(array $processors)
    {
        return $this->setParam('processors', [$processors]);
    }

    /**
     * Set Description.
     *
     * @param string $description
     *
     * @return $this
     */
    public function setDescription(string $description)
    {
        return $this->setParam('description', $description);
    }

    /**
     * Converts the pipeline to an array.
     *
     * The collected processors are wrapped in a list and stored as the
     * 'processors' param before all params are returned. When no processors
     * have been registered the list is empty.
     *
     * @return array Pipeline definition array
     */
    public function toArray()
    {
        // Guard the lookup: without it an unset 'processors' key raises an
        // undefined-key warning and serializes as [null].
        $this->_params['processors'] = isset($this->_processors['processors'])
            ? [$this->_processors['processors']]
            : [];

        return $this->getParams();
    }

    /**
     * Returns the client this pipeline was created with.
     *
     * @return \Elastica\Client Client object
     */
    public function getClient()
    {
        return $this->_client;
    }

    /**
     * Makes calls to the elasticsearch server using an official client Endpoint.
     *
     * A clone of the endpoint is handed to the client, so the instance passed
     * in by the caller is not the one the client works on.
     *
     * @param AbstractEndpoint $endpoint
     *
     * @return Response
     */
    public function requestEndpoint(AbstractEndpoint $endpoint)
    {
        $cloned = clone $endpoint;

        return $this->getClient()->requestEndpoint($cloned);
    }
}
228