From 4edc6132c4281418289aebe85da3e3ef1ef07b6c Mon Sep 17 00:00:00 2001 From: Joop Schilder Date: Sat, 8 Feb 2020 22:09:11 +0100 Subject: [PATCH] Remove PayloadInterface, add README --- README.md | 61 ++++++++++++ composer.json | 37 ++++---- composer.lock | 2 +- examples/basic_example.php | 7 +- examples/basic_example_classes.php | 46 --------- examples/demo_classes.php | 124 +++++++++++++++++++++++++ examples/jokes_example.php | 16 ++++ src/NonBlockingInputInterface.php | 4 +- src/PayloadInterface.php | 10 -- src/ReadableNonBlockingInputStream.php | 20 ++-- 10 files changed, 234 insertions(+), 93 deletions(-) create mode 100644 README.md delete mode 100644 examples/basic_example_classes.php create mode 100644 examples/demo_classes.php create mode 100644 examples/jokes_example.php delete mode 100644 src/PayloadInterface.php diff --git a/README.md b/README.md new file mode 100644 index 0000000..c9273ed --- /dev/null +++ b/README.md @@ -0,0 +1,61 @@ +# ```reactphp-input-stream``` +## What is this? +A compact library that can wrap a non-blocking input to behave like a [stream](https://reactphp.org/stream/) in a [ReactPHP EventLoop](https://reactphp.org/event-loop/). + +## How do I use this? +Check out the `examples` folder. It contains a very basic example of a non-blocking input implementation, as well +as a very (_very_) lame joke generator. + +## Why is this useful? +I needed to respond to incoming AMQP messages in my event loop and I was feeling adventurous. +Admittedly, I could have just used a periodic timer directly (that is what this library uses under the hood), but where's the fun in that? + +I hear you ask: "_Where's the code that deals with the AMQP messages_?" +While I was writing this, I felt like it would be useful to +separate the logic of dealing with input in a stream-like manner from the logic that +deals with AMQP messages. +This means you can reuse this library to map practically anything to a readable stream. +An extra-lame example of this can be found in `examples/jokes_example.php`. + +When the code for AMQP consumption as stream is finished, I will link it here. + +## Where are the unit tests? +_Errrrrr..._ + +## How do I install this? +This should do it [once it's available on Packagist](https://packagist.org/packages/joopschilder/): +```bash +composer require joopschilder/reactphp-input-stream +``` + +## Mandatory block of example code +```php +// Say, we have an event loop... +$loop = Factory::create(); + +// And say, we have a non-blocking input called $input... +$input = new DemoNonBlockingInput(); + +// Then we can create a ReadableStream from it like so: +$stream = new ReadableNonBlockingInputStream($input, $loop); + +// If your 'select()' method takes a long time to execute, or you just don't +// feel like polling the input availability that often, you can +// set a custom polling interval by supplying an instance of PollingInterval +// as the third constructor parameter: +$lowPollingStream = new ReadableNonBlockingInputStream($input, $loop, new PollingInterval(5)); + +// Of course, the stream emits all expected events (except end) +$stream->on('data', fn() => print('m')); +$stream->on('error', fn() => print('e')); +$stream->on('close', fn() => print('c')); + +// If you know what data your input returns, you may type-hint it: +$stream->on('data', fn(Joke $joke) => print($joke . PHP_EOL)); + +// Add a periodic timer for demonstration purposes +$loop->addPeriodicTimer(0.2, fn() => print('.')); + +// And kick 'er off. +$loop->run(); +``` diff --git a/composer.json b/composer.json index 24ea2a7..8c941b2 100644 --- a/composer.json +++ b/composer.json @@ -1,21 +1,22 @@ { - "name": "joopschilder/reactphp-input-stream", - "description": "Wraps a non-blocking input to behave like a stream so it can be used in a ReactPHP EventLoop", - "type": "library", - "license": "MIT", - "authors": [ - { - "name": "Joop Schilder", - "email": "jnmschilder@protonmail.com" - } - ], - "require": { - "react/event-loop": "^1.1", - "react/stream": "^1.1" - }, - "autoload": { - "psr-4": { - "JoopSchilder\\React\\Stream\\NonBlockingInput\\": "src" - } + "name": "joopschilder/reactphp-input-stream", + "description": "Wraps a non-blocking input to behave like a stream so it can be used in a ReactPHP EventLoop", + "type": "library", + "license": "MIT", + "authors": [ + { + "name": "Joop Schilder", + "email": "jnmschilder@protonmail.com" } + ], + "require": { + "php": "^7.4", + "react/event-loop": "^1.1", + "react/stream": "^1.1" + }, + "autoload": { + "psr-4": { + "JoopSchilder\\React\\Stream\\NonBlockingInput\\": "src" + } + } } diff --git a/composer.lock b/composer.lock index 142ed77..1b653fc 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "8cb0549e866a693d6b61e7e7f9a75862", + "content-hash": "eb4b86d14372d5f65b57683c22bd760f", "packages": [ { "name": "evenement/evenement", diff --git a/examples/basic_example.php b/examples/basic_example.php index 44a848a..e819e13 100644 --- a/examples/basic_example.php +++ b/examples/basic_example.php @@ -4,12 +4,8 @@ use JoopSchilder\React\Stream\NonBlockingInput\ReadableNonBlockingInputStream; use React\EventLoop\Factory; require_once __DIR__ . '/../vendor/autoload.php'; -require_once __DIR__ . '/basic_example_classes.php'; +require_once __DIR__ . '/demo_classes.php'; -/** - * The input will have data available every second. - * After 4 seconds have passed, it will generate an error. - */ $input = new DemoNonBlockingInput(); $loop = Factory::create(); @@ -19,4 +15,3 @@ $stream->on('error', fn() => print('e')); $stream->on('close', fn() => print('c')); $loop->addPeriodicTimer(0.2, fn() => print('.')); $loop->run(); - diff --git a/examples/basic_example_classes.php b/examples/basic_example_classes.php deleted file mode 100644 index 61b0daa..0000000 --- a/examples/basic_example_classes.php +++ /dev/null @@ -1,46 +0,0 @@ -initialEmission = microtime(true); - } - - - public function select(): ?PayloadInterface - { - $now = microtime(true); - if ($now - $this->initialEmission > self::ERROR_AFTER_S) { - throw new Exception('Oh no!'); - } - if ($now - $this->lastEmission > self::DATA_AVAILABLE_INTERVAL_S) { - $this->lastEmission = $now; - - return new DemoEmptyPayload(); - } - - return null; - } - - - public function close(): void - { - } - -} diff --git a/examples/demo_classes.php b/examples/demo_classes.php new file mode 100644 index 0000000..acbcc7b --- /dev/null +++ b/examples/demo_classes.php @@ -0,0 +1,124 @@ +initialEmission = microtime(true); + } + + + public function select(): ?object + { + $now = microtime(true); + if ($now - $this->initialEmission > self::ERROR_AFTER_S) { + throw new Exception('Oh no!'); + } + if ($now - $this->lastEmission > self::DATA_AVAILABLE_INTERVAL_S) { + $this->lastEmission = $now; + + return new stdClass(); + } + + return null; + } + + + public function close(): void + { + } + +} + +/** + * Class Joke + * Used as a payload for the DemoJokeGenerator. + * @see DemoJokeGenerator + */ +final class Joke +{ + private string $joke; + + + public function __construct(string $joke) + { + $this->joke = $joke; + } + + + public function __toString() + { + return $this->joke; + } + +} + +/** + * Class DemoJokeGenerator + * Generates a random joke at a random interval (0 - 2 seconds). + */ +final class DemoJokeGenerator implements NonBlockingInputInterface +{ + private float $lastEmission = 0; + + private float $deadline = 0; + + private array $jokes = [ + 'What did the Buddhist ask the hot dog vendor? - Make me one with everything.', + 'You know why you never see elephants hiding up in trees? - Because they’re really good at it.', + 'What is red and smells like blue paint? - Red paint.', + 'A dyslexic man walks into a bra.', + 'Where does the General keep his armies? - In his sleevies!', + 'What do you call bears with no ears? - B', + 'Why dont blind people skydive? - Because it scares the crap out of their dogs.', + ]; + + + private function scheduleNextJoke() + { + $this->lastEmission = microtime(true); + $delay = mt_rand() / mt_getrandmax(); + $this->deadline = $this->lastEmission + (2.0 * $delay); + } + + + public function open(): void + { + $this->scheduleNextJoke(); + } + + + public function select(): ?object + { + $now = microtime(true); + if ($now > $this->deadline) { + $this->scheduleNextJoke(); + + return new Joke($this->jokes[array_rand($this->jokes)]); + } + + return null; + } + + + public function close(): void + { + } + +} diff --git a/examples/jokes_example.php b/examples/jokes_example.php new file mode 100644 index 0000000..a53d51d --- /dev/null +++ b/examples/jokes_example.php @@ -0,0 +1,16 @@ +on('data', fn(Joke $joke) => print($joke . PHP_EOL)); +$loop->addPeriodicTimer(0.2, fn() => print('.' . PHP_EOL)); +$loop->run(); + diff --git a/src/NonBlockingInputInterface.php b/src/NonBlockingInputInterface.php index 0b4d8be..6e65b83 100644 --- a/src/NonBlockingInputInterface.php +++ b/src/NonBlockingInputInterface.php @@ -8,8 +8,8 @@ interface NonBlockingInputInterface function open(): void; - function select(): ?PayloadInterface; - + function select(): ?object; + function close(): void; diff --git a/src/PayloadInterface.php b/src/PayloadInterface.php deleted file mode 100644 index 36958c0..0000000 --- a/src/PayloadInterface.php +++ /dev/null @@ -1,10 +0,0 @@ -closed; + return !$this->isClosed; } @@ -58,13 +58,13 @@ final class ReadableNonBlockingInputStream extends EventEmitter implements Reada */ public function pause() { - if (!$this->listening) { + if (!$this->isListening) { return; } $this->input->close(); $this->loop->cancelTimer($this->periodicTimer); - $this->listening = false; + $this->isListening = false; } @@ -74,7 +74,7 @@ final class ReadableNonBlockingInputStream extends EventEmitter implements Reada */ public function resume() { - if ($this->listening || $this->closed) { + if ($this->isListening || $this->isClosed) { return; } @@ -88,7 +88,7 @@ final class ReadableNonBlockingInputStream extends EventEmitter implements Reada } ); - $this->listening = true; + $this->isListening = true; } @@ -109,11 +109,11 @@ final class ReadableNonBlockingInputStream extends EventEmitter implements Reada */ public function close() { - if ($this->closed) { + if ($this->isClosed) { return; } - $this->closed = true; + $this->isClosed = true; $this->emit('close'); $this->pause(); @@ -123,7 +123,7 @@ final class ReadableNonBlockingInputStream extends EventEmitter implements Reada } - private function read(): ?PayloadInterface + private function read(): ?object { try { return $this->input->select();