diff --git a/.gitignore b/.gitignore index f7f8ac3..5049a9c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /.idea/ /vendor/ +/tags diff --git a/README.md b/README.md index c9273ed..6dc1d61 100644 --- a/README.md +++ b/README.md @@ -1,61 +1,92 @@ -# ```reactphp-input-stream``` -## What is this? -A compact library that can wrap a non-blocking input to behave like a [stream](https://reactphp.org/stream/) in a [ReactPHP EventLoop](https://reactphp.org/event-loop/). +# 🚽 Toalett + +Welcome to Toalett, a humble initiative. Toalett is the Norwegian word for toilet 💩. + +## What is `toalett/react-stream-adapter`? + +It is a library that allows any datasource to be used as a stream with ReactPHP. It is very small - there +is [one interface](src/Source.php), [one class](src/StreamAdapter.php) +and [one trait](src/EndlessTrait.php). Its only dependency is `react/stream`. + +The [`StreamAdapter`](src/StreamAdapter.php) takes an implementation of the [`Source`](src/Source.php) interface and +makes it approachable as a [stream](https://reactphp.org/stream/) in applications using +an [event loop](https://reactphp.org/event-loop/). + +## Installation + +It is available on [Packagist](https://packagist.org/packages/toalett/): + +```bash +composer require toalett/react-stream-adapter +``` + +## Motivation + +I was working on a project that required an application to respond to AMQP messages in a non-blocking way. The +application made use of an event loop. Initially I used a periodic timer with a callback, but as the application grew +this became a cluttered mess. It slowly started to feel more natural to treat the message queue as a stream. This makes +sense if you think about it: + +> In computer science, a stream is a sequence of data elements made available over time. A stream can be thought of as items +> on a conveyor belt being processed one at a time rather than in large batches. +> +> — [Stream (computing) on Wikipedia](https://en.wikipedia.org/wiki/Stream_(computing)) + +This definition suits a message queue. + +In the project I mentioned earlier, I use this library to poll an AMQP queue every 10 seconds. This keeps my load low +and allows me to do other things in the meantime. This abstraction turned out really useful, so I thought that others +might enjoy it too. ## How do I use this? -Check out the `examples` folder. It contains a very basic example of a non-blocking input implementation, as well -as a very (_very_) lame joke generator. -## Why is this useful? -I needed to respond to incoming AMQP messages in my event loop and I was feeling adventurous. -Admittedly, I could have just used a periodic timer directly (that is what this library uses under the hood), but where's the fun in that? - -I hear you ask: "_Where's the code that deals with the AMQP messages_?" -While I was writing this, I felt like it would be useful to -separate the logic of dealing with input in a stream-like manner from the logic that -deals with AMQP messages. -This means you can reuse this library to map practically anything to a readable stream. -An extra-lame example of this can be found in `examples/jokes_example.php`. - -When the code for AMQP consumption as stream is finished, I will link it here. +There are only three components to worry about, and one of them is optional! The main components are +the [`Source`](src/Source.php) interface and the [`StreamAdapter`](src/StreamAdapter.php) class. The optional component +is the [`EndlessTrait`](src/EndlessTrait.php), which can be used in an endless source. -## Where are the unit tests? -_Errrrrr..._ +The steps you need to take to use this library are as follows: -## How do I install this? -This should do it [once it's available on Packagist](https://packagist.org/packages/joopschilder/): -```bash -composer require joopschilder/reactphp-input-stream -``` +1. Define a class that is able to generate or provide some data. It must implement the [`Source`](src/Source.php) + interface. +2. The `select()` method is called periodically. This is where you return your next piece of data. Make sure + the `select()` method returns anything that is not `null` when data is available (anything goes) or `null` + when there is none. You may add a typehint to your implementation of `select()` such as `select(): ?string` + or `select(): ?MyData` for improved clarity. +3. The interface also specifies the `close(): void` and `eof(): bool` methods. In an endless (infinite) + stream, `close()` may be left empty and `eof()` should return false (EOF is never reached). + The [`EndlessTrait`](src/EndlessTrait.php) + provides these implementations. +4. Use the [`StreamAdapter`](src/StreamAdapter.php) to attach your [`Source`](src/Source.php) to the loop. +5. Interact with the adapter as if it were any other `ReadableInputStream`. + +_Note:_ This library uses polling under the hood. The default polling interval is 0.5 seconds, though if checking for +data is an intensive operation, you might want to increase the interval a bit to prevent slowdowns. This is a tradeoff +between responsiveness and load. Custom intervals can be set by passing them as a third argument to +the [`StreamAdapter`](src/StreamAdapter.php) constructor. + +_Note:_ The [`StreamAdapter`](src/StreamAdapter.php) reads data eagerly from the source - it won't stop reading until +there is nothing left to read. This prevents congestion when high polling intervals are used but it might block +execution for a while when there is a lot of data to be read or if your `select()` routine takes some time. -## Mandatory block of example code ```php -// Say, we have an event loop... $loop = Factory::create(); -// And say, we have a non-blocking input called $input... -$input = new DemoNonBlockingInput(); +$source = new MySource(); // implements Source +$stream = new StreamAdapter($source, $loop); +$stream->on('data', function(MyData $data) { + /* do something with data */ +}); -// Then we can create a ReadableStream from it like so: -$stream = new ReadableNonBlockingInputStream($input, $loop); - -// If your 'select()' method takes a long time to execute, or you just don't -// feel like polling the input availability that often, you can -// set a custom polling interval by supplying an instance of PollingInterval -// as the third constructor parameter: -$lowPollingStream = new ReadableNonBlockingInputStream($input, $loop, new PollingInterval(5)); - -// Of course, the stream emits all expected events (except end) -$stream->on('data', fn() => print('m')); -$stream->on('error', fn() => print('e')); -$stream->on('close', fn() => print('c')); - -// If you know what data your input returns, you may type-hint it: -$stream->on('data', fn(Joke $joke) => print($joke . PHP_EOL)); - -// Add a periodic timer for demonstration purposes -$loop->addPeriodicTimer(0.2, fn() => print('.')); - -// And kick 'er off. $loop->run(); ``` + +Check out the [examples](examples) folder for some simple implementations. + +## Questions + +__Q__: _Where is the code that deals with AMQP messages_? +__A__: It will be released in a separate package, but it needs some work before it can be published. + +__Q__: _Where are the tests_? +__A__: There is only one class, and it is mostly based on the `ReadableResourceStream` from `react/stream`. Tests might +be added later, but as of now, I don't really see the value. Feel free to create an issue if this bothers you! diff --git a/composer.json b/composer.json index 8c941b2..1e9c556 100644 --- a/composer.json +++ b/composer.json @@ -1,22 +1,36 @@ { - "name": "joopschilder/reactphp-input-stream", - "description": "Wraps a non-blocking input to behave like a stream so it can be used in a ReactPHP EventLoop", + "name": "toalett/react-stream-adapter", "type": "library", + "description": "Use any data source as a stream with ReactPHP", + "keywords": [ + "react", + "reactphp", + "event-driven", + "non-blocking", + "readable", + "eventloop", + "io", + "nio", + "stream", + "source", + "adapter" + ], + "homepage": "https://github.com/toalett-io/react-stream-adapter", "license": "MIT", "authors": [ { "name": "Joop Schilder", - "email": "jnmschilder@protonmail.com" + "email": "jnmschilder@protonmail.com", + "homepage": "https://joopschilder.nl/" } ], - "require": { - "php": "^7.4", - "react/event-loop": "^1.1", - "react/stream": "^1.1" - }, "autoload": { "psr-4": { - "JoopSchilder\\React\\Stream\\NonBlockingInput\\": "src" + "Toalett\\React\\Stream\\": "src" } + }, + "require": { + "php": "^7.4", + "react/stream": "^1.0" } } diff --git a/composer.lock b/composer.lock index 1b653fc..3ce3a93 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "eb4b86d14372d5f65b57683c22bd760f", + "content-hash": "4c4c46859db5407b7a4dc036efcca228", "packages": [ { "name": "evenement/evenement", @@ -47,6 +47,10 @@ "event-dispatcher", "event-emitter" ], + "support": { + "issues": "https://github.com/igorw/evenement/issues", + "source": "https://github.com/igorw/evenement/tree/master" + }, "time": "2017-07-23T21:35:13+00:00" }, { @@ -89,20 +93,24 @@ "asynchronous", "event-loop" ], + "support": { + "issues": "https://github.com/reactphp/event-loop/issues", + "source": "https://github.com/reactphp/event-loop/tree/v1.1.1" + }, "time": "2020-01-01T18:39:52+00:00" }, { "name": "react/stream", - "version": "v1.1.0", + "version": "v1.1.1", "source": { "type": "git", "url": "https://github.com/reactphp/stream.git", - "reference": "50426855f7a77ddf43b9266c22320df5bf6c6ce6" + "reference": "7c02b510ee3f582c810aeccd3a197b9c2f52ff1a" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/reactphp/stream/zipball/50426855f7a77ddf43b9266c22320df5bf6c6ce6", - "reference": "50426855f7a77ddf43b9266c22320df5bf6c6ce6", + "url": "https://api.github.com/repos/reactphp/stream/zipball/7c02b510ee3f582c810aeccd3a197b9c2f52ff1a", + "reference": "7c02b510ee3f582c810aeccd3a197b9c2f52ff1a", "shasum": "" }, "require": { @@ -112,7 +120,7 @@ }, "require-dev": { "clue/stream-filter": "~1.2", - "phpunit/phpunit": "^6.4 || ^5.7 || ^4.8.35" + "phpunit/phpunit": "^7.0 || ^6.4 || ^5.7 || ^4.8.35" }, "type": "library", "autoload": { @@ -135,7 +143,11 @@ "stream", "writable" ], - "time": "2019-01-01T16:15:09+00:00" + "support": { + "issues": "https://github.com/reactphp/stream/issues", + "source": "https://github.com/reactphp/stream/tree/v1.1.1" + }, + "time": "2020-05-04T10:17:57+00:00" } ], "packages-dev": [], @@ -144,6 +156,9 @@ "stability-flags": [], "prefer-stable": false, "prefer-lowest": false, - "platform": [], - "platform-dev": [] + "platform": { + "php": "^7.4" + }, + "platform-dev": [], + "plugin-api-version": "2.0.0" } diff --git a/examples/basic_example.php b/examples/basic_example.php deleted file mode 100644 index e819e13..0000000 --- a/examples/basic_example.php +++ /dev/null @@ -1,17 +0,0 @@ -on('data', fn() => print('m')); -$stream->on('error', fn() => print('e')); -$stream->on('close', fn() => print('c')); -$loop->addPeriodicTimer(0.2, fn() => print('.')); -$loop->run(); diff --git a/examples/classes.php b/examples/classes.php new file mode 100644 index 0000000..46a3cf6 --- /dev/null +++ b/examples/classes.php @@ -0,0 +1,99 @@ +openedAt = microtime(true); + } + + public function select(): ?float + { + $now = microtime(true); + if ($now - $this->openedAt > self::SECONDS_BEFORE_EMITTING_ERROR) { + throw new RuntimeException('An error has occured!'); + } + if ($now - $this->lastEmission > self::EMISSION_INTERVAL) { + $this->lastEmission = $now; + return $now; + } + + return null; + } +} + +final class Joker implements Source +{ + use EndlessTrait; + + private float $deadline = 0; + private array $jokes = [ + 'What did the Buddhist ask the hot dog vendor? - Make me one with everything.', + 'You know why you never see elephants hiding up in trees? - Because they’re really good at it.', + 'What is red and smells like blue paint? - Red paint.', + 'A dyslexic man walks into a bra.', + 'Where does the General keep his armies? - In his sleevies!', + 'What do you call bears with no ears? - B', + 'Why dont blind people skydive? - Because it scares the crap out of their dogs.', + ]; + + public function open(): void + { + $this->scheduleNextJoke(); + } + + public function select(): ?string + { + if (microtime(true) < $this->deadline) { + return null; + } + + $this->scheduleNextJoke(); + return $this->jokes[array_rand($this->jokes)]; + } + + private function scheduleNextJoke(): void + { + $delay = mt_rand() / mt_getrandmax(); // somewhere between 0 - 1 + $this->deadline = microtime(true) + (5.0 * $delay); + } +} + +final class ReachesEofIn5Iterations implements Source +{ + private array $buffer = [ + 'line 1', + 'line 2', + 'line 3', + 'line 4', + 'line 5', + ]; + + public function open(): void + { + } + + public function select(): ?string + { + return array_shift($this->buffer); + } + + public function close(): void + { + } + + public function eof(): bool + { + return count($this->buffer) === 0; + } +} diff --git a/examples/demo_classes.php b/examples/demo_classes.php deleted file mode 100644 index acbcc7b..0000000 --- a/examples/demo_classes.php +++ /dev/null @@ -1,124 +0,0 @@ -initialEmission = microtime(true); - } - - - public function select(): ?object - { - $now = microtime(true); - if ($now - $this->initialEmission > self::ERROR_AFTER_S) { - throw new Exception('Oh no!'); - } - if ($now - $this->lastEmission > self::DATA_AVAILABLE_INTERVAL_S) { - $this->lastEmission = $now; - - return new stdClass(); - } - - return null; - } - - - public function close(): void - { - } - -} - -/** - * Class Joke - * Used as a payload for the DemoJokeGenerator. - * @see DemoJokeGenerator - */ -final class Joke -{ - private string $joke; - - - public function __construct(string $joke) - { - $this->joke = $joke; - } - - - public function __toString() - { - return $this->joke; - } - -} - -/** - * Class DemoJokeGenerator - * Generates a random joke at a random interval (0 - 2 seconds). - */ -final class DemoJokeGenerator implements NonBlockingInputInterface -{ - private float $lastEmission = 0; - - private float $deadline = 0; - - private array $jokes = [ - 'What did the Buddhist ask the hot dog vendor? - Make me one with everything.', - 'You know why you never see elephants hiding up in trees? - Because they’re really good at it.', - 'What is red and smells like blue paint? - Red paint.', - 'A dyslexic man walks into a bra.', - 'Where does the General keep his armies? - In his sleevies!', - 'What do you call bears with no ears? - B', - 'Why dont blind people skydive? - Because it scares the crap out of their dogs.', - ]; - - - private function scheduleNextJoke() - { - $this->lastEmission = microtime(true); - $delay = mt_rand() / mt_getrandmax(); - $this->deadline = $this->lastEmission + (2.0 * $delay); - } - - - public function open(): void - { - $this->scheduleNextJoke(); - } - - - public function select(): ?object - { - $now = microtime(true); - if ($now > $this->deadline) { - $this->scheduleNextJoke(); - - return new Joke($this->jokes[array_rand($this->jokes)]); - } - - return null; - } - - - public function close(): void - { - } - -} diff --git a/examples/eof_in_5_iterations.php b/examples/eof_in_5_iterations.php new file mode 100644 index 0000000..ec69f0d --- /dev/null +++ b/examples/eof_in_5_iterations.php @@ -0,0 +1,29 @@ +on('data', fn(string $s) => printf('Data received: %s.%s', $s, PHP_EOL)); +$stream->on('end', fn() => print('Stream reached eof.' . PHP_EOL)); +$stream->on('close', fn() => print('Stream closed.' . PHP_EOL)); + +print(<<run(); diff --git a/examples/exception_after_4_seconds.php b/examples/exception_after_4_seconds.php new file mode 100644 index 0000000..39f8e5c --- /dev/null +++ b/examples/exception_after_4_seconds.php @@ -0,0 +1,25 @@ +addPeriodicTimer(0.2, fn() => print('.')); + +$stream = new StreamAdapter(new WillThrowExceptionAfter4Seconds(), $loop); +$stream->on('data', fn() => print('Data received.' . PHP_EOL)); +$stream->on('error', fn(Throwable $t) => print('Error: ' . $t->getMessage() . PHP_EOL)); +$stream->on('close', fn() => print('Stream closed.' . PHP_EOL)); + +print(<<run(); diff --git a/examples/infinite_stream_of_jokes.php b/examples/infinite_stream_of_jokes.php new file mode 100644 index 0000000..be6899a --- /dev/null +++ b/examples/infinite_stream_of_jokes.php @@ -0,0 +1,23 @@ +on('data', fn(string $joke) => print($joke . PHP_EOL)); + +print(<<run(); diff --git a/examples/jokes_example.php b/examples/jokes_example.php deleted file mode 100644 index a53d51d..0000000 --- a/examples/jokes_example.php +++ /dev/null @@ -1,16 +0,0 @@ -on('data', fn(Joke $joke) => print($joke . PHP_EOL)); -$loop->addPeriodicTimer(0.2, fn() => print('.' . PHP_EOL)); -$loop->run(); - diff --git a/src/EndlessTrait.php b/src/EndlessTrait.php new file mode 100644 index 0000000..903fc72 --- /dev/null +++ b/src/EndlessTrait.php @@ -0,0 +1,15 @@ +input = $input; - $this->loop = $loop; - $this->interval = $interval ?? new PollingInterval(); - - $this->resume(); - } - - - public function isReadable() - { - return !$this->isClosed; - } - - - /** - * @see ReadableResourceStream::pause() - * Pause consumption of the AMQP queue but do not mark the stream as closed - */ - public function pause() - { - if (!$this->isListening) { - return; - } - - $this->input->close(); - $this->loop->cancelTimer($this->periodicTimer); - $this->isListening = false; - } - - - /** - * @see ReadableResourceStream::resume() - * Register the consumer with the broker and add consumer again - */ - public function resume() - { - if ($this->isListening || $this->isClosed) { - return; - } - - $this->input->open(); - $this->periodicTimer = $this->loop->addPeriodicTimer( - $this->interval->getInterval(), - function () { - if ($data = $this->read()) { - $this->emit('data', [$data]); - } - } - ); - - $this->isListening = true; - } - - - /** - * @param WritableStreamInterface $dest - * @param array $options - * @return WritableStreamInterface - * @see ReadableResourceStream::pipe() - */ - public function pipe(WritableStreamInterface $dest, array $options = []) - { - return Util::pipe($this, $dest, $options); - } - - - /** - * @see ReadableResourceStream::close() - */ - public function close() - { - if ($this->isClosed) { - return; - } - - $this->isClosed = true; - - $this->emit('close'); - $this->pause(); - $this->removeAllListeners(); - - $this->input->close(); - } - - - private function read(): ?object - { - try { - return $this->input->select(); - } catch (Throwable $t) { - $this->emit('error', [ - new RuntimeException('Unable to read data from input: ' . $t->getMessage(), 0, $t), - ]); - $this->close(); - } - - return null; - } - -} diff --git a/src/Source.php b/src/Source.php new file mode 100644 index 0000000..1e49b0e --- /dev/null +++ b/src/Source.php @@ -0,0 +1,17 @@ +source = $source; + $this->loop = $loop; + $this->pollingInterval = $pollingInterval; + + $this->resume(); + } + + public function isReadable(): bool + { + return !$this->closed; + } + + public function pause(): void + { + if (!$this->listening) { + return; + } + + $this->source->close(); + $this->loop->cancelTimer($this->timer); + $this->listening = false; + } + + public function resume(): void + { + if ($this->listening || $this->closed) { + return; + } + + $this->source->open(); + $this->timer = $this->loop->addPeriodicTimer($this->pollingInterval, function () { + while (!is_null($data = $this->read())) { + $this->emit('data', [$data]); + } + if ($this->source->eof()) { + $this->emit('end'); + $this->close(); + } + }); + $this->listening = true; + } + + public function pipe(WritableStreamInterface $dest, array $options = []): WritableStreamInterface + { + return Util::pipe($this, $dest, $options); + } + + public function close(): void + { + if ($this->closed) { + return; + } + + $this->closed = true; + + $this->emit('close'); + $this->pause(); + $this->removeAllListeners(); + + $this->source->close(); + } + + /** + * @return mixed|null + */ + private function read() + { + try { + return $this->source->select(); + } catch (Throwable $t) { + $this->emit('error', [new RuntimeException('Unable to read data from source: ' . $t->getMessage(), 0, $t)]); + $this->close(); + } + + return null; + } +} diff --git a/src/ValueObject/PollingInterval.php b/src/ValueObject/PollingInterval.php deleted file mode 100644 index 7c41eef..0000000 --- a/src/ValueObject/PollingInterval.php +++ /dev/null @@ -1,29 +0,0 @@ -interval = $interval; - } - - - public function getInterval(): float - { - return $this->interval; - } - -}