🚽 Toalett
Welcome to Toalett, a humble initiative. Toalett is the Norwegian word for toilet 💩.
What is toalett/react-stream-adapter?
It is a library that allows any datasource to be used as a stream with ReactPHP. It is very small - there is one interface, one class and one trait. Its only dependency is react/stream.
The StreamAdapter takes an implementation of the Source interface and makes it approachable as a stream in applications using an event loop.
Installation
It is available on Packagist:
composer require toalett/react-stream-adapter
Motivation
I was working on a project that required an application to respond to AMQP messages in a non-blocking way. The application made use of an event loop. Initially I used a periodic timer with a callback, but as the application grew this became a cluttered mess. It slowly started to feel more natural to treat the message queue as a stream. This makes sense if you think about it:
In computer science, a stream is a sequence of data elements made available over time. A stream can be thought of as items on a conveyor belt being processed one at a time rather than in large batches.
This definition suits a message queue.
In the project I mentioned earlier, I use this library to poll an AMQP queue every 10 seconds. This keeps the load low and leaves the event loop free to do other work in the meantime. This abstraction turned out to be really useful, so I thought that others might enjoy it too.
How do I use this?
There are only three components to worry about, and one of them is optional! The main components are the Source interface and the StreamAdapter class. The optional component is the EndlessTrait, which can be used in an endless source.
The steps you need to take to use this library are as follows:
- Define a class that is able to generate or provide some data. It must implement the Source interface.
  - The select() method is called periodically. This is where you return your next piece of data. Make sure the select() method returns anything that is not null when data is available (anything goes) or null when there is none. You may add a return type to your implementation of select(), such as select(): ?string or select(): ?MyData, for improved clarity.
  - The interface also specifies the close(): void and eof(): bool methods. In an endless (infinite) stream, close() may be left empty and eof() should return false (EOF is never reached). The EndlessTrait provides these implementations.
- Use the StreamAdapter to attach your Source to the loop.
- Interact with the adapter as if it were any other readable stream (ReadableStreamInterface in react/stream).
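The first step can be sketched as follows. This is a minimal illustration, not code taken from the library: the Source interface and EndlessTrait declared here are stand-ins written from the description above so that the file runs on its own, and InboxSource with its push() method is a hypothetical name. In a real project you would import the library's own interface and trait instead.

```php
<?php
declare(strict_types=1);

// Stand-in declarations so this file runs without the library installed.
// Names and signatures follow the README; in a real project, import the
// library's own Source and EndlessTrait instead.
interface Source
{
    public function select();
    public function close(): void;
    public function eof(): bool;
}

trait EndlessTrait
{
    public function close(): void
    {
        // Nothing to release in an endless source.
    }

    public function eof(): bool
    {
        return false; // EOF is never reached.
    }
}

// Hypothetical endless source: an in-memory inbox that other code pushes to.
final class InboxSource implements Source
{
    use EndlessTrait;

    private SplQueue $messages;

    public function __construct()
    {
        $this->messages = new SplQueue();
    }

    public function push(string $message): void
    {
        $this->messages->enqueue($message);
    }

    // Returns the next message, or null when the inbox is currently empty.
    public function select(): ?string
    {
        return $this->messages->isEmpty() ? null : $this->messages->dequeue();
    }
}

$source = new InboxSource();
$source->push('hello');
$first = $source->select();  // the pushed message
$second = $source->select(); // null: nothing available right now
```

Returning null from select() is what signals "no data right now", so a source must never use null as a real value. A finite source would implement close() and eof() itself instead of using the trait.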
Note: This library uses polling under the hood. The default polling interval is 0.5 seconds, though if checking for data is an intensive operation, you might want to increase the interval a bit to prevent slowdowns. This is a tradeoff between responsiveness and load. Custom intervals can be set by passing them as a third argument to the StreamAdapter constructor.
Note: The StreamAdapter reads data eagerly from the source - it won't stop reading until there is nothing left to read. This prevents congestion when long polling intervals are used, but it might block execution for a while when there is a lot of data to be read or if your select() routine takes some time.
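To make the eager-reading behaviour concrete, here is a rough sketch of what a single poll amounts to. It is an illustration under assumptions, not the adapter's actual code: drainOnce() is a hypothetical helper, and the Source interface is a stand-in declared from the method names given above.

```php
<?php
declare(strict_types=1);

// Stand-in for the library's Source interface (method names from the README).
interface Source
{
    public function select();
    public function close(): void;
    public function eof(): bool;
}

// Hypothetical helper: one poll keeps calling select() until it returns null.
// This illustrates eager reading; it is not the library's implementation.
function drainOnce(Source $source, callable $emit): int
{
    $count = 0;
    while (($item = $source->select()) !== null) {
        $emit($item);
        $count++;
    }
    return $count;
}

// Hypothetical source holding three pending items.
$source = new class implements Source {
    private array $pending = ['a', 'b', 'c'];

    public function select(): ?string
    {
        // array_shift() returns null once the array is empty.
        return array_shift($this->pending);
    }

    public function close(): void
    {
    }

    public function eof(): bool
    {
        return false;
    }
};

$received = [];
$emitted = drainOnce($source, function (string $item) use (&$received): void {
    $received[] = $item;
});
```

All pending items are handed over within a single poll, so nothing else on the loop runs until select() returns null. A slow select() or a large backlog stretches that pause accordingly.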
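A minimal usage example:
use React\EventLoop\Factory;
// Also import StreamAdapter and your own Source implementation here.

$loop = Factory::create();
$source = new MySource(); // implements Source
$stream = new StreamAdapter($source, $loop);

// The listener receives whatever your select() implementation returns.
$stream->on('data', function (MyData $data) {
    /* do something with data */
});

$loop->run();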
Check out the examples folder for some simple implementations.
Questions
Q: Where is the code that deals with AMQP messages?
A: It will be released in a separate package, but it needs some work before it can be published.
Q: Where are the tests?
A: There is only one class, and it is mostly based on the ReadableResourceStream from react/stream. Tests might be added later, but as of now, I don't really see the value. Feel free to create an issue if this bothers you!