<?php
/**
 * This file is part of the FreeDSx Socket package.
 *
 * (c) Chad Sikorra <Chad.Sikorra@gmail.com>
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */

namespace FreeDSx\Socket\Queue;

use FreeDSx\Socket\Exception\ConnectionException;
use FreeDSx\Socket\Exception\PartialMessageException;
use FreeDSx\Socket\Socket;

/**
 * Used to retrieve Messages/PDUs from a socket.
 *
 * Bytes move through two stages:
 *  - $buffer ("available"): raw bytes appended from socket reads.
 *  - $toConsume ("consumable"): bytes that have been passed through unwrap() and are handed to decode().
 *
 * Concrete queues implement decode() and may override unwrap() and constructMessage().
 *
 * @author Chad Sikorra <Chad.Sikorra@gmail.com>
 */
abstract class MessageQueue
{
    /**
     * The socket that bytes are read from.
     *
     * @var Socket
     */
    protected $socket;

    /**
     * Raw bytes read from the socket that have not been unwrapped yet. False until the first read.
     *
     * @var false|string
     */
    protected $buffer = false;

    /**
     * Unwrapped bytes waiting to be decoded into messages. Null until something has been unwrapped.
     *
     * @var string|null
     */
    protected $toConsume = null;

    /**
     * @param Socket $socket
     */
    public function __construct(Socket $socket)
    {
        $this->socket = $socket;
    }

    /**
     * Lazily decode messages from the buffered / incoming socket data.
     *
     * If nothing is buffered, a read is performed on the socket first. Afterwards the generator keeps moving
     * bytes from the available buffer to the consumable buffer and yields the result of constructMessage() for
     * every message that consume() produces, until both buffers are empty.
     *
     * @param int|null $id Passed through unchanged to constructMessage().
     * @return \Generator
     * @throws ConnectionException When a required socket read returns false.
     */
    public function getMessages(?int $id = null)
    {
        if (!$this->hasBuffer()) {
            $this->addToAvailableBufferOrFail();
        }

        while ($this->hasBuffer()) {
            try {
                if ($this->hasAvailableBuffer()) {
                    $this->addToConsumableBuffer();
                } elseif (!$this->hasConsumableBuffer()) {
                    $this->addToAvailableBufferOrFail();
                }
            } catch (PartialMessageException $exception) {
                // Raised while unwrapping: the available bytes are incomplete, so read more from the socket.
                $this->addToAvailableBufferOrFail();
            }

            try {
                while ($this->hasConsumableBuffer()) {
                    $message = $this->consume();
                    if ($message !== null) {
                        yield $this->constructMessage($message, $id);
                    }
                }
            } catch (PartialMessageException $e) {
                // The consumable bytes end mid-message: top them up from what is already buffered, or read more.
                if ($this->hasAvailableBuffer()) {
                    $this->addToConsumableBuffer();
                } else {
                    $this->addToAvailableBufferOrFail();
                }
            }
        }
    }

    /**
     * Read from the socket and append the result to the available buffer.
     *
     * @throws ConnectionException When the socket read returns false.
     */
    protected function addToAvailableBufferOrFail(): void
    {
        $bytes = $this->socket->read();

        if ($bytes === false) {
            throw new ConnectionException('The connection to the server has been lost.');
        }

        $this->buffer .= $bytes;
    }

    /**
     * Unwrap the available buffer and move the unwrapped bytes to the consumable buffer.
     *
     * Everything in the available buffer before the offset reported by Buffer::endsAt() is dropped from it.
     * Does nothing when the available buffer is empty.
     */
    protected function addToConsumableBuffer(): void
    {
        if ($this->hasAvailableBuffer()) {
            $buffer = $this->unwrap((string)$this->buffer);
            $this->buffer = \substr((string)$this->buffer, $buffer->endsAt());
            $this->toConsume .= $buffer->bytes();
        }
    }

    /**
     * @return bool True when either the consumable or the available buffer holds at least one byte.
     */
    protected function hasBuffer(): bool
    {
        return $this->hasConsumableBuffer() || $this->hasAvailableBuffer();
    }

    /**
     * @return bool True when there are raw bytes that have not been unwrapped yet.
     */
    protected function hasAvailableBuffer(): bool
    {
        return \strlen((string)$this->buffer) !== 0;
    }

    /**
     * @return bool True when there are unwrapped bytes that have not been decoded yet.
     */
    protected function hasConsumableBuffer(): bool
    {
        return \strlen((string)$this->toConsume) !== 0;
    }

    /**
     * Decode the next message from the consumable buffer and remove its bytes from that buffer.
     *
     * When decoding leaves the consumable buffer empty, the socket is polled once more and any bytes returned
     * are appended to the available buffer.
     *
     * @return Message|null
     * @throws PartialMessageException When the message is incomplete and there is no available buffer left to add.
     */
    protected function consume(): ?Message
    {
        $message = null;

        try {
            $message = $this->decode($this->toConsume);
            $lastPos = (int)$message->getLastPosition();
            // substr() returns false instead of '' on PHP < 8.0 when the offset lies beyond the end of the
            // string. Normalise to a string so the emptiness check below behaves the same on every version.
            $this->toConsume = (string)\substr((string)$this->toConsume, $lastPos);

            // NOTE(review): read(false) is presumably a non-blocking read; confirm against Socket::read().
            if ($this->toConsume === '' && ($peek = $this->socket->read(false)) !== false) {
                $this->buffer .= $peek;
            }
        } catch (PartialMessageException $exception) {
            // If we have available buffer, it might have what we need. Attempt to add it. Otherwise let it bubble.
            if ($this->hasAvailableBuffer()) {
                $this->addToConsumableBuffer();
            } else {
                throw $exception;
            }
        }

        // Adding to the consumable buffer above could cause decoding to succeed now, so retry.
        if ($message === null) {
            return $this->consume();
        }

        return $message;
    }

    /**
     * Wrap raw bytes in a Buffer. This default implementation passes the bytes through as-is together with
     * their full length.
     *
     * NOTE(review): the second Buffer argument is presumably the offset later returned by endsAt(); confirm
     * against the Buffer class.
     *
     * @param string $bytes
     * @return Buffer
     */
    protected function unwrap($bytes): Buffer
    {
        return new Buffer($bytes, \strlen($bytes));
    }

    /**
     * Decode the bytes to an object. If you have a partial object, throw the PartialMessageException and the queue
     * will attempt to append more data to the buffer.
     *
     * @param string $bytes
     * @return Message
     * @throws PartialMessageException
     */
    abstract protected function decode($bytes): Message;

    /**
     * Retrieve the first message produced by getMessages().
     *
     * @param int|null $id
     * @return mixed The first yielded message, or null if the generator finishes without yielding.
     * @throws ConnectionException
     */
    public function getMessage(?int $id = null)
    {
        return $this->getMessages($id)->current();
    }

    /**
     * Retrieve the message object from the message. Allow for special construction / validation if needed.
     *
     * The $id is not used by this default implementation.
     *
     * @param Message $message
     * @param int|null $id
     * @return mixed
     */
    protected function constructMessage(Message $message, ?int $id = null)
    {
        return $message->getMessage();
    }
}