1<?php 2/** 3 * This file is part of the FreeDSx Socket package. 4 * 5 * (c) Chad Sikorra <Chad.Sikorra@gmail.com> 6 * 7 * For the full copyright and license information, please view the LICENSE 8 * file that was distributed with this source code. 9 */ 10 11namespace FreeDSx\Socket\Queue; 12 13use FreeDSx\Socket\Exception\ConnectionException; 14use FreeDSx\Socket\Exception\PartialMessageException; 15use FreeDSx\Socket\Socket; 16 17/** 18 * Used to retrieve Messages/PDUs from a socket. 19 * 20 * @author Chad Sikorra <Chad.Sikorra@gmail.com> 21 */ 22abstract class MessageQueue 23{ 24 /** 25 * @var Socket 26 */ 27 protected $socket; 28 29 /** 30 * @var false|string 31 */ 32 protected $buffer = false; 33 34 /** 35 * @var string|null 36 */ 37 protected $toConsume = null; 38 39 /** 40 * @param Socket $socket 41 */ 42 public function __construct(Socket $socket) 43 { 44 $this->socket = $socket; 45 } 46 47 /** 48 * @param int|null $id 49 * @return \Generator 50 * @throws ConnectionException 51 */ 52 public function getMessages(?int $id = null) 53 { 54 if (!$this->hasBuffer()) { 55 $this->addToAvailableBufferOrFail(); 56 } 57 58 while ($this->hasBuffer()) { 59 try { 60 if ($this->hasAvailableBuffer()) { 61 $this->addToConsumableBuffer(); 62 } elseif (!$this->hasConsumableBuffer()) { 63 $this->addToAvailableBufferOrFail(); 64 } 65 } catch (PartialMessageException $exception) { 66 $this->addToAvailableBufferOrFail(); 67 } 68 69 try { 70 while ($this->hasConsumableBuffer()) { 71 $message = $this->consume(); 72 if ($message !== null) { 73 yield $this->constructMessage($message, $id); 74 } 75 } 76 } catch (PartialMessageException $e) { 77 if ($this->hasAvailableBuffer()) { 78 $this->addToConsumableBuffer(); 79 } else { 80 $this->addToAvailableBufferOrFail(); 81 } 82 } 83 } 84 } 85 86 protected function addToAvailableBufferOrFail(): void 87 { 88 $bytes = $this->socket->read(); 89 90 if ($bytes === false) { 91 throw new ConnectionException('The connection to the server has been lost.'); 92 } 93 94 $this->buffer .= $bytes; 95 } 96 97 protected function addToConsumableBuffer(): void 98 { 99 if ($this->hasAvailableBuffer()) { 100 $buffer = $this->unwrap((string)$this->buffer); 101 $this->buffer = \substr((string)$this->buffer, $buffer->endsAt()); 102 $this->toConsume .= $buffer->bytes(); 103 } 104 } 105 106 protected function hasBuffer(): bool 107 { 108 return $this->hasConsumableBuffer() || $this->hasAvailableBuffer(); 109 } 110 111 protected function hasAvailableBuffer(): bool 112 { 113 return \strlen((string)$this->buffer) !== 0; 114 } 115 116 protected function hasConsumableBuffer(): bool 117 { 118 return \strlen((string)$this->toConsume) !== 0; 119 } 120 121 /** 122 * @return Message|null 123 * @throws PartialMessageException 124 */ 125 protected function consume(): ?Message 126 { 127 $message = null; 128 129 try { 130 $message = $this->decode($this->toConsume); 131 $lastPos = (int)$message->getLastPosition(); 132 $this->toConsume = \substr($this->toConsume, $lastPos); 133 134 if ($this->toConsume === '' && ($peek = $this->socket->read(false)) !== false) { 135 $this->buffer .= $peek; 136 } 137 } catch (PartialMessageException $exception) { 138 # If we have available buffer, it might have what we need. Attempt to add it. Otherwise let it bubble... 139 if ($this->hasAvailableBuffer()) { 140 $this->addToConsumableBuffer(); 141 } else { 142 throw $exception; 143 } 144 } 145 146 # Adding to the consumed before could cause this to succeed, so retry. 147 if ($message === null) { 148 return $this->consume(); 149 } 150 151 return $message; 152 } 153 154 /** 155 * @param string $bytes 156 * @return Buffer 157 */ 158 protected function unwrap($bytes) : Buffer 159 { 160 return new Buffer($bytes, \strlen($bytes)); 161 } 162 163 /** 164 * Decode the bytes to an object. If you have a partial object, throw the PartialMessageException and the queue 165 * will attempt to append more data to the buffer. 166 * 167 * @param string $bytes 168 * @return Message 169 * @throws PartialMessageException 170 */ 171 protected abstract function decode($bytes) : Message; 172 173 /** 174 * @param int|null $id 175 * @return mixed 176 * @throws ConnectionException 177 */ 178 public function getMessage(?int $id = null) 179 { 180 return $this->getMessages($id)->current(); 181 } 182 183 /** 184 * Retrieve the message object from the message. Allow for special construction / validation if needed. 185 * 186 * @param Message $message 187 * @param int|null $id 188 * @return mixed 189 */ 190 protected function constructMessage(Message $message, ?int $id = null) 191 { 192 return $message->getMessage(); 193 } 194} 195