1<?php 2 3/** 4 * Hoa 5 * 6 * 7 * @license 8 * 9 * New BSD License 10 * 11 * Copyright © 2007-2017, Hoa community. All rights reserved. 12 * 13 * Redistribution and use in source and binary forms, with or without 14 * modification, are permitted provided that the following conditions are met: 15 * * Redistributions of source code must retain the above copyright 16 * notice, this list of conditions and the following disclaimer. 17 * * Redistributions in binary form must reproduce the above copyright 18 * notice, this list of conditions and the following disclaimer in the 19 * documentation and/or other materials provided with the distribution. 20 * * Neither the name of the Hoa nor the names of its contributors may be 21 * used to endorse or promote products derived from this software without 22 * specific prior written permission. 23 * 24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS AND CONTRIBUTORS BE 28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 34 * POSSIBILITY OF SUCH DAMAGE. 35 */ 36 37namespace Hoa\Stream; 38 39use Hoa\Consistency; 40use Hoa\Event; 41use Hoa\Protocol; 42 43/** 44 * Class \Hoa\Stream. 45 * 46 * Static register for all streams (files, sockets etc.). 47 * 48 * @copyright Copyright © 2007-2017 Hoa community 49 * @license New BSD License 50 */ 51abstract class Stream implements IStream\Stream, Event\Listenable 52{ 53 use Event\Listens; 54 55 /** 56 * Name index in the stream bucket. 57 * 58 * @const int 59 */ 60 const NAME = 0; 61 62 /** 63 * Handler index in the stream bucket. 64 * 65 * @const int 66 */ 67 const HANDLER = 1; 68 69 /** 70 * Resource index in the stream bucket. 71 * 72 * @const int 73 */ 74 const RESOURCE = 2; 75 76 /** 77 * Context index in the stream bucket. 78 * 79 * @const int 80 */ 81 const CONTEXT = 3; 82 83 /** 84 * Default buffer size. 85 * 86 * @const int 87 */ 88 const DEFAULT_BUFFER_SIZE = 8192; 89 90 /** 91 * Current stream bucket. 92 * 93 * @var array 94 */ 95 protected $_bucket = []; 96 97 /** 98 * Static stream register. 99 * 100 * @var array 101 */ 102 private static $_register = []; 103 104 /** 105 * Buffer size (default is 8Ko). 106 * 107 * @var bool 108 */ 109 protected $_bufferSize = self::DEFAULT_BUFFER_SIZE; 110 111 /** 112 * Original stream name, given to the stream constructor. 113 * 114 * @var string 115 */ 116 protected $_streamName = null; 117 118 /** 119 * Context name. 120 * 121 * @var string 122 */ 123 protected $_context = null; 124 125 /** 126 * Whether the opening has been deferred. 127 * 128 * @var bool 129 */ 130 protected $_hasBeenDeferred = false; 131 132 /** 133 * Whether this stream is already opened by another handler. 134 * 135 * @var bool 136 */ 137 protected $_borrowing = false; 138 139 140 141 /** 142 * Set the current stream. 143 * If not exists in the register, try to call the 144 * `$this->_open()` method. Please, see the `self::_getStream()` method. 145 * 146 * @param string $streamName Stream name (e.g. path or URL). 147 * @param string $context Context ID (please, see the 148 * `Hoa\Stream\Context` class). 149 * @param bool $wait Differ opening or not. 150 */ 151 public function __construct($streamName, $context = null, $wait = false) 152 { 153 $this->_streamName = $streamName; 154 $this->_context = $context; 155 $this->_hasBeenDeferred = $wait; 156 $this->setListener( 157 new Event\Listener( 158 $this, 159 [ 160 'authrequire', 161 'authresult', 162 'complete', 163 'connect', 164 'failure', 165 'mimetype', 166 'progress', 167 'redirect', 168 'resolve', 169 'size' 170 ] 171 ) 172 ); 173 174 if (true === $wait) { 175 return; 176 } 177 178 $this->open(); 179 180 return; 181 } 182 183 /** 184 * Get a stream in the register. 185 * If the stream does not exist, try to open it by calling the 186 * $handler->_open() method. 187 * 188 * @param string $streamName Stream name. 189 * @param \Hoa\Stream $handler Stream handler. 190 * @param string $context Context ID (please, see the 191 * \Hoa\Stream\Context class). 192 * @return array 193 * @throws \Hoa\Stream\Exception 194 */ 195 final private static function &_getStream( 196 $streamName, 197 Stream $handler, 198 $context = null 199 ) { 200 $name = md5($streamName); 201 202 if (null !== $context) { 203 if (false === Context::contextExists($context)) { 204 throw new Exception( 205 'Context %s was not previously declared, cannot retrieve ' . 206 'this context.', 207 0, 208 $context 209 ); 210 } 211 212 $context = Context::getInstance($context); 213 } 214 215 if (!isset(self::$_register[$name])) { 216 self::$_register[$name] = [ 217 self::NAME => $streamName, 218 self::HANDLER => $handler, 219 self::RESOURCE => $handler->_open($streamName, $context), 220 self::CONTEXT => $context 221 ]; 222 Event::register( 223 'hoa://Event/Stream/' . $streamName, 224 $handler 225 ); 226 // Add :open-ready? 227 Event::register( 228 'hoa://Event/Stream/' . $streamName . ':close-before', 229 $handler 230 ); 231 } else { 232 $handler->_borrowing = true; 233 } 234 235 if (null === self::$_register[$name][self::RESOURCE]) { 236 self::$_register[$name][self::RESOURCE] 237 = $handler->_open($streamName, $context); 238 } 239 240 return self::$_register[$name]; 241 } 242 243 /** 244 * Open the stream and return the associated resource. 245 * Note: This method is protected, but do not forget that it could be 246 * overloaded into a public context. 247 * 248 * @param string $streamName Stream name (e.g. path or URL). 249 * @param \Hoa\Stream\Context $context Context. 250 * @return resource 251 * @throws \Hoa\Exception\Exception 252 */ 253 abstract protected function &_open($streamName, Context $context = null); 254 255 /** 256 * Close the current stream. 257 * Note: this method is protected, but do not forget that it could be 258 * overloaded into a public context. 259 * 260 * @return bool 261 */ 262 abstract protected function _close(); 263 264 /** 265 * Open the stream. 266 * 267 * @return \Hoa\Stream 268 * @throws \Hoa\Stream\Exception 269 */ 270 final public function open() 271 { 272 $context = $this->_context; 273 274 if (true === $this->hasBeenDeferred()) { 275 if (null === $context) { 276 $handle = Context::getInstance(uniqid()); 277 $handle->setParameters([ 278 'notification' => [$this, '_notify'] 279 ]); 280 $context = $handle->getId(); 281 } elseif (true === Context::contextExists($context)) { 282 $handle = Context::getInstance($context); 283 $parameters = $handle->getParameters(); 284 285 if (!isset($parameters['notification'])) { 286 $handle->setParameters([ 287 'notification' => [$this, '_notify'] 288 ]); 289 } 290 } 291 } 292 293 $this->_bufferSize = self::DEFAULT_BUFFER_SIZE; 294 $this->_bucket = self::_getStream( 295 $this->_streamName, 296 $this, 297 $context 298 ); 299 300 return $this; 301 } 302 303 /** 304 * Close the current stream. 305 * 306 * @return void 307 */ 308 final public function close() 309 { 310 $streamName = $this->getStreamName(); 311 $name = md5($streamName); 312 313 if (!isset(self::$_register[$name])) { 314 return; 315 } 316 317 Event::notify( 318 'hoa://Event/Stream/' . $streamName . ':close-before', 319 $this, 320 new Event\Bucket() 321 ); 322 323 if (false === $this->_close()) { 324 return; 325 } 326 327 unset(self::$_register[$name]); 328 $this->_bucket[self::HANDLER] = null; 329 Event::unregister( 330 'hoa://Event/Stream/' . $streamName 331 ); 332 Event::unregister( 333 'hoa://Event/Stream/' . $streamName . ':close-before' 334 ); 335 336 return; 337 } 338 339 /** 340 * Get the current stream name. 341 * 342 * @return string 343 */ 344 public function getStreamName() 345 { 346 if (empty($this->_bucket)) { 347 return null; 348 } 349 350 return $this->_bucket[self::NAME]; 351 } 352 353 /** 354 * Get the current stream. 355 * 356 * @return resource 357 */ 358 public function getStream() 359 { 360 if (empty($this->_bucket)) { 361 return null; 362 } 363 364 return $this->_bucket[self::RESOURCE]; 365 } 366 367 /** 368 * Get the current stream context. 369 * 370 * @return \Hoa\Stream\Context 371 */ 372 public function getStreamContext() 373 { 374 if (empty($this->_bucket)) { 375 return null; 376 } 377 378 return $this->_bucket[self::CONTEXT]; 379 } 380 381 /** 382 * Get stream handler according to its name. 383 * 384 * @param string $streamName Stream name. 385 * @return \Hoa\Stream 386 */ 387 public static function getStreamHandler($streamName) 388 { 389 $name = md5($streamName); 390 391 if (!isset(self::$_register[$name])) { 392 return null; 393 } 394 395 return self::$_register[$name][self::HANDLER]; 396 } 397 398 /** 399 * Set the current stream. Useful to manage a stack of streams (e.g. socket 400 * and select). Notice that it could be unsafe to use this method without 401 * taking time to think about it two minutes. Resource of type “Unknown” is 402 * considered as valid. 403 * 404 * @return resource 405 * @throws \Hoa\Stream\Exception 406 */ 407 public function _setStream($stream) 408 { 409 if (false === is_resource($stream) && 410 ('resource' !== gettype($stream) || 411 'Unknown' !== get_resource_type($stream))) { 412 throw new Exception( 413 'Try to change the stream resource with an invalid one; ' . 414 'given %s.', 415 1, 416 gettype($stream) 417 ); 418 } 419 420 $old = $this->_bucket[self::RESOURCE]; 421 $this->_bucket[self::RESOURCE] = $stream; 422 423 return $old; 424 } 425 426 /** 427 * Check if the stream is opened. 428 * 429 * @return bool 430 */ 431 public function isOpened() 432 { 433 return is_resource($this->getStream()); 434 } 435 436 /** 437 * Set the timeout period. 438 * 439 * @param int $seconds Timeout period in seconds. 440 * @param int $microseconds Timeout period in microseconds. 441 * @return bool 442 */ 443 public function setStreamTimeout($seconds, $microseconds = 0) 444 { 445 return stream_set_timeout($this->getStream(), $seconds, $microseconds); 446 } 447 448 /** 449 * Whether the opening of the stream has been deferred 450 */ 451 protected function hasBeenDeferred() 452 { 453 return $this->_hasBeenDeferred; 454 } 455 456 /** 457 * Check whether the connection has timed out or not. 458 * This is basically a shortcut of `getStreamMetaData` + the `timed_out` 459 * index, but the resulting code is more readable. 460 * 461 * @return bool 462 */ 463 public function hasTimedOut() 464 { 465 $metaData = $this->getStreamMetaData(); 466 467 return true === $metaData['timed_out']; 468 } 469 470 /** 471 * Set blocking/non-blocking mode. 472 * 473 * @param bool $mode Blocking mode. 474 * @return bool 475 */ 476 public function setStreamBlocking($mode) 477 { 478 return stream_set_blocking($this->getStream(), (int) $mode); 479 } 480 481 /** 482 * Set stream buffer. 483 * Output using fwrite() (or similar function) is normally buffered at 8 Ko. 484 * This means that if there are two processes wanting to write to the same 485 * output stream, each is paused after 8 Ko of data to allow the other to 486 * write. 487 * 488 * @param int $buffer Number of bytes to buffer. If zero, write 489 * operations are unbuffered. This ensures that 490 * all writes are completed before other 491 * processes are allowed to write to that output 492 * stream. 493 * @return bool 494 */ 495 public function setStreamBuffer($buffer) 496 { 497 // Zero means success. 498 $out = 0 === stream_set_write_buffer($this->getStream(), $buffer); 499 500 if (true === $out) { 501 $this->_bufferSize = $buffer; 502 } 503 504 return $out; 505 } 506 507 /** 508 * Disable stream buffering. 509 * Alias of $this->setBuffer(0). 510 * 511 * @return bool 512 */ 513 public function disableStreamBuffer() 514 { 515 return $this->setStreamBuffer(0); 516 } 517 518 /** 519 * Get stream buffer size. 520 * 521 * @return int 522 */ 523 public function getStreamBufferSize() 524 { 525 return $this->_bufferSize; 526 } 527 528 /** 529 * Get stream wrapper name. 530 * 531 * @return string 532 */ 533 public function getStreamWrapperName() 534 { 535 if (false === $pos = strpos($this->getStreamName(), '://')) { 536 return 'file'; 537 } 538 539 return substr($this->getStreamName(), 0, $pos); 540 } 541 542 /** 543 * Get stream meta data. 544 * 545 * @return array 546 */ 547 public function getStreamMetaData() 548 { 549 return stream_get_meta_data($this->getStream()); 550 } 551 552 /** 553 * Whether this stream is already opened by another handler. 554 * 555 * @return bool 556 */ 557 public function isBorrowing() 558 { 559 return $this->_borrowing; 560 } 561 562 /** 563 * Notification callback. 564 * 565 * @param int $ncode Notification code. Please, see 566 * STREAM_NOTIFY_* constants. 567 * @param int $severity Severity. Please, see 568 * STREAM_NOTIFY_SEVERITY_* constants. 569 * @param string $message Message. 570 * @param int $code Message code. 571 * @param int $transferred If applicable, the number of transferred 572 * bytes. 573 * @param int $max If applicable, the number of max bytes. 574 * @return void 575 */ 576 public function _notify( 577 $ncode, 578 $severity, 579 $message, 580 $code, 581 $transferred, 582 $max 583 ) { 584 static $_map = [ 585 STREAM_NOTIFY_AUTH_REQUIRED => 'authrequire', 586 STREAM_NOTIFY_AUTH_RESULT => 'authresult', 587 STREAM_NOTIFY_COMPLETED => 'complete', 588 STREAM_NOTIFY_CONNECT => 'connect', 589 STREAM_NOTIFY_FAILURE => 'failure', 590 STREAM_NOTIFY_MIME_TYPE_IS => 'mimetype', 591 STREAM_NOTIFY_PROGRESS => 'progress', 592 STREAM_NOTIFY_REDIRECTED => 'redirect', 593 STREAM_NOTIFY_RESOLVE => 'resolve', 594 STREAM_NOTIFY_FILE_SIZE_IS => 'size' 595 ]; 596 597 $this->getListener()->fire($_map[$ncode], new Event\Bucket([ 598 'code' => $code, 599 'severity' => $severity, 600 'message' => $message, 601 'transferred' => $transferred, 602 'max' => $max 603 ])); 604 605 return; 606 } 607 608 /** 609 * Call the $handler->close() method on each stream in the static stream 610 * register. 611 * This method does not check the return value of $handler->close(). Thus, 612 * if a stream is persistent, the $handler->close() should do anything. It 613 * is a very generic method. 614 * 615 * @return void 616 */ 617 final public static function _Hoa_Stream() 618 { 619 foreach (self::$_register as $entry) { 620 $entry[self::HANDLER]->close(); 621 } 622 623 return; 624 } 625 626 /** 627 * Transform object to string. 628 * 629 * @return string 630 */ 631 public function __toString() 632 { 633 return $this->getStreamName(); 634 } 635 636 /** 637 * Close the stream when destructing. 638 * 639 * @return void 640 */ 641 public function __destruct() 642 { 643 if (false === $this->isOpened()) { 644 return; 645 } 646 647 $this->close(); 648 649 return; 650 } 651} 652 653/** 654 * Class \Hoa\Stream\_Protocol. 655 * 656 * The `hoa://Library/Stream` node. 657 * 658 * @copyright Copyright © 2007-2017 Hoa community 659 * @license New BSD License 660 */ 661class _Protocol extends Protocol\Node 662{ 663 /** 664 * Component's name. 665 * 666 * @var string 667 */ 668 protected $_name = 'Stream'; 669 670 671 672 /** 673 * ID of the component. 674 * 675 * @param string $id ID of the component. 676 * @return mixed 677 */ 678 public function reachId($id) 679 { 680 return Stream::getStreamHandler($id); 681 } 682} 683 684/** 685 * Flex entity. 686 */ 687Consistency::flexEntity('Hoa\Stream\Stream'); 688 689/** 690 * Shutdown method. 691 */ 692Consistency::registerShutdownFunction(xcallable('Hoa\Stream\Stream::_Hoa_Stream')); 693 694/** 695 * Add the `hoa://Library/Stream` node. Should be use to reach/get an entry 696 * in the stream register. 697 */ 698$protocol = Protocol::getInstance(); 699$protocol['Library'][] = new _Protocol(); 700