1<?php
2/**
3 * This file is part of the FreeDSx Socket package.
4 *
5 * (c) Chad Sikorra <Chad.Sikorra@gmail.com>
6 *
7 * For the full copyright and license information, please view the LICENSE
8 * file that was distributed with this source code.
9 */
10
11namespace FreeDSx\Socket\Queue;
12
13use FreeDSx\Socket\Exception\ConnectionException;
14use FreeDSx\Socket\Exception\PartialMessageException;
15use FreeDSx\Socket\Socket;
16
/**
 * Used to retrieve Messages/PDUs from a socket.
 *
 * Bytes read from the socket are first held in $buffer (the "available" buffer), then passed through unwrap() and
 * appended to $toConsume (the "consumable" buffer), and finally turned into Message objects by decode().
 *
 * @author Chad Sikorra <Chad.Sikorra@gmail.com>
 */
abstract class MessageQueue
{
    /**
     * The socket that raw bytes are read from.
     *
     * @var Socket
     */
    protected $socket;

    /**
     * Raw bytes read from the socket that have not been passed through unwrap() yet.
     *
     * @var false|string
     */
    protected $buffer = false;

    /**
     * Unwrapped bytes that are waiting to be decoded into messages.
     *
     * @var string|null
     */
    protected $toConsume = null;

    /**
     * @param Socket $socket
     */
    public function __construct(Socket $socket)
    {
        $this->socket = $socket;
    }

    /**
     * Yields messages for as long as either buffer still holds data, reading from the socket whenever the data
     * on hand is not enough to unwrap or decode a complete message.
     *
     * @param int|null $id Passed through unchanged to constructMessage().
     * @return \Generator
     * @throws ConnectionException
     */
    public function getMessages(?int $id = null)
    {
        if (!$this->hasBuffer()) {
            $this->addToAvailableBufferOrFail();
        }

        while ($this->hasBuffer()) {
            try {
                if ($this->hasAvailableBuffer()) {
                    $this->addToConsumableBuffer();
                } elseif (!$this->hasConsumableBuffer()) {
                    $this->addToAvailableBufferOrFail();
                }
            } catch (PartialMessageException $exception) {
                # The available bytes could not be unwrapped yet, so more data is needed from the socket.
                $this->addToAvailableBufferOrFail();
            }

            try {
                while ($this->hasConsumableBuffer()) {
                    $message = $this->consume();
                    if ($message !== null) {
                        yield $this->constructMessage($message, $id);
                    }
                }
            } catch (PartialMessageException $e) {
                $this->recoverFromPartialMessage();
            }
        }
    }

    /**
     * Called after consume() gave up with a partial message. Moves any available bytes to the consumable buffer,
     * otherwise reads more from the socket.
     *
     * If available bytes exist but cannot be unwrapped yet, a socket read is done instead of letting the
     * PartialMessageException escape to the caller of getMessages().
     *
     * @throws ConnectionException
     */
    private function recoverFromPartialMessage(): void
    {
        if ($this->hasAvailableBuffer()) {
            try {
                $this->addToConsumableBuffer();

                return;
            } catch (PartialMessageException $exception) {
                # Not enough raw bytes to unwrap. Fall through and read more from the socket.
            }
        }

        $this->addToAvailableBufferOrFail();
    }

    /**
     * Reads from the socket and appends the result to the available buffer.
     *
     * @throws ConnectionException if the socket read returns false.
     */
    protected function addToAvailableBufferOrFail(): void
    {
        $bytes = $this->socket->read();

        if ($bytes === false) {
            throw new ConnectionException('The connection to the server has been lost.');
        }

        $this->buffer .= $bytes;
    }

    /**
     * Unwraps the available buffer and appends the resulting bytes to the consumable buffer. The available buffer
     * keeps only what comes after the position reported by the unwrapped Buffer. Does nothing if there is no
     * available buffer.
     */
    protected function addToConsumableBuffer(): void
    {
        if (!$this->hasAvailableBuffer()) {
            return;
        }

        $raw = (string)$this->buffer;
        $unwrapped = $this->unwrap($raw);

        $this->buffer = \substr($raw, $unwrapped->endsAt());
        $this->toConsume .= $unwrapped->bytes();
    }

    /**
     * @return bool Whether either the consumable or the available buffer holds any bytes.
     */
    protected function hasBuffer(): bool
    {
        return $this->hasConsumableBuffer() || $this->hasAvailableBuffer();
    }

    /**
     * @return bool Whether there are raw bytes that have not been unwrapped yet.
     */
    protected function hasAvailableBuffer(): bool
    {
        return \strlen((string)$this->buffer) !== 0;
    }

    /**
     * @return bool Whether there are unwrapped bytes waiting to be decoded.
     */
    protected function hasConsumableBuffer(): bool
    {
        return \strlen((string)$this->toConsume) !== 0;
    }

    /**
     * Decodes one message from the front of the consumable buffer and removes its bytes from that buffer.
     *
     * When decoding reports a partial message, any available bytes are moved to the consumable buffer and the
     * decode is attempted again. If no available bytes are left the exception is rethrown.
     *
     * @return Message|null
     * @throws PartialMessageException
     */
    protected function consume(): ?Message
    {
        # Retry in a loop rather than by recursion so repeated partial decodes do not grow the call stack.
        do {
            $message = null;

            try {
                $pending = (string)$this->toConsume;
                $message = $this->decode($pending);
                $lastPos = (int)$message->getLastPosition();

                # substr() returns false before PHP 8.0 when the offset is past the end of the string. Normalize
                # to a string so the empty check below still works.
                $this->toConsume = (string)\substr($pending, $lastPos);

                # Everything was consumed. Pick up anything already waiting on the socket so the caller can keep
                # iterating. NOTE(review): read(false) is presumably a non-blocking read -- confirm in Socket.
                if ($this->toConsume === '' && ($peek = $this->socket->read(false)) !== false) {
                    $this->buffer .= $peek;
                }
            } catch (PartialMessageException $exception) {
                # If we have available buffer, it might have what we need. Attempt to add it and decode again.
                # Otherwise let it bubble...
                if (!$this->hasAvailableBuffer()) {
                    throw $exception;
                }
                $this->addToConsumableBuffer();
            }
        } while ($message === null);

        return $message;
    }

    /**
     * Turn raw bytes from the socket into bytes that are ready to be decoded. By default the bytes are used as-is
     * and the whole input is reported as processed.
     *
     * @param string $bytes
     * @return Buffer
     */
    protected function unwrap($bytes): Buffer
    {
        return new Buffer($bytes, \strlen($bytes));
    }

    /**
     * Decode the bytes to an object. If you have a partial object, throw the PartialMessageException and the queue
     * will attempt to append more data to the buffer.
     *
     * @param string $bytes
     * @return Message
     * @throws PartialMessageException
     */
    abstract protected function decode($bytes): Message;

    /**
     * Retrieve the first message produced by getMessages().
     *
     * @param int|null $id
     * @return mixed
     * @throws ConnectionException
     */
    public function getMessage(?int $id = null)
    {
        return $this->getMessages($id)->current();
    }

    /**
     * Retrieve the message object from the message. Allow for special construction / validation if needed.
     *
     * @param Message $message
     * @param int|null $id Not used by this default implementation.
     * @return mixed
     */
    protected function constructMessage(Message $message, ?int $id = null)
    {
        return $message->getMessage();
    }
}
195