<?php

namespace Sabre\Event\Loop;

/**
 * A simple eventloop implementation.
 *
 * This eventloop supports:
 *   * nextTick
 *   * setTimeout for delayed functions
 *   * setInterval for repeating functions
 *   * stream events using stream_select
 *
 * @copyright Copyright (C) 2007-2015 fruux GmbH. (https://fruux.com/)
 * @author Evert Pot (http://evertpot.com/)
 * @license http://sabre.io/license/ Modified BSD License
 */
class Loop {

    /**
     * Executes a function after x seconds.
     *
     * Timers with an identical trigger time fire in the order in which they
     * were registered.
     *
     * @param callable $cb
     * @param float $timeout timeout in seconds
     * @return void
     */
    function setTimeout(callable $cb, $timeout) {

        $triggerTime = microtime(true) + ($timeout);

        if (!$this->timers) {
            // Special case when the timers array was empty.
            $this->timers[] = [$triggerTime, $cb];
            return;
        }

        // We need to insert these values in the timers array, but the timers
        // array must be in reverse-order of trigger times (the timer that
        // fires first sits at the end, so runTimers can array_pop it).
        //
        // So here we search the array, starting at the end, for the
        // insertion point.
        $index = count($this->timers) - 1;
        while (true) {
            if ($triggerTime < $this->timers[$index][0]) {
                // The timer at $index fires later than the new one, so the
                // new one goes right behind it.
                array_splice(
                    $this->timers,
                    $index + 1,
                    0,
                    [[$triggerTime, $cb]]
                );
                break;
            } elseif ($index === 0) {
                // Nothing fires later than the new timer: it goes up front.
                array_unshift($this->timers, [$triggerTime, $cb]);
                break;
            }
            $index--;

        }

    }

    /**
     * Executes a function every x seconds.
     *
     * The value this function returns can be used to stop the interval with
     * clearInterval.
     *
     * @param callable $cb
     * @param float $timeout
     * @return array
     */
    function setInterval(callable $cb, $timeout) {

        $keepGoing = true;
        $f = null;

        // The closure re-schedules itself after every run for as long as
        // $keepGoing (shared by reference with the returned id) is true.
        $f = function() use ($cb, &$f, $timeout, &$keepGoing) {
            if ($keepGoing) {
                $cb();
                $this->setTimeout($f, $timeout);
            }
        };
        $this->setTimeout($f, $timeout);

        // Really the only thing that matters is returning the $keepGoing
        // boolean value.
        //
        // We need to pack it in an array to allow returning by reference.
        // Because I'm worried people will be confused by using a boolean as a
        // sort of identifier, I added an extra string.
        return ['I\'m an implementation detail', &$keepGoing];

    }

    /**
     * Stops a running interval.
     *
     * @param array $intervalId the value returned by setInterval
     * @return void
     */
    function clearInterval($intervalId) {

        // Element 1 is a reference to the interval's $keepGoing flag, so
        // this write reaches the closure even though the array itself was
        // passed by value.
        $intervalId[1] = false;

    }

    /**
     * Runs a function immediately at the next iteration of the loop.
     *
     * @param callable $cb
     * @return void
     */
    function nextTick(callable $cb) {

        $this->nextTick[] = $cb;

    }


    /**
     * Adds a read stream.
     *
     * The callback will be called as soon as there is something to read from
     * the stream.
     *
     * You MUST call removeReadStream after you are done with the stream, to
     * prevent the eventloop from never stopping.
     *
     * @param resource $stream
     * @param callable $cb
     * @return void
     */
    function addReadStream($stream, callable $cb) {

        $this->readStreams[(int)$stream] = $stream;
        $this->readCallbacks[(int)$stream] = $cb;

    }

    /**
     * Adds a write stream.
     *
     * The callback will be called as soon as the system reports it's ready to
     * receive writes on the stream.
     *
     * You MUST call removeWriteStream after you are done with the stream, to
     * prevent the eventloop from never stopping.
     *
     * @param resource $stream
     * @param callable $cb
     * @return void
     */
    function addWriteStream($stream, callable $cb) {

        $this->writeStreams[(int)$stream] = $stream;
        $this->writeCallbacks[(int)$stream] = $cb;

    }

    /**
     * Stop watching a stream for reads.
     *
     * @param resource $stream
     * @return void
     */
    function removeReadStream($stream) {

        unset(
            $this->readStreams[(int)$stream],
            $this->readCallbacks[(int)$stream]
        );

    }

    /**
     * Stop watching a stream for writes.
     *
     * @param resource $stream
     * @return void
     */
    function removeWriteStream($stream) {

        unset(
            $this->writeStreams[(int)$stream],
            $this->writeCallbacks[(int)$stream]
        );

    }


    /**
     * Runs the loop.
     *
     * This function will run continuously, until there are no more events to
     * handle or until stop() is called.
     *
     * @return void
     */
    function run() {

        $this->running = true;

        do {

            $hasEvents = $this->tick(true);

        } while ($this->running && $hasEvents);
        $this->running = false;

    }

    /**
     * Executes all pending events.
     *
     * If $block is turned true, this function will block until any event is
     * triggered.
     *
     * If there are no timeouts, nextTick callbacks or events in the loop at
     * all, this function will exit immediately.
     *
     * This function will return true if there are _any_ events left in the
     * loop after the tick.
     *
     * @param bool $block
     * @return bool
     */
    function tick($block = false) {

        $this->runNextTicks();
        $nextTimeout = $this->runTimers();

        // Calculating how long runStreams should at most wait.
        if (!$block) {
            // Don't wait
            $streamWait = 0;
        } elseif ($this->nextTick) {
            // There's a pending 'nextTick'. Don't wait.
            $streamWait = 0;
        } elseif (is_numeric($nextTimeout)) {
            // Wait until the next Timeout should trigger.
            $streamWait = $nextTimeout;
        } else {
            // Wait indefinitely
            $streamWait = null;
        }

        $this->runStreams($streamWait);

        return ($this->readStreams || $this->writeStreams || $this->nextTick || $this->timers);

    }

    /**
     * Stops a running eventloop
     *
     * @return void
     */
    function stop() {

        $this->running = false;

    }

    /**
     * Executes all 'nextTick' callbacks.
     *
     * Callbacks registered while this method runs are kept for the following
     * tick.
     *
     * @return void
     */
    protected function runNextTicks() {

        // Swap the queue out first, so callbacks can safely call nextTick().
        $nextTick = $this->nextTick;
        $this->nextTick = [];

        foreach ($nextTick as $cb) {
            $cb();
        }

    }

    /**
     * Runs all pending timers.
     *
     * After running the timer callbacks, this function returns the number of
     * seconds until the next timer should be executed. This value is never
     * negative: if the next timer is already overdue, 0 is returned.
     *
     * If there are no more pending timers, this function returns null.
     *
     * @return float|int|null
     */
    protected function runTimers() {

        $now = microtime(true);
        while (($timer = array_pop($this->timers)) && $timer[0] < $now) {
            $timer[1]();
        }
        // Add the last timer back to the array.
        if ($timer) {
            $this->timers[] = $timer;
            // The callbacks above may have taken longer than the time left
            // until this timer; a negative wait is meaningless and is
            // rejected by usleep(), so clamp it.
            return max(0, $timer[0] - microtime(true));
        }

        return null;

    }

    /**
     * Runs all pending stream events.
     *
     * If there are no streams to watch, but the loop is running and still
     * has timers or nextTick callbacks, this method sleeps instead.
     *
     * @param float|null $timeout maximum time to wait in seconds, or null to
     *                            wait until a stream is ready
     * @return void
     */
    protected function runStreams($timeout) {

        if ($this->readStreams || $this->writeStreams) {

            $read = $this->readStreams;
            $write = $this->writeStreams;
            $except = null;

            if ($timeout === null) {
                // No timeout at all: block until a stream is ready.
                $ready = stream_select($read, $write, $except, null);
            } else {
                // stream_select wants whole seconds and microseconds as two
                // separate integers.
                $timeout = max(0, $timeout);
                $seconds = (int)floor($timeout);
                $microseconds = (int)(($timeout - $seconds) * 1000000);
                $ready = stream_select($read, $write, $except, $seconds, $microseconds);
            }

            if ($ready) {

                // See PHP Bug https://bugs.php.net/bug.php?id=62452
                // Fixed in PHP7
                foreach ($read as $readStream) {
                    $readId = (int)$readStream;
                    // An earlier callback in this tick may have removed the
                    // stream; in that case there is nothing left to call.
                    if (isset($this->readCallbacks[$readId])) {
                        $readCb = $this->readCallbacks[$readId];
                        $readCb();
                    }
                }
                foreach ($write as $writeStream) {
                    $writeId = (int)$writeStream;
                    if (isset($this->writeCallbacks[$writeId])) {
                        $writeCb = $this->writeCallbacks[$writeId];
                        $writeCb();
                    }
                }

            }

        } elseif ($this->running && ($this->nextTick || $this->timers)) {
            // usleep() takes a non-negative integer number of microseconds.
            usleep($timeout !== null ? (int)(max(0, $timeout) * 1000000) : 200000);
        }

    }

    /**
     * Is the main loop active
     *
     * @var bool
     */
    protected $running = false;

    /**
     * A list of timers, added by setTimeout.
     *
     * Every item is a [triggerTime, callback] pair. The list is kept in
     * reverse order of trigger time: the timer that fires first is last.
     *
     * @var array
     */
    protected $timers = [];

    /**
     * A list of 'nextTick' callbacks.
     *
     * @var callable[]
     */
    protected $nextTick = [];

    /**
     * List of readable streams for stream_select, indexed by stream id.
     *
     * @var resource[]
     */
    protected $readStreams = [];

    /**
     * List of writable streams for stream_select, indexed by stream id.
     *
     * @var resource[]
     */
    protected $writeStreams = [];

    /**
     * List of read callbacks, indexed by stream id.
     *
     * @var callable[]
     */
    protected $readCallbacks = [];

    /**
     * List of write callbacks, indexed by stream id.
     *
     * @var callable[]
     */
    protected $writeCallbacks = [];


}