1 <?php
2 
3 /**
4  * Hoa
5  *
6  *
7  * @license
8  *
9  * New BSD License
10  *
11  * Copyright © 2007-2017, Hoa community. All rights reserved.
12  *
13  * Redistribution and use in source and binary forms, with or without
14  * modification, are permitted provided that the following conditions are met:
15  *     * Redistributions of source code must retain the above copyright
16  *       notice, this list of conditions and the following disclaimer.
17  *     * Redistributions in binary form must reproduce the above copyright
18  *       notice, this list of conditions and the following disclaimer in the
19  *       documentation and/or other materials provided with the distribution.
20  *     * Neither the name of the Hoa nor the names of its contributors may be
21  *       used to endorse or promote products derived from this software without
22  *       specific prior written permission.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS AND CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36 
37 namespace Hoa\Stream;
38 
39 use Hoa\Consistency;
40 use Hoa\Event;
41 use Hoa\Protocol;
42 
43 /**
44  * Class \Hoa\Stream.
45  *
46  * Static register for all streams (files, sockets etc.).
47  *
48  * @copyright  Copyright © 2007-2017 Hoa community
49  * @license    New BSD License
50  */
51 abstract class Stream implements IStream\Stream, Event\Listenable
52 {
53     use Event\Listens;
54 
55     /**
56      * Name index in the stream bucket.
57      *
58      * @const int
59      */
60     const NAME                = 0;
61 
62     /**
63      * Handler index in the stream bucket.
64      *
65      * @const int
66      */
67     const HANDLER             = 1;
68 
69     /**
70      * Resource index in the stream bucket.
71      *
72      * @const int
73      */
74     const RESOURCE            = 2;
75 
76     /**
77      * Context index in the stream bucket.
78      *
79      * @const int
80      */
81     const CONTEXT             = 3;
82 
83     /**
84      * Default buffer size.
85      *
86      * @const int
87      */
88     const DEFAULT_BUFFER_SIZE = 8192;
89 
90     /**
91      * Current stream bucket.
92      *
93      * @var array
94      */
95     protected $_bucket          = [];
96 
97     /**
98      * Static stream register.
99      *
100      * @var array
101      */
102     private static $_register   = [];
103 
104     /**
105      * Buffer size (default is 8Ko).
106      *
107      * @var bool
108      */
109     protected $_bufferSize      = self::DEFAULT_BUFFER_SIZE;
110 
111     /**
112      * Original stream name, given to the stream constructor.
113      *
114      * @var string
115      */
116     protected $_streamName      = null;
117 
118     /**
119      * Context name.
120      *
121      * @var string
122      */
123     protected $_context         = null;
124 
125     /**
126      * Whether the opening has been deferred.
127      *
128      * @var bool
129      */
130     protected $_hasBeenDeferred = false;
131 
132     /**
133      * Whether this stream is already opened by another handler.
134      *
135      * @var bool
136      */
137     protected $_borrowing       = false;
138 
139 
140 
141     /**
142      * Set the current stream.
143      * If not exists in the register, try to call the
144      * `$this->_open()` method. Please, see the `self::_getStream()` method.
145      *
146      * @param   string  $streamName    Stream name (e.g. path or URL).
147      * @param   string  $context       Context ID (please, see the
148      *                                 `Hoa\Stream\Context` class).
149      * @param   bool    $wait          Differ opening or not.
150      */
151     public function __construct($streamName, $context = null, $wait = false)
152     {
153         $this->_streamName      = $streamName;
154         $this->_context         = $context;
155         $this->_hasBeenDeferred = $wait;
156         $this->setListener(
157             new Event\Listener(
158                 $this,
159                 [
160                     'authrequire',
161                     'authresult',
162                     'complete',
163                     'connect',
164                     'failure',
165                     'mimetype',
166                     'progress',
167                     'redirect',
168                     'resolve',
169                     'size'
170                 ]
171             )
172         );
173 
174         if (true === $wait) {
175             return;
176         }
177 
178         $this->open();
179 
180         return;
181     }
182 
183     /**
184      * Get a stream in the register.
185      * If the stream does not exist, try to open it by calling the
186      * $handler->_open() method.
187      *
188      * @param   string       $streamName    Stream name.
189      * @param   \Hoa\Stream  $handler       Stream handler.
190      * @param   string       $context       Context ID (please, see the
191      *                                      \Hoa\Stream\Context class).
192      * @return  array
193      * @throws  \Hoa\Stream\Exception
194      */
195     final private static function &_getStream(
196         $streamName,
197         Stream $handler,
198         $context = null
199     ) {
200         $name = md5($streamName);
201 
202         if (null !== $context) {
203             if (false === Context::contextExists($context)) {
204                 throw new Exception(
205                     'Context %s was not previously declared, cannot retrieve ' .
206                     'this context.',
207                     0,
208                     $context
209                 );
210             }
211 
212             $context = Context::getInstance($context);
213         }
214 
215         if (!isset(self::$_register[$name])) {
216             self::$_register[$name] = [
217                 self::NAME     => $streamName,
218                 self::HANDLER  => $handler,
219                 self::RESOURCE => $handler->_open($streamName, $context),
220                 self::CONTEXT  => $context
221             ];
222             Event::register(
223                 'hoa://Event/Stream/' . $streamName,
224                 $handler
225             );
226             // Add :open-ready?
227             Event::register(
228                 'hoa://Event/Stream/' . $streamName . ':close-before',
229                 $handler
230             );
231         } else {
232             $handler->_borrowing = true;
233         }
234 
235         if (null === self::$_register[$name][self::RESOURCE]) {
236             self::$_register[$name][self::RESOURCE]
237                 = $handler->_open($streamName, $context);
238         }
239 
240         return self::$_register[$name];
241     }
242 
243     /**
244      * Open the stream and return the associated resource.
245      * Note: This method is protected, but do not forget that it could be
246      * overloaded into a public context.
247      *
248      * @param   string               $streamName    Stream name (e.g. path or URL).
249      * @param   \Hoa\Stream\Context  $context       Context.
250      * @return  resource
251      * @throws  \Hoa\Exception\Exception
252      */
253     abstract protected function &_open($streamName, Context $context = null);
254 
255     /**
256      * Close the current stream.
257      * Note: this method is protected, but do not forget that it could be
258      * overloaded into a public context.
259      *
260      * @return  bool
261      */
262     abstract protected function _close();
263 
264     /**
265      * Open the stream.
266      *
267      * @return  \Hoa\Stream
268      * @throws  \Hoa\Stream\Exception
269      */
270     final public function open()
271     {
272         $context = $this->_context;
273 
274         if (true === $this->hasBeenDeferred()) {
275             if (null === $context) {
276                 $handle = Context::getInstance(uniqid());
277                 $handle->setParameters([
278                     'notification' => [$this, '_notify']
279                 ]);
280                 $context = $handle->getId();
281             } elseif (true === Context::contextExists($context)) {
282                 $handle     = Context::getInstance($context);
283                 $parameters = $handle->getParameters();
284 
285                 if (!isset($parameters['notification'])) {
286                     $handle->setParameters([
287                         'notification' => [$this, '_notify']
288                     ]);
289                 }
290             }
291         }
292 
293         $this->_bufferSize = self::DEFAULT_BUFFER_SIZE;
294         $this->_bucket     = self::_getStream(
295             $this->_streamName,
296             $this,
297             $context
298         );
299 
300         return $this;
301     }
302 
303     /**
304      * Close the current stream.
305      *
306      * @return  void
307      */
308     final public function close()
309     {
310         $streamName = $this->getStreamName();
311         $name       = md5($streamName);
312 
313         if (!isset(self::$_register[$name])) {
314             return;
315         }
316 
317         Event::notify(
318             'hoa://Event/Stream/' . $streamName . ':close-before',
319             $this,
320             new Event\Bucket()
321         );
322 
323         if (false === $this->_close()) {
324             return;
325         }
326 
327         unset(self::$_register[$name]);
328         $this->_bucket[self::HANDLER] = null;
329         Event::unregister(
330             'hoa://Event/Stream/' . $streamName
331         );
332         Event::unregister(
333             'hoa://Event/Stream/' . $streamName . ':close-before'
334         );
335 
336         return;
337     }
338 
339     /**
340      * Get the current stream name.
341      *
342      * @return  string
343      */
344     public function getStreamName()
345     {
346         if (empty($this->_bucket)) {
347             return null;
348         }
349 
350         return $this->_bucket[self::NAME];
351     }
352 
353     /**
354      * Get the current stream.
355      *
356      * @return  resource
357      */
358     public function getStream()
359     {
360         if (empty($this->_bucket)) {
361             return null;
362         }
363 
364         return $this->_bucket[self::RESOURCE];
365     }
366 
367     /**
368      * Get the current stream context.
369      *
370      * @return  \Hoa\Stream\Context
371      */
372     public function getStreamContext()
373     {
374         if (empty($this->_bucket)) {
375             return null;
376         }
377 
378         return $this->_bucket[self::CONTEXT];
379     }
380 
381     /**
382      * Get stream handler according to its name.
383      *
384      * @param   string  $streamName    Stream name.
385      * @return  \Hoa\Stream
386      */
387     public static function getStreamHandler($streamName)
388     {
389         $name = md5($streamName);
390 
391         if (!isset(self::$_register[$name])) {
392             return null;
393         }
394 
395         return self::$_register[$name][self::HANDLER];
396     }
397 
398     /**
399      * Set the current stream. Useful to manage a stack of streams (e.g. socket
400      * and select). Notice that it could be unsafe to use this method without
401      * taking time to think about it two minutes. Resource of type “Unknown” is
402      * considered as valid.
403      *
404      * @return  resource
405      * @throws  \Hoa\Stream\Exception
406      */
407     public function _setStream($stream)
408     {
409         if (false === is_resource($stream) &&
410             ('resource' !== gettype($stream) ||
411              'Unknown'  !== get_resource_type($stream))) {
412             throw new Exception(
413                 'Try to change the stream resource with an invalid one; ' .
414                 'given %s.',
415                 1,
416                 gettype($stream)
417             );
418         }
419 
420         $old                           = $this->_bucket[self::RESOURCE];
421         $this->_bucket[self::RESOURCE] = $stream;
422 
423         return $old;
424     }
425 
426     /**
427      * Check if the stream is opened.
428      *
429      * @return  bool
430      */
431     public function isOpened()
432     {
433         return is_resource($this->getStream());
434     }
435 
436     /**
437      * Set the timeout period.
438      *
439      * @param   int     $seconds         Timeout period in seconds.
440      * @param   int     $microseconds    Timeout period in microseconds.
441      * @return  bool
442      */
443     public function setStreamTimeout($seconds, $microseconds = 0)
444     {
445         return stream_set_timeout($this->getStream(), $seconds, $microseconds);
446     }
447 
448     /**
449      * Whether the opening of the stream has been deferred
450      */
451     protected function hasBeenDeferred()
452     {
453         return $this->_hasBeenDeferred;
454     }
455 
456     /**
457      * Check whether the connection has timed out or not.
458      * This is basically a shortcut of `getStreamMetaData` + the `timed_out`
459      * index, but the resulting code is more readable.
460      *
461      * @return bool
462      */
463     public function hasTimedOut()
464     {
465         $metaData = $this->getStreamMetaData();
466 
467         return true === $metaData['timed_out'];
468     }
469 
470     /**
471      * Set blocking/non-blocking mode.
472      *
473      * @param   bool    $mode    Blocking mode.
474      * @return  bool
475      */
476     public function setStreamBlocking($mode)
477     {
478         return stream_set_blocking($this->getStream(), (int) $mode);
479     }
480 
481     /**
482      * Set stream buffer.
483      * Output using fwrite() (or similar function) is normally buffered at 8 Ko.
484      * This means that if there are two processes wanting to write to the same
485      * output stream, each is paused after 8 Ko of data to allow the other to
486      * write.
487      *
488      * @param   int     $buffer    Number of bytes to buffer. If zero, write
489      *                             operations are unbuffered. This ensures that
490      *                             all writes are completed before other
491      *                             processes are allowed to write to that output
492      *                             stream.
493      * @return  bool
494      */
495     public function setStreamBuffer($buffer)
496     {
497         // Zero means success.
498         $out = 0 === stream_set_write_buffer($this->getStream(), $buffer);
499 
500         if (true === $out) {
501             $this->_bufferSize = $buffer;
502         }
503 
504         return $out;
505     }
506 
507     /**
508      * Disable stream buffering.
509      * Alias of $this->setBuffer(0).
510      *
511      * @return  bool
512      */
513     public function disableStreamBuffer()
514     {
515         return $this->setStreamBuffer(0);
516     }
517 
518     /**
519      * Get stream buffer size.
520      *
521      * @return  int
522      */
523     public function getStreamBufferSize()
524     {
525         return $this->_bufferSize;
526     }
527 
528     /**
529      * Get stream wrapper name.
530      *
531      * @return  string
532      */
533     public function getStreamWrapperName()
534     {
535         if (false === $pos = strpos($this->getStreamName(), '://')) {
536             return 'file';
537         }
538 
539         return substr($this->getStreamName(), 0, $pos);
540     }
541 
542     /**
543      * Get stream meta data.
544      *
545      * @return  array
546      */
547     public function getStreamMetaData()
548     {
549         return stream_get_meta_data($this->getStream());
550     }
551 
552     /**
553      * Whether this stream is already opened by another handler.
554      *
555      * @return  bool
556      */
557     public function isBorrowing()
558     {
559         return $this->_borrowing;
560     }
561 
562     /**
563      * Notification callback.
564      *
565      * @param   int     $ncode          Notification code. Please, see
566      *                                  STREAM_NOTIFY_* constants.
567      * @param   int     $severity       Severity. Please, see
568      *                                  STREAM_NOTIFY_SEVERITY_* constants.
569      * @param   string  $message        Message.
570      * @param   int     $code           Message code.
571      * @param   int     $transferred    If applicable, the number of transferred
572      *                                  bytes.
573      * @param   int     $max            If applicable, the number of max bytes.
574      * @return  void
575      */
576     public function _notify(
577         $ncode,
578         $severity,
579         $message,
580         $code,
581         $transferred,
582         $max
583     ) {
584         static $_map = [
585             STREAM_NOTIFY_AUTH_REQUIRED => 'authrequire',
586             STREAM_NOTIFY_AUTH_RESULT   => 'authresult',
587             STREAM_NOTIFY_COMPLETED     => 'complete',
588             STREAM_NOTIFY_CONNECT       => 'connect',
589             STREAM_NOTIFY_FAILURE       => 'failure',
590             STREAM_NOTIFY_MIME_TYPE_IS  => 'mimetype',
591             STREAM_NOTIFY_PROGRESS      => 'progress',
592             STREAM_NOTIFY_REDIRECTED    => 'redirect',
593             STREAM_NOTIFY_RESOLVE       => 'resolve',
594             STREAM_NOTIFY_FILE_SIZE_IS  => 'size'
595         ];
596 
597         $this->getListener()->fire($_map[$ncode], new Event\Bucket([
598             'code'        => $code,
599             'severity'    => $severity,
600             'message'     => $message,
601             'transferred' => $transferred,
602             'max'         => $max
603         ]));
604 
605         return;
606     }
607 
608     /**
609      * Call the $handler->close() method on each stream in the static stream
610      * register.
611      * This method does not check the return value of $handler->close(). Thus,
612      * if a stream is persistent, the $handler->close() should do anything. It
613      * is a very generic method.
614      *
615      * @return  void
616      */
617     final public static function _Hoa_Stream()
618     {
619         foreach (self::$_register as $entry) {
620             $entry[self::HANDLER]->close();
621         }
622 
623         return;
624     }
625 
626     /**
627      * Transform object to string.
628      *
629      * @return  string
630      */
631     public function __toString()
632     {
633         return $this->getStreamName();
634     }
635 
636     /**
637      * Close the stream when destructing.
638      *
639      * @return  void
640      */
641     public function __destruct()
642     {
643         if (false === $this->isOpened()) {
644             return;
645         }
646 
647         $this->close();
648 
649         return;
650     }
651 }
652 
653 /**
654  * Class \Hoa\Stream\_Protocol.
655  *
656  * The `hoa://Library/Stream` node.
657  *
658  * @copyright  Copyright © 2007-2017 Hoa community
659  * @license    New BSD License
660  */
661 class _Protocol extends Protocol\Node
662 {
663     /**
664      * Component's name.
665      *
666      * @var string
667      */
668     protected $_name = 'Stream';
669 
670 
671 
672     /**
673      * ID of the component.
674      *
675      * @param   string  $id    ID of the component.
676      * @return  mixed
677      */
678     public function reachId($id)
679     {
680         return Stream::getStreamHandler($id);
681     }
682 }
683 
684 /**
685  * Flex entity.
686  */
687 Consistency::flexEntity('Hoa\Stream\Stream');
688 
689 /**
690  * Shutdown method.
691  */
692 Consistency::registerShutdownFunction(xcallable('Hoa\Stream\Stream::_Hoa_Stream'));
693 
694 /**
695  * Add the `hoa://Library/Stream` node. Should be use to reach/get an entry
696  * in the stream register.
697  */
698 $protocol              = Protocol::getInstance();
699 $protocol['Library'][] = new _Protocol();
700