xref: /plugin/pureldap/vendor/freedsx/socket/src/FreeDSx/Socket/Queue/MessageQueue.php (revision dad993c57a70866aa1db59c43f043769c2eb7ed0)
10b3fd2d3SAndreas Gohr<?php
/**
 * This file is part of the FreeDSx Socket package.
 *
 * (c) Chad Sikorra <Chad.Sikorra@gmail.com>
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */
100b3fd2d3SAndreas Gohr
110b3fd2d3SAndreas Gohrnamespace FreeDSx\Socket\Queue;
120b3fd2d3SAndreas Gohr
130b3fd2d3SAndreas Gohruse FreeDSx\Socket\Exception\ConnectionException;
140b3fd2d3SAndreas Gohruse FreeDSx\Socket\Exception\PartialMessageException;
150b3fd2d3SAndreas Gohruse FreeDSx\Socket\Socket;
160b3fd2d3SAndreas Gohr
/**
 * Used to retrieve Messages/PDUs from a socket.
 *
 * @author Chad Sikorra <Chad.Sikorra@gmail.com>
 */
abstract class MessageQueue
{
    /**
     * The socket that raw bytes are read from.
     *
     * @var Socket
     */
    protected $socket;

    /**
     * Bytes read from the socket that have not yet been passed through unwrap().
     * Starts out as false and becomes a string once data has been appended.
     *
     * @var false|string
     */
    protected $buffer = false;

    /**
     * Bytes returned by unwrap() that are waiting to be decoded into messages.
     *
     * @var string|null
     */
    protected $toConsume = null;

    /**
     * @param Socket $socket
     */
    public function __construct(Socket $socket)
    {
        $this->socket = $socket;
    }

    /**
     * Yield messages for as long as either buffer holds data, reading from the socket whenever the
     * buffered bytes do not form a complete message.
     *
     * @param int|null $id passed through to constructMessage() for every decoded message
     * @return \Generator
     * @throws ConnectionException if a socket read returns false
     */
    public function getMessages(?int $id = null)
    {
        if (!$this->hasBuffer()) {
            $this->addToAvailableBufferOrFail();
        }

        while ($this->hasBuffer()) {
            try {
                if ($this->hasAvailableBuffer()) {
                    $this->addToConsumableBuffer();
                } elseif (!$this->hasConsumableBuffer()) {
                    $this->addToAvailableBufferOrFail();
                }
            } catch (PartialMessageException $exception) {
                // The available bytes could not be unwrapped yet, so fetch more from the socket.
                $this->addToAvailableBufferOrFail();
            }

            try {
                while ($this->hasConsumableBuffer()) {
                    $message = $this->consume();
                    if ($message !== null) {
                        yield $this->constructMessage($message, $id);
                    }
                }
            } catch (PartialMessageException $e) {
                // Recover the same way the first step of the loop does. addToConsumableBuffer() can raise
                // PartialMessageException again (via an unwrap() override); without this guard that second
                // exception would escape the generator instead of triggering another socket read.
                try {
                    if ($this->hasAvailableBuffer()) {
                        $this->addToConsumableBuffer();
                    } else {
                        $this->addToAvailableBufferOrFail();
                    }
                } catch (PartialMessageException $partial) {
                    $this->addToAvailableBufferOrFail();
                }
            }
        }
    }

    /**
     * Read from the socket and append the result to the available buffer.
     *
     * @throws ConnectionException if the read returns false
     */
    protected function addToAvailableBufferOrFail(): void
    {
        $bytes = $this->socket->read();

        if ($bytes === false) {
            throw new ConnectionException('The connection to the server has been lost.');
        }

        $this->buffer .= $bytes;
    }

    /**
     * Pass the available buffer through unwrap(), drop everything up to the position the resulting
     * Buffer reports via endsAt(), and append its bytes to the consumable buffer. Does nothing when
     * the available buffer is empty.
     */
    protected function addToConsumableBuffer(): void
    {
        if ($this->hasAvailableBuffer()) {
            $buffer = $this->unwrap((string)$this->buffer);
            $this->buffer = \substr((string)$this->buffer, $buffer->endsAt());
            $this->toConsume .= $buffer->bytes();
        }
    }

    /**
     * @return bool true if either the consumable or the available buffer is non-empty
     */
    protected function hasBuffer(): bool
    {
        return $this->hasConsumableBuffer() || $this->hasAvailableBuffer();
    }

    /**
     * @return bool true if there are bytes that have not been unwrapped yet
     */
    protected function hasAvailableBuffer(): bool
    {
        // The cast covers the initial false value.
        return \strlen((string)$this->buffer) !== 0;
    }

    /**
     * @return bool true if there are unwrapped bytes waiting to be decoded
     */
    protected function hasConsumableBuffer(): bool
    {
        // The cast covers the initial null value.
        return \strlen((string)$this->toConsume) !== 0;
    }

    /**
     * Decode one message from the consumable buffer and remove its bytes from that buffer.
     *
     * This implementation retries until decode() succeeds or the exception propagates, so it does not
     * itself return null; the nullable return type is part of the existing signature.
     *
     * @return Message|null
     * @throws PartialMessageException if the message is incomplete and no available buffer is left to add
     */
    protected function consume(): ?Message
    {
        $message = null;

        try {
            $message = $this->decode($this->toConsume);
            $lastPos = (int)$message->getLastPosition();
            $this->toConsume = \substr($this->toConsume, $lastPos);

            // Everything was consumed: try one more read and keep whatever it returns for the next pass.
            // NOTE(review): read(false) is presumably a non-blocking read; confirm against Socket::read().
            if ($this->toConsume === '' && ($peek = $this->socket->read(false)) !== false) {
                $this->buffer .= $peek;
            }
        } catch (PartialMessageException $exception) {
            // If we have available buffer, it might have what we need. Attempt to add it. Otherwise let it bubble...
            if ($this->hasAvailableBuffer()) {
                $this->addToConsumableBuffer();
            } else {
                throw $exception;
            }
        }

        // Adding to the consumable buffer above could allow the decode to succeed, so retry.
        if ($message === null) {
            return $this->consume();
        }

        return $message;
    }

    /**
     * Turn raw socket bytes into a Buffer of consumable bytes. This default treats the bytes as already
     * consumable: the Buffer holds them unchanged and its second argument is their full length.
     *
     * The first step of getMessages() catches PartialMessageException around the call that leads here,
     * so overrides are presumably allowed to throw it when the bytes are incomplete.
     *
     * @param string $bytes
     * @return Buffer
     */
    protected function unwrap($bytes): Buffer
    {
        return new Buffer($bytes, \strlen($bytes));
    }

    /**
     * Decode the bytes to an object. If you have a partial object, throw the PartialMessageException and the queue
     * will attempt to append more data to the buffer.
     *
     * @param string $bytes
     * @return Message
     * @throws PartialMessageException
     */
    abstract protected function decode($bytes): Message;

    /**
     * Return the current value of a fresh getMessages() generator, i.e. the first message it yields.
     *
     * @param int|null $id
     * @return mixed
     * @throws ConnectionException
     */
    public function getMessage(?int $id = null)
    {
        return $this->getMessages($id)->current();
    }

    /**
     * Retrieve the message object from the message. Allow for special construction / validation if needed.
     * This default ignores $id and returns $message->getMessage().
     *
     * @param Message $message
     * @param int|null $id
     * @return mixed
     */
    protected function constructMessage(Message $message, ?int $id = null)
    {
        return $message->getMessage();
    }
}
195