Initial commit

This commit is contained in:
Joop Schilder 2020-02-08 20:44:29 +01:00
commit 6a7ea39f5c
9 changed files with 435 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
/.idea/
/vendor/

21
composer.json Normal file
View File

@ -0,0 +1,21 @@
{
"name": "joopschilder/reactphp-input-stream",
"description": "Wraps a non-blocking input to behave like a stream so it can be used in a ReactPHP EventLoop",
"type": "library",
"license": "MIT",
"authors": [
{
"name": "Joop Schilder",
"email": "jnmschilder@protonmail.com"
}
],
"require": {
"react/event-loop": "^1.1",
"react/stream": "^1.1"
},
"autoload": {
"psr-4": {
"JoopSchilder\\React\\Stream\\NonBlockingInput\\": "src"
}
}
}

149
composer.lock generated Normal file
View File

@ -0,0 +1,149 @@
{
"_readme": [
"This file locks the dependencies of your project to a known state",
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
"This file is @generated automatically"
],
"content-hash": "8cb0549e866a693d6b61e7e7f9a75862",
"packages": [
{
"name": "evenement/evenement",
"version": "v3.0.1",
"source": {
"type": "git",
"url": "https://github.com/igorw/evenement.git",
"reference": "531bfb9d15f8aa57454f5f0285b18bec903b8fb7"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/igorw/evenement/zipball/531bfb9d15f8aa57454f5f0285b18bec903b8fb7",
"reference": "531bfb9d15f8aa57454f5f0285b18bec903b8fb7",
"shasum": ""
},
"require": {
"php": ">=7.0"
},
"require-dev": {
"phpunit/phpunit": "^6.0"
},
"type": "library",
"autoload": {
"psr-0": {
"Evenement": "src"
}
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"authors": [
{
"name": "Igor Wiedler",
"email": "igor@wiedler.ch"
}
],
"description": "Événement is a very simple event dispatching library for PHP",
"keywords": [
"event-dispatcher",
"event-emitter"
],
"time": "2017-07-23T21:35:13+00:00"
},
{
"name": "react/event-loop",
"version": "v1.1.1",
"source": {
"type": "git",
"url": "https://github.com/reactphp/event-loop.git",
"reference": "6d24de090cd59cfc830263cfba965be77b563c13"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/reactphp/event-loop/zipball/6d24de090cd59cfc830263cfba965be77b563c13",
"reference": "6d24de090cd59cfc830263cfba965be77b563c13",
"shasum": ""
},
"require": {
"php": ">=5.3.0"
},
"require-dev": {
"phpunit/phpunit": "^7.0 || ^6.4 || ^5.7 || ^4.8.35"
},
"suggest": {
"ext-event": "~1.0 for ExtEventLoop",
"ext-pcntl": "For signal handling support when using the StreamSelectLoop",
"ext-uv": "* for ExtUvLoop"
},
"type": "library",
"autoload": {
"psr-4": {
"React\\EventLoop\\": "src"
}
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"description": "ReactPHP's core reactor event loop that libraries can use for evented I/O.",
"keywords": [
"asynchronous",
"event-loop"
],
"time": "2020-01-01T18:39:52+00:00"
},
{
"name": "react/stream",
"version": "v1.1.0",
"source": {
"type": "git",
"url": "https://github.com/reactphp/stream.git",
"reference": "50426855f7a77ddf43b9266c22320df5bf6c6ce6"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/reactphp/stream/zipball/50426855f7a77ddf43b9266c22320df5bf6c6ce6",
"reference": "50426855f7a77ddf43b9266c22320df5bf6c6ce6",
"shasum": ""
},
"require": {
"evenement/evenement": "^3.0 || ^2.0 || ^1.0",
"php": ">=5.3.8",
"react/event-loop": "^1.0 || ^0.5 || ^0.4 || ^0.3.5"
},
"require-dev": {
"clue/stream-filter": "~1.2",
"phpunit/phpunit": "^6.4 || ^5.7 || ^4.8.35"
},
"type": "library",
"autoload": {
"psr-4": {
"React\\Stream\\": "src"
}
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"description": "Event-driven readable and writable streams for non-blocking I/O in ReactPHP",
"keywords": [
"event-driven",
"io",
"non-blocking",
"pipe",
"reactphp",
"readable",
"stream",
"writable"
],
"time": "2019-01-01T16:15:09+00:00"
}
],
"packages-dev": [],
"aliases": [],
"minimum-stability": "stable",
"stability-flags": [],
"prefer-stable": false,
"prefer-lowest": false,
"platform": [],
"platform-dev": []
}

View File

@ -0,0 +1,22 @@
<?php
use JoopSchilder\React\Stream\NonBlockingInput\ReadableNonBlockingInputStream;
use React\EventLoop\Factory;
require_once __DIR__ . '/../vendor/autoload.php';
require_once __DIR__ . '/basic_example_classes.php';
/**
* The input will have data available every second.
* After 4 seconds have passed, it will generate an error.
*/
$input = new DemoNonBlockingInput();
$loop = Factory::create();
$stream = new ReadableNonBlockingInputStream($input, $loop);
$stream->on('data', fn() => print('m'));
$stream->on('error', fn() => print('e'));
$stream->on('close', fn() => print('c'));
$loop->addPeriodicTimer(0.2, fn() => print('.'));
$loop->run();

View File

@ -0,0 +1,46 @@
<?php
use JoopSchilder\React\Stream\NonBlockingInput\NonBlockingInputInterface;
use JoopSchilder\React\Stream\NonBlockingInput\PayloadInterface;
final class DemoEmptyPayload implements PayloadInterface
{
}
final class DemoNonBlockingInput implements NonBlockingInputInterface
{
private const DATA_AVAILABLE_INTERVAL_S = 1.0;
private const ERROR_AFTER_S = 4.0;
private float $lastEmission = 0;
private float $initialEmission = 0;
public function open(): void
{
$this->initialEmission = microtime(true);
}
public function select(): ?PayloadInterface
{
$now = microtime(true);
if ($now - $this->initialEmission > self::ERROR_AFTER_S) {
throw new Exception('Oh no!');
}
if ($now - $this->lastEmission > self::DATA_AVAILABLE_INTERVAL_S) {
$this->lastEmission = $now;
return new DemoEmptyPayload();
}
return null;
}
public function close(): void
{
}
}

View File

@ -0,0 +1,16 @@
<?php
namespace JoopSchilder\React\Stream\NonBlockingInput;
interface NonBlockingInputInterface
{
function open(): void;
function select(): ?PayloadInterface;
function close(): void;
}

10
src/PayloadInterface.php Normal file
View File

@ -0,0 +1,10 @@
<?php
namespace JoopSchilder\React\Stream\NonBlockingInput;
/**
* Marker interface.
*/
interface PayloadInterface
{
}

View File

@ -0,0 +1,140 @@
<?php
namespace JoopSchilder\React\Stream\NonBlockingInput;
use Evenement\EventEmitter;
use JoopSchilder\React\Stream\NonBlockingInput\ValueObject\PollingInterval;
use React\EventLoop\LoopInterface;
use React\EventLoop\TimerInterface;
use React\Stream\ReadableResourceStream;
use React\Stream\ReadableStreamInterface;
use React\Stream\Util;
use React\Stream\WritableStreamInterface;
use RuntimeException;
use Throwable;
/**
* @see ReadableStreamInterface
* @see ReadableResourceStream
*/
final class ReadableNonBlockingInputStream extends EventEmitter implements ReadableStreamInterface
{
private NonBlockingInputInterface $input;
private LoopInterface $loop;
private PollingInterval $interval;
private ?TimerInterface $periodicTimer = null;
private bool $closed = false;
private bool $listening = false;
/**
* @see ReadableStreamInterface
* @see ReadableResourceStream for an example
*/
public function __construct(NonBlockingInputInterface $input, LoopInterface $loop, ?PollingInterval $interval = null)
{
$this->input = $input;
$this->loop = $loop;
$this->interval = $interval ?? new PollingInterval();
$this->resume();
}
public function isReadable()
{
return !$this->closed;
}
/**
* @see ReadableResourceStream::pause()
* Pause consumption of the AMQP queue but do not mark the stream as closed
*/
public function pause()
{
if (!$this->listening) {
return;
}
$this->input->close();
$this->loop->cancelTimer($this->periodicTimer);
$this->listening = false;
}
/**
* @see ReadableResourceStream::resume()
* Register the consumer with the broker and add consumer again
*/
public function resume()
{
if ($this->listening || $this->closed) {
return;
}
$this->input->open();
$this->periodicTimer = $this->loop->addPeriodicTimer(
$this->interval->getInterval(),
function () {
if ($data = $this->read()) {
$this->emit('data', [$data]);
}
}
);
$this->listening = true;
}
/**
* @param WritableStreamInterface $dest
* @param array $options
* @return WritableStreamInterface
* @see ReadableResourceStream::pipe()
*/
public function pipe(WritableStreamInterface $dest, array $options = [])
{
return Util::pipe($this, $dest, $options);
}
/**
* @see ReadableResourceStream::close()
*/
public function close()
{
if ($this->closed) {
return;
}
$this->closed = true;
$this->emit('close');
$this->pause();
$this->removeAllListeners();
$this->input->close();
}
private function read(): ?PayloadInterface
{
try {
return $this->input->select();
} catch (Throwable $t) {
$this->emit('error', [
new RuntimeException('Unable to read data from input: ' . $t->getMessage(), 0, $t),
]);
$this->close();
}
return null;
}
}

View File

@ -0,0 +1,29 @@
<?php
namespace JoopSchilder\React\Stream\NonBlockingInput\ValueObject;
use InvalidArgumentException;
final class PollingInterval
{
private const DEFAULT_INTERVAL = 0.05;
private float $interval;
public function __construct(float $interval = self::DEFAULT_INTERVAL)
{
if ($interval < 0.0) {
throw new InvalidArgumentException('Interval must be greater than 0');
}
$this->interval = $interval;
}
public function getInterval(): float
{
return $this->interval;
}
}