commit 6a7ea39f5c6e7221916b2a72ce848608d4f5a7b6 Author: Joop Schilder Date: Sat Feb 8 20:44:29 2020 +0100 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f7f8ac3 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/.idea/ +/vendor/ diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..24ea2a7 --- /dev/null +++ b/composer.json @@ -0,0 +1,21 @@ +{ + "name": "joopschilder/reactphp-input-stream", + "description": "Wraps a non-blocking input to behave like a stream so it can be used in a ReactPHP EventLoop", + "type": "library", + "license": "MIT", + "authors": [ + { + "name": "Joop Schilder", + "email": "jnmschilder@protonmail.com" + } + ], + "require": { + "react/event-loop": "^1.1", + "react/stream": "^1.1" + }, + "autoload": { + "psr-4": { + "JoopSchilder\\React\\Stream\\NonBlockingInput\\": "src" + } + } +} diff --git a/composer.lock b/composer.lock new file mode 100644 index 0000000..142ed77 --- /dev/null +++ b/composer.lock @@ -0,0 +1,149 @@ +{ + "_readme": [ + "This file locks the dependencies of your project to a known state", + "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", + "This file is @generated automatically" + ], + "content-hash": "8cb0549e866a693d6b61e7e7f9a75862", + "packages": [ + { + "name": "evenement/evenement", + "version": "v3.0.1", + "source": { + "type": "git", + "url": "https://github.com/igorw/evenement.git", + "reference": "531bfb9d15f8aa57454f5f0285b18bec903b8fb7" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/igorw/evenement/zipball/531bfb9d15f8aa57454f5f0285b18bec903b8fb7", + "reference": "531bfb9d15f8aa57454f5f0285b18bec903b8fb7", + "shasum": "" + }, + "require": { + "php": ">=7.0" + }, + "require-dev": { + "phpunit/phpunit": "^6.0" + }, + "type": "library", + "autoload": { + "psr-0": { + "Evenement": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Igor Wiedler", + "email": "igor@wiedler.ch" + } + ], + "description": "Événement is a very simple event dispatching library for PHP", + "keywords": [ + "event-dispatcher", + "event-emitter" + ], + "time": "2017-07-23T21:35:13+00:00" + }, + { + "name": "react/event-loop", + "version": "v1.1.1", + "source": { + "type": "git", + "url": "https://github.com/reactphp/event-loop.git", + "reference": "6d24de090cd59cfc830263cfba965be77b563c13" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/reactphp/event-loop/zipball/6d24de090cd59cfc830263cfba965be77b563c13", + "reference": "6d24de090cd59cfc830263cfba965be77b563c13", + "shasum": "" + }, + "require": { + "php": ">=5.3.0" + }, + "require-dev": { + "phpunit/phpunit": "^7.0 || ^6.4 || ^5.7 || ^4.8.35" + }, + "suggest": { + "ext-event": "~1.0 for ExtEventLoop", + "ext-pcntl": "For signal handling support when using the StreamSelectLoop", + "ext-uv": "* for ExtUvLoop" + }, + "type": "library", + "autoload": { + "psr-4": { + "React\\EventLoop\\": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "description": "ReactPHP's core reactor event loop that libraries can use for evented I/O.", + "keywords": [ + "asynchronous", + "event-loop" + ], + "time": "2020-01-01T18:39:52+00:00" + }, + { + "name": "react/stream", + "version": "v1.1.0", + "source": { + "type": "git", + "url": "https://github.com/reactphp/stream.git", + "reference": "50426855f7a77ddf43b9266c22320df5bf6c6ce6" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/reactphp/stream/zipball/50426855f7a77ddf43b9266c22320df5bf6c6ce6", + "reference": "50426855f7a77ddf43b9266c22320df5bf6c6ce6", + "shasum": "" + }, + "require": { + "evenement/evenement": "^3.0 || ^2.0 || ^1.0", + "php": ">=5.3.8", + "react/event-loop": "^1.0 || ^0.5 || ^0.4 || ^0.3.5" + }, + "require-dev": { + "clue/stream-filter": "~1.2", + "phpunit/phpunit": "^6.4 || ^5.7 || ^4.8.35" + }, + "type": "library", + "autoload": { + "psr-4": { + "React\\Stream\\": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "description": "Event-driven readable and writable streams for non-blocking I/O in ReactPHP", + "keywords": [ + "event-driven", + "io", + "non-blocking", + "pipe", + "reactphp", + "readable", + "stream", + "writable" + ], + "time": "2019-01-01T16:15:09+00:00" + } + ], + "packages-dev": [], + "aliases": [], + "minimum-stability": "stable", + "stability-flags": [], + "prefer-stable": false, + "prefer-lowest": false, + "platform": [], + "platform-dev": [] +} diff --git a/examples/basic_example.php b/examples/basic_example.php new file mode 100644 index 0000000..44a848a --- /dev/null +++ b/examples/basic_example.php @@ -0,0 +1,22 @@ +on('data', fn() => print('m')); +$stream->on('error', fn() => print('e')); +$stream->on('close', fn() => print('c')); +$loop->addPeriodicTimer(0.2, fn() => print('.')); +$loop->run(); + diff --git a/examples/basic_example_classes.php b/examples/basic_example_classes.php new file mode 100644 index 0000000..61b0daa --- /dev/null +++ b/examples/basic_example_classes.php @@ -0,0 +1,46 @@ +initialEmission = microtime(true); + } + + + public function select(): ?PayloadInterface + { + $now = microtime(true); + if ($now - $this->initialEmission > self::ERROR_AFTER_S) { + throw new Exception('Oh no!'); + } + if ($now - $this->lastEmission > self::DATA_AVAILABLE_INTERVAL_S) { + $this->lastEmission = $now; + + return new DemoEmptyPayload(); + } + + return null; + } + + + public function close(): void + { + } + +} diff --git a/src/NonBlockingInputInterface.php b/src/NonBlockingInputInterface.php new file mode 100644 index 0000000..0b4d8be --- /dev/null +++ b/src/NonBlockingInputInterface.php @@ -0,0 +1,16 @@ +input = $input; + $this->loop = $loop; + $this->interval = $interval ?? new PollingInterval(); + + $this->resume(); + } + + + public function isReadable() + { + return !$this->closed; + } + + + /** + * @see ReadableResourceStream::pause() + * Pause consumption of the AMQP queue but do not mark the stream as closed + */ + public function pause() + { + if (!$this->listening) { + return; + } + + $this->input->close(); + $this->loop->cancelTimer($this->periodicTimer); + $this->listening = false; + } + + + /** + * @see ReadableResourceStream::resume() + * Register the consumer with the broker and add consumer again + */ + public function resume() + { + if ($this->listening || $this->closed) { + return; + } + + $this->input->open(); + $this->periodicTimer = $this->loop->addPeriodicTimer( + $this->interval->getInterval(), + function () { + if ($data = $this->read()) { + $this->emit('data', [$data]); + } + } + ); + + $this->listening = true; + } + + + /** + * @param WritableStreamInterface $dest + * @param array $options + * @return WritableStreamInterface + * @see ReadableResourceStream::pipe() + */ + public function pipe(WritableStreamInterface $dest, array $options = []) + { + return Util::pipe($this, $dest, $options); + } + + + /** + * @see ReadableResourceStream::close() + */ + public function close() + { + if ($this->closed) { + return; + } + + $this->closed = true; + + $this->emit('close'); + $this->pause(); + $this->removeAllListeners(); + + $this->input->close(); + } + + + private function read(): ?PayloadInterface + { + try { + return $this->input->select(); + } catch (Throwable $t) { + $this->emit('error', [ + new RuntimeException('Unable to read data from input: ' . $t->getMessage(), 0, $t), + ]); + $this->close(); + } + + return null; + } + +} diff --git a/src/ValueObject/PollingInterval.php b/src/ValueObject/PollingInterval.php new file mode 100644 index 0000000..7c41eef --- /dev/null +++ b/src/ValueObject/PollingInterval.php @@ -0,0 +1,29 @@ +interval = $interval; + } + + + public function getInterval(): float + { + return $this->interval; + } + +}