1<?php
2
3namespace Sabre\Event\Loop;
4
5/**
6 * A simple eventloop implementation.
7 *
8 * This eventloop supports:
9 *   * nextTick
10 *   * setTimeout for delayed functions
11 *   * setInterval for repeating functions
12 *   * stream events using stream_select
13 *
14 * @copyright Copyright (C) 2007-2015 fruux GmbH. (https://fruux.com/)
15 * @author Evert Pot (http://evertpot.com/)
16 * @license http://sabre.io/license/ Modified BSD License
17 */
18class Loop {
19
20    /**
21     * Executes a function after x seconds.
22     *
23     * @param callable $cb
24     * @param float $timeout timeout in seconds
25     * @return void
26     */
27    function setTimeout(callable $cb, $timeout) {
28
29        $triggerTime = microtime(true) + ($timeout);
30
31        if (!$this->timers) {
32            // Special case when the timers array was empty.
33            $this->timers[] = [$triggerTime, $cb];
34            return;
35        }
36
37        // We need to insert these values in the timers array, but the timers
38        // array must be in reverse-order of trigger times.
39        //
40        // So here we search the array for the insertion point.
41        $index = count($this->timers) - 1;
42        while (true) {
43            if ($triggerTime < $this->timers[$index][0]) {
44                array_splice(
45                    $this->timers,
46                    $index + 1,
47                    0,
48                    [[$triggerTime, $cb]]
49                );
50                break;
51            } elseif ($index === 0) {
52                array_unshift($this->timers, [$triggerTime, $cb]);
53                break;
54            }
55            $index--;
56
57        }
58
59    }
60
61    /**
62     * Executes a function every x seconds.
63     *
64     * The value this function returns can be used to stop the interval with
65     * clearInterval.
66     *
67     * @param callable $cb
68     * @param float $timeout
69     * @return array
70     */
71    function setInterval(callable $cb, $timeout) {
72
73        $keepGoing = true;
74        $f = null;
75
76        $f = function() use ($cb, &$f, $timeout, &$keepGoing) {
77            if ($keepGoing) {
78                $cb();
79                $this->setTimeout($f, $timeout);
80            }
81        };
82        $this->setTimeout($f, $timeout);
83
84        // Really the only thing that matters is returning the $keepGoing
85        // boolean value.
86        //
87        // We need to pack it in an array to allow returning by reference.
88        // Because I'm worried people will be confused by using a boolean as a
89        // sort of identifier, I added an extra string.
90        return ['I\'m an implementation detail', &$keepGoing];
91
92    }
93
94    /**
95     * Stops a running internval.
96     *
97     * @param array $intervalId
98     * @return void
99     */
100    function clearInterval($intervalId) {
101
102        $intervalId[1] = false;
103
104    }
105
106    /**
107     * Runs a function immediately at the next iteration of the loop.
108     *
109     * @param callable $cb
110     * @return void
111     */
112    function nextTick(callable $cb) {
113
114        $this->nextTick[] = $cb;
115
116    }
117
118
119    /**
120     * Adds a read stream.
121     *
122     * The callback will be called as soon as there is something to read from
123     * the stream.
124     *
125     * You MUST call removeReadStream after you are done with the stream, to
126     * prevent the eventloop from never stopping.
127     *
128     * @param resource $stream
129     * @param callable $cb
130     * @return void
131     */
132    function addReadStream($stream, callable $cb) {
133
134        $this->readStreams[(int)$stream] = $stream;
135        $this->readCallbacks[(int)$stream] = $cb;
136
137    }
138
139    /**
140     * Adds a write stream.
141     *
142     * The callback will be called as soon as the system reports it's ready to
143     * receive writes on the stream.
144     *
145     * You MUST call removeWriteStream after you are done with the stream, to
146     * prevent the eventloop from never stopping.
147     *
148     * @param resource $stream
149     * @param callable $cb
150     * @return void
151     */
152    function addWriteStream($stream, callable $cb) {
153
154        $this->writeStreams[(int)$stream] = $stream;
155        $this->writeCallbacks[(int)$stream] = $cb;
156
157    }
158
159    /**
160     * Stop watching a stream for reads.
161     *
162     * @param resource $stream
163     * @return void
164     */
165    function removeReadStream($stream) {
166
167        unset(
168            $this->readStreams[(int)$stream],
169            $this->readCallbacks[(int)$stream]
170        );
171
172    }
173
174    /**
175     * Stop watching a stream for writes.
176     *
177     * @param resource $stream
178     * @return void
179     */
180    function removeWriteStream($stream) {
181
182        unset(
183            $this->writeStreams[(int)$stream],
184            $this->writeCallbacks[(int)$stream]
185        );
186
187    }
188
189
190    /**
191     * Runs the loop.
192     *
193     * This function will run continiously, until there's no more events to
194     * handle.
195     *
196     * @return void
197     */
198    function run() {
199
200        $this->running = true;
201
202        do {
203
204            $hasEvents = $this->tick(true);
205
206        } while ($this->running && $hasEvents);
207        $this->running = false;
208
209    }
210
211    /**
212     * Executes all pending events.
213     *
214     * If $block is turned true, this function will block until any event is
215     * triggered.
216     *
217     * If there are now timeouts, nextTick callbacks or events in the loop at
218     * all, this function will exit immediately.
219     *
220     * This function will return true if there are _any_ events left in the
221     * loop after the tick.
222     *
223     * @param bool $block
224     * @return bool
225     */
226    function tick($block = false) {
227
228        $this->runNextTicks();
229        $nextTimeout = $this->runTimers();
230
231        // Calculating how long runStreams should at most wait.
232        if (!$block) {
233            // Don't wait
234            $streamWait = 0;
235        } elseif ($this->nextTick) {
236            // There's a pending 'nextTick'. Don't wait.
237            $streamWait = 0;
238        } elseif (is_numeric($nextTimeout)) {
239            // Wait until the next Timeout should trigger.
240            $streamWait = $nextTimeout;
241        } else {
242            // Wait indefinitely
243            $streamWait = null;
244        }
245
246        $this->runStreams($streamWait);
247
248        return ($this->readStreams || $this->writeStreams || $this->nextTick || $this->timers);
249
250    }
251
252    /**
253     * Stops a running eventloop
254     *
255     * @return void
256     */
257    function stop() {
258
259        $this->running = false;
260
261    }
262
263    /**
264     * Executes all 'nextTick' callbacks.
265     *
266     * return void
267     */
268    protected function runNextTicks() {
269
270        $nextTick = $this->nextTick;
271        $this->nextTick = [];
272
273        foreach ($nextTick as $cb) {
274            $cb();
275        }
276
277    }
278
279    /**
280     * Runs all pending timers.
281     *
282     * After running the timer callbacks, this function returns the number of
283     * seconds until the next timer should be executed.
284     *
285     * If there's no more pending timers, this function returns null.
286     *
287     * @return float
288     */
289    protected function runTimers() {
290
291        $now = microtime(true);
292        while (($timer = array_pop($this->timers)) && $timer[0] < $now) {
293            $timer[1]();
294        }
295        // Add the last timer back to the array.
296        if ($timer) {
297            $this->timers[] = $timer;
298            return $timer[0] - microtime(true);
299        }
300
301    }
302
303    /**
304     * Runs all pending stream events.
305     *
306     * @param float $timeout
307     */
308    protected function runStreams($timeout) {
309
310        if ($this->readStreams || $this->writeStreams) {
311
312            $read = $this->readStreams;
313            $write = $this->writeStreams;
314            $except = null;
315            if (stream_select($read, $write, $except, null, $timeout)) {
316
317                // See PHP Bug https://bugs.php.net/bug.php?id=62452
318                // Fixed in PHP7
319                foreach ($read as $readStream) {
320                    $readCb = $this->readCallbacks[(int)$readStream];
321                    $readCb();
322                }
323                foreach ($write as $writeStream) {
324                    $writeCb = $this->writeCallbacks[(int)$writeStream];
325                    $writeCb();
326                }
327
328            }
329
330        } elseif ($this->running && ($this->nextTick || $this->timers)) {
331            usleep($timeout !== null ? $timeout * 1000000 : 200000);
332        }
333
334    }
335
336    /**
337     * Is the main loop active
338     *
339     * @var bool
340     */
341    protected $running = false;
342
343    /**
344     * A list of timers, added by setTimeout.
345     *
346     * @var array
347     */
348    protected $timers = [];
349
350    /**
351     * A list of 'nextTick' callbacks.
352     *
353     * @var callable[]
354     */
355    protected $nextTick = [];
356
357    /**
358     * List of readable streams for stream_select, indexed by stream id.
359     *
360     * @var resource[]
361     */
362    protected $readStreams = [];
363
364    /**
365     * List of writable streams for stream_select, indexed by stream id.
366     *
367     * @var resource[]
368     */
369    protected $writeStreams = [];
370
371    /**
372     * List of read callbacks, indexed by stream id.
373     *
374     * @var callback[]
375     */
376    protected $readCallbacks = [];
377
378    /**
379     * List of write callbacks, indexed by stream id.
380     *
381     * @var callback[]
382     */
383    protected $writeCallbacks = [];
384
385
386}
387