Extract components and add more tests

This commit is contained in:
Joop Schilder 2020-12-12 02:11:05 +01:00
parent 600f52567f
commit 92bc0ab407
22 changed files with 598 additions and 149 deletions

View File

@ -1,40 +1,43 @@
# 🚽 Toalett # 🚽 Toalett
Welcome to Toalett, a new initiative, based on the idea that all software is 💩. Toalett is the Norwegian word for toilet. It feels fancier than plain "toilet". Welcome to Toalett, a humble initiative based around the idea that all software is 💩.
Toalett is the Norwegian word for toilet. It feels fancier than plain "toilet".
## Why `toalett/multiprocessing`? ## Why `toalett/multiprocessing`?
[Multiprocessing](https://nl.wikipedia.org/wiki/Multiprocessing) is a technique that is often used in PHP applications to execute tasks asynchronously. [Multiprocessing](https://nl.wikipedia.org/wiki/Multiprocessing) is a technique that is often used in PHP (CLI-)applications to execute tasks asynchronously.
Due to the lack of native [multithreading](https://en.wikipedia.org/wiki/Multithreading_(computer_architecture)) in PHP, developers have to rely on Due to the lack of native [multithreading](https://en.wikipedia.org/wiki/Multithreading_(computer_architecture)) in PHP, developers have to rely on
good old multiprocessing to do this. good old multiprocessing to do this.
We often see code that's written in a quick and dirty way to accomplish this task, with calls to We often see code that's written in a quick and dirty way to accomplish this task, with calls to
`pcntl_fork()` hidden somewhere, leading to ugly implementations. `pcntl_fork()` hidden somewhere, leading to ugly implementations.
Now, I from Toalett have nothing against quick and dirty PHP code. I live it. I breathe it. Toalett has nothing against quick and dirty PHP code. Toalett lives it. It _breathes_ it.
But since multiprocessing so common, it might be nice to use this library. But since multiprocessing so common, it might be nice to use this library.
## Okay, cool, but... How? ## Okay, cool, but... How?
`toalett/multiprocessing` comes with the handy-dandy `ContextBuilder` class which is used to, well, _build_ a _Context_. `toalett/multiprocessing` comes with the handy-dandy `ContextBuilder` class which is used to build a `Context`.
The Context is the central component of this library. It schedules tasks to the _Workers_. A `Context` is the central component of this library. It schedules tasks to the `Workers`.
Workers are a representation of child processes that are working on a task. Workers are a representation of child processes that are working on a task.
The Context uses a [ReactPHP EventLoop](https://reactphp.org/event-loop/) internally
and emits events using the simple (but elegant) [Evenement](https://github.com/igorw/Evenement) library.
The Context uses a [ReactPHP EventLoop](https://reactphp.org/event-loop/) internally and emits events using the simple (but quite elegant) [Evenement](https://github.com/igorw/Evenement) library.
## Examples ## Examples
For most developers, the quickest way to learn something is by looking at examples. For most developers, the quickest way to learn something is by looking at examples.
Three examples are provided. Three examples are provided.
There is a simple example, which There is a simple example, which demonstrates event emission with the creation of 50 jobs.
A counter is incremented every time a job stops.
When all jobs are done, the context is stopped.
### [Simple example](bin/simple_example.php) ### [Simple example](bin/simple_example.php)
```php ```php
<?php <?php
use Toalett\Multiprocessing\ContextBuilder; use Toalett\Multiprocessing\ContextBuilder;
use Toalett\Multiprocessing\Task\Interval;
require_once 'path/to/autoload.php'; require_once '/path/to/autoload.php';
// We will run 50 jobs // We will run 50 jobs
const NUM_JOBS = 50; const NUM_JOBS = 50;
@ -48,22 +51,24 @@ $counter = new class {
} }
}; };
// Create a default context with unlimited concurrency // Create a context (defaults to unlimited child processes).
$context = ContextBuilder::create()->build(); // The cleanup interval is the interval at dead processes
// will be read. For this example it's kept low.
// The default value is 5 seconds.
$context = ContextBuilder::create()
->withCleanupInterval(Interval::seconds(0.5))
->build();
// Each time a worker stops, a job is finished $context->on('worker_stopped', [$counter, 'increment']);
$context->on('worker_stopped', fn() => $counter->increment()); $context->on('no_workers_remaining', [$context, 'stop']);
$context->on('stopped', fn() => printf("\nJobs completed: %d\n", $counter->value));
// Automatically stop the context when there are no workers left
$context->on('no_workers_remaining', fn() => $context->stop());
$context->on('stopped', fn() => printf("Jobs completed: %d\n", $counter->value));
// You can submit jobs before the context is running. They will be executed // You can submit jobs before the context is running. They will be executed
// in the order in which they are submitted to the context. They are // in the order in which they are submitted to the context.
// scheduled on a future tick of the underlying event loop. // Each job (thus child process) will be sleeping for 3 seconds.
// Each job will involve sleeping for ~3 seconds in this example.
for ($i = 0; $i < NUM_JOBS; $i++) { for ($i = 0; $i < NUM_JOBS; $i++) {
$context->submit(fn() => sleep(3)); $context->submit(fn() => sleep(3));
print('.');
} }
$context->run(); $context->run();
@ -74,7 +79,7 @@ This example is a bit more elaborate than the previous one.
It serves to demonstrate congestion and how it is handled by the context: It serves to demonstrate congestion and how it is handled by the context:
the context simply blocks all execution until a worker stops and a spot becomes available. the context simply blocks all execution until a worker stops and a spot becomes available.
This example also makes more use of the events (described [here](## Events)). This example shows the usage of events.
```php ```php
<?php <?php
@ -82,12 +87,14 @@ use Toalett\Multiprocessing\ContextBuilder;
use Toalett\Multiprocessing\ConcurrencyLimit; use Toalett\Multiprocessing\ConcurrencyLimit;
use React\EventLoop\Factory as EventLoopFactory; use React\EventLoop\Factory as EventLoopFactory;
require_once 'path/to/autoload.php'; require_once '/path/to/autoload.php';
// Create our own EventLoop and limit and supply them to the builder // Create our own EventLoop and limit and supply them to the builder
$loop = EventLoopFactory::create(); $loop = EventLoopFactory::create();
$limit = new ConcurrencyLimit(4); $context = ContextBuilder::create()
$context = ContextBuilder::create()->withEventLoop($loop)->withLimit($limit)->build(); ->withEventLoop($loop)
->withLimit(ConcurrencyLimit::atMost(4))
->build();
$context->on('booted', fn() => print("🚽 Toalett Multiprocessing Context\n")); $context->on('booted', fn() => print("🚽 Toalett Multiprocessing Context\n"));
$context->on('congestion', fn() => print('C')); $context->on('congestion', fn() => print('C'));
@ -115,8 +122,9 @@ This idea is demonstrated here, while execution is limited to a single worker.
use Toalett\Multiprocessing\ConcurrencyLimit; use Toalett\Multiprocessing\ConcurrencyLimit;
use Toalett\Multiprocessing\ContextBuilder; use Toalett\Multiprocessing\ContextBuilder;
use Toalett\Multiprocessing\Task\Interval;
require_once 'path/to/autoload.php'; require_once '/path/to/vendor/autoload.php';
class Job class Job
{ {
@ -129,22 +137,25 @@ class Job
public function __invoke() public function __invoke()
{ {
cli_set_process_title('php ' . $this->title); cli_set_process_title("php {$this->title}");
print("start:{$this->title}\n"); print("+ {$this->title}");
sleep(3); sleep(1);
print("stop :{$this->title}\n"); print("\r {$this->title}\n");
} }
} }
$limit = ConcurrencyLimit::singleWorker(); $limit = ConcurrencyLimit::singleWorker();
$context = ContextBuilder::create()->withLimit($limit)->build(); $context = ContextBuilder::create()
->withLimit(ConcurrencyLimit::singleWorker())
->withCleanupInterval(Interval::seconds(0.2))
->build();
for ($i = 0; $i < 3; $i++) { for ($i = 0; $i < 3; $i++) {
$title = md5(mt_rand()); $title = md5(mt_rand());
$context->submit(new Job($title)); $context->submit(new Job($title));
} }
$context->on('no_workers_remaining', fn() => $context->stop()); $context->on('no_workers_remaining', [$context, 'stop']);
$context->run(); $context->run();
``` ```
@ -158,13 +169,12 @@ $context->run();
1. `no_workers_remaining` 1. `no_workers_remaining`
1. `stopped` 1. `stopped`
These events are emitted by the `Context`. These events are emitted by the context.
The `worker_started` and `worker_stopped` events are emitted by the `Workers` under the hood, They can be subscribed to by calling `$context->on('...', fn() => ...);`.
but they are proxied through the `Context` in order to unify access to them.
#### `booted` #### `booted`
This event is emitted when `$context->run()` is called. This event is emitted when `$context->run()` is called.
This is the first event dispatched by the `Context`. This is the very first event dispatched by the context.
#### `worker_started` #### `worker_started`
This event is emitted when a worker has been started (the process has been forked). This event is emitted when a worker has been started (the process has been forked).
@ -182,19 +192,19 @@ The system naively waits for a child to stop before starting another worker.
#### `congestion_relieved` #### `congestion_relieved`
This event is emitted in case the congestion explained above is relieved. This event is emitted in case the congestion explained above is relieved.
This means that a child has stopped, allowing the execution of a new task. This means that a child has stopped, allowing for the execution of a new task.
#### `no_workers_remaining` #### `no_workers_remaining`
This event is emitted when there are no workers left running. This event is emitted when there are no workers left running.
This usually means there is no more work to do. This usually means there is no more work to do.
It's possible to automatically stop the context when this event occurs. It's possible to automatically stop the context when this event occurs.
This is shown in the second and last example. This is shown in the first and last example.
#### `stopped` #### `stopped`
This event is emitted when `$context->stop()` is called and the eventloop has This event is emitted when `$context->stop()` is called and the eventloop has
succesfully been stopped. succesfully been stopped.
## Why no shared memory? ## Why no shared memory?
Shared memory in PHP is hard to manage and quickly becomes a mess. Shared memory in PHP is hard to manage and quickly becomes a mess. Don't ask.
Don't ask. Feel free to add it yourself though. 😉

View File

@ -2,6 +2,7 @@
use Toalett\Multiprocessing\ConcurrencyLimit; use Toalett\Multiprocessing\ConcurrencyLimit;
use Toalett\Multiprocessing\ContextBuilder; use Toalett\Multiprocessing\ContextBuilder;
use Toalett\Multiprocessing\Task\Interval;
require_once __DIR__ . '/../vendor/autoload.php'; require_once __DIR__ . '/../vendor/autoload.php';
@ -16,20 +17,23 @@ class Job
public function __invoke() public function __invoke()
{ {
cli_set_process_title('php ' . $this->title); cli_set_process_title("php {$this->title}");
print("start:{$this->title}\n"); print("+ {$this->title}");
sleep(3); sleep(1);
print("stop :{$this->title}\n"); print("\r {$this->title}\n");
} }
} }
$limit = ConcurrencyLimit::singleWorker(); $limit = ConcurrencyLimit::singleWorker();
$context = ContextBuilder::create()->withLimit($limit)->build(); $context = ContextBuilder::create()
->withLimit(ConcurrencyLimit::singleWorker())
->withCleanupInterval(Interval::seconds(0.2))
->build();
for ($i = 0; $i < 3; $i++) { for ($i = 0; $i < 3; $i++) {
$title = md5(mt_rand()); $title = md5(mt_rand());
$context->submit(new Job($title)); $context->submit(new Job($title));
} }
$context->on('no_workers_remaining', fn() => $context->stop()); $context->on('no_workers_remaining', [$context, 'stop']);
$context->run(); $context->run();

View File

@ -6,10 +6,11 @@ use React\EventLoop\Factory as EventLoopFactory;
require_once __DIR__ . '/../vendor/autoload.php'; require_once __DIR__ . '/../vendor/autoload.php';
// Create our own EventLoop and limit and supply them to the builder
$loop = EventLoopFactory::create(); $loop = EventLoopFactory::create();
$limit = new ConcurrencyLimit(4); $context = ContextBuilder::create()
$context = ContextBuilder::create()->withEventLoop($loop)->withLimit($limit)->build(); ->withEventLoop($loop)
->withLimit(ConcurrencyLimit::atMost(4))
->build();
$context->on('booted', fn() => print("🚽 Toalett Multiprocessing Context\n")); $context->on('booted', fn() => print("🚽 Toalett Multiprocessing Context\n"));
$context->on('congestion', fn() => print('C')); $context->on('congestion', fn() => print('C'));
@ -17,7 +18,6 @@ $context->on('congestion_relieved', fn() => print('R'));
$context->on('worker_started', fn() => print('+')); $context->on('worker_started', fn() => print('+'));
$context->on('worker_stopped', fn() => print('-')); $context->on('worker_stopped', fn() => print('-'));
// Submit a fake job every second
$loop->addPeriodicTimer(1, fn() => $context->submit(fn(int $s) => sleep($s), random_int(0, 10))); $loop->addPeriodicTimer(1, fn() => $context->submit(fn(int $s) => sleep($s), random_int(0, 10)));
print("Press CTRL+C to stop.\n"); print("Press CTRL+C to stop.\n");

View File

@ -1,10 +1,10 @@
<?php <?php
use Toalett\Multiprocessing\ContextBuilder; use Toalett\Multiprocessing\ContextBuilder;
use Toalett\Multiprocessing\Task\Interval;
require_once __DIR__ . '/../vendor/autoload.php'; require_once __DIR__ . '/../vendor/autoload.php';
// We will run 50 jobs
const NUM_JOBS = 50; const NUM_JOBS = 50;
$counter = new class { $counter = new class {
@ -16,22 +16,17 @@ $counter = new class {
} }
}; };
// Create a default context with unlimited concurrency $context = ContextBuilder::create()
$context = ContextBuilder::create()->build(); ->withCleanupInterval(Interval::seconds(0.5))
->build();
// Each time a worker stops, a job is finished $context->on('worker_stopped', [$counter, 'increment']);
$context->on('worker_stopped', fn() => $counter->increment()); $context->on('no_workers_remaining', [$context, 'stop']);
$context->on('stopped', fn() => printf("\nJobs completed: %d\n", $counter->value));
// Automatically stop the context when there are no workers left
$context->on('no_workers_remaining', fn() => $context->stop());
$context->on('stopped', fn() => printf("Jobs completed: %d\n", $counter->value));
// You can submit jobs before the context is running. They will be executed
// in the order in which they are submitted to the context. They are
// scheduled on a future tick of the underlying event loop.
// Each job will involve sleeping for ~3 seconds in this example.
for ($i = 0; $i < NUM_JOBS; $i++) { for ($i = 0; $i < NUM_JOBS; $i++) {
$context->submit(fn() => sleep(3)); $context->submit(fn() => sleep(3));
print('.');
} }
$context->run(); $context->run();

View File

@ -5,8 +5,6 @@
xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/9.5/phpunit.xsd" xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/9.5/phpunit.xsd"
backupGlobals="true" backupGlobals="true"
colors="true" colors="true"
stopOnError="true"
stopOnFailure="true"
stopOnIncomplete="true" stopOnIncomplete="true"
stopOnSkipped="true" stopOnSkipped="true"
stopOnRisky="true"> stopOnRisky="true">

View File

@ -9,7 +9,7 @@ class ConcurrencyLimit
private const VALUE_UNLIMITED = -1; private const VALUE_UNLIMITED = -1;
private int $limit; private int $limit;
public function __construct(int $limit) private function __construct(int $limit)
{ {
if ($limit === 0 || $limit < self::VALUE_UNLIMITED) { if ($limit === 0 || $limit < self::VALUE_UNLIMITED) {
throw new InvalidArgumentException('-1 or positive integer', $limit); throw new InvalidArgumentException('-1 or positive integer', $limit);
@ -22,6 +22,11 @@ class ConcurrencyLimit
return new self(1); return new self(1);
} }
public static function atMost(int $limit): self
{
return new self($limit);
}
public static function unlimited(): self public static function unlimited(): self
{ {
return new self(self::VALUE_UNLIMITED); return new self(self::VALUE_UNLIMITED);

View File

@ -5,31 +5,42 @@ namespace Toalett\Multiprocessing;
use Evenement\EventEmitterInterface; use Evenement\EventEmitterInterface;
use Evenement\EventEmitterTrait; use Evenement\EventEmitterTrait;
use React\EventLoop\LoopInterface; use React\EventLoop\LoopInterface;
use Toalett\Multiprocessing\Task\Interval;
use Toalett\Multiprocessing\Task\RepeatedTask;
use Toalett\Multiprocessing\Task\Tasks;
class Context implements EventEmitterInterface class Context implements EventEmitterInterface
{ {
public const GC_INTERVAL = 120; public const INTERVAL_GC = 120;
public const CLEANUP_INTERVAL = 5; public const INTERVAL_CLEANUP = 5;
use EventEmitterTrait; use EventEmitterTrait;
private LoopInterface $eventLoop; private LoopInterface $eventLoop;
private ConcurrencyLimit $limit; private ConcurrencyLimit $limit;
private Workers $workers; private Workers $workers;
private Tasks $maintenanceTasks;
public function __construct(LoopInterface $eventLoop, ConcurrencyLimit $limit, ?Workers $workers = null) public function __construct(
LoopInterface $eventLoop,
ConcurrencyLimit $limit,
?Workers $workers = null,
?Interval $cleanupInterval = null,
?Interval $garbageCollectionInterval = null
)
{ {
$this->eventLoop = $eventLoop; $this->eventLoop = $eventLoop;
$this->limit = $limit; $this->limit = $limit;
$this->workers = $workers ?? new Workers(); $this->workers = $workers ?? new Workers();
$this->setupWorkerEventForwarding();
$this->setupMaintenanceTasks($cleanupInterval, $garbageCollectionInterval);
}
public function run(): void
{
$this->eventLoop->futureTick(fn() => $this->emit('booted')); $this->eventLoop->futureTick(fn() => $this->emit('booted'));
$this->eventLoop->futureTick(fn() => gc_enable()); $this->eventLoop->futureTick(fn() => gc_enable());
$this->eventLoop->addPeriodicTimer(self::CLEANUP_INTERVAL, fn() => $this->workers->cleanup()); $this->maintenanceTasks->enable($this->eventLoop);
$this->eventLoop->addPeriodicTimer(self::GC_INTERVAL, fn() => gc_collect_cycles()); $this->eventLoop->run();
$this->workers->on('worker_started', fn(int $pid) => $this->emit('worker_started', [$pid]));
$this->workers->on('worker_stopped', fn(int $pid) => $this->emit('worker_stopped', [$pid]));
$this->workers->on('worker_stopped', fn() => $this->emitIf(empty($this->workers), 'no_workers_remaining'));
} }
public function submit(callable $task, ...$args): void public function submit(callable $task, ...$args): void
@ -44,24 +55,31 @@ class Context implements EventEmitterInterface
}); });
} }
public function run(): void
{
$this->eventLoop->run();
}
public function stop(): void public function stop(): void
{ {
$this->eventLoop->futureTick(function() { $this->maintenanceTasks->cancel();
$this->eventLoop->stop();
$this->workers->stop(); $this->workers->stop();
$this->emit('stopped'); $this->emit('stopped');
});
} }
public function emitIf(bool $condition, string $event, ...$args): void private function setupWorkerEventForwarding(): void
{ {
if ($condition) { $this->workers->on('worker_started', fn(int $pid) => $this->emit('worker_started', [$pid]));
$this->emit($event, $args); $this->workers->on('worker_stopped', fn(int $pid) => $this->emit('worker_stopped', [$pid]));
} $this->workers->on('no_workers_remaining', fn() => $this->emit('no_workers_remaining'));
}
private function setupMaintenanceTasks(?Interval $cleanupInterval, ?Interval $garbageCollectionInterval): void
{
$this->maintenanceTasks = new Tasks(
new RepeatedTask(
$cleanupInterval ?? Interval::seconds(self::INTERVAL_CLEANUP),
fn() => $this->workers->cleanup()
),
new RepeatedTask(
$garbageCollectionInterval ?? Interval::seconds(self::INTERVAL_GC),
fn() => gc_collect_cycles()
)
);
} }
} }

View File

@ -4,11 +4,15 @@ namespace Toalett\Multiprocessing;
use React\EventLoop\Factory; use React\EventLoop\Factory;
use React\EventLoop\LoopInterface; use React\EventLoop\LoopInterface;
use Toalett\Multiprocessing\Task\Interval;
class ContextBuilder class ContextBuilder
{ {
private ?LoopInterface $loop = null; private ?LoopInterface $loop = null;
private ?ConcurrencyLimit $limit = null; private ?ConcurrencyLimit $limit = null;
private ?Workers $workers = null;
private ?Interval $garbageCollectionInterval = null;
private ?Interval $cleanupInterval = null;
public static function create(): self public static function create(): self
{ {
@ -29,11 +33,35 @@ class ContextBuilder
return $instance; return $instance;
} }
public function withWorkers(Workers $workers): self
{
$instance = clone $this;
$instance->workers = $workers;
return $instance;
}
public function withGarbageCollectionInterval(Interval $interval): self
{
$instance = clone $this;
$instance->garbageCollectionInterval = $interval;
return $instance;
}
public function withCleanupInterval(Interval $interval): self
{
$instance = clone $this;
$instance->cleanupInterval = $interval;
return $instance;
}
public function build(): Context public function build(): Context
{ {
return new Context( return new Context(
$this->loop ?? Factory::create(), $this->loop ?? Factory::create(),
$this->limit ?? ConcurrencyLimit::unlimited() $this->limit ?? ConcurrencyLimit::unlimited(),
$this->workers,
$this->cleanupInterval,
$this->garbageCollectionInterval
); );
} }
} }

View File

@ -4,6 +4,8 @@ namespace Toalett\Multiprocessing\ProcessControl;
class Wait class Wait
{ {
public const NO_HANG = WNOHANG;
public const UNTRACED = WUNTRACED;
public int $pid; public int $pid;
public int $status; public int $status;

43
src/Task/Interval.php Normal file
View File

@ -0,0 +1,43 @@
<?php
namespace Toalett\Multiprocessing\Task;
use Toalett\Multiprocessing\Exception\InvalidArgumentException;
class Interval
{
private float $seconds;
private function __construct(float $seconds)
{
if ($seconds <= 0) {
throw new InvalidArgumentException('positive float', $seconds);
}
$this->seconds = $seconds;
}
public static function seconds(float $seconds): self
{
return new self($seconds);
}
public static function minutes(float $minutes): self
{
return new self(60.0 * $minutes);
}
public static function hours(float $hours): self
{
return new self(3600.0 * $hours);
}
public function asFloat(): float
{
return $this->seconds;
}
public function asInt(): int
{
return (int)$this->seconds;
}
}

22
src/Task/RepeatedTask.php Normal file
View File

@ -0,0 +1,22 @@
<?php
namespace Toalett\Multiprocessing\Task;
use React\EventLoop\LoopInterface;
use React\EventLoop\TimerInterface;
class RepeatedTask extends Task
{
public Interval $interval;
public function __construct(Interval $interval, callable $callable, ...$arguments)
{
$this->interval = $interval;
parent::__construct($callable, $arguments);
}
protected function generateTimer(LoopInterface $loop): TimerInterface
{
return $loop->addPeriodicTimer($this->interval->asFloat(), $this->createDeferredCall());
}
}

48
src/Task/Task.php Normal file
View File

@ -0,0 +1,48 @@
<?php
namespace Toalett\Multiprocessing\Task;
use React\EventLoop\LoopInterface;
use React\EventLoop\TimerInterface;
abstract class Task
{
public $callable;
public array $arguments;
protected ?TimerInterface $timer = null;
public function __construct(callable $callable, ...$arguments)
{
$this->callable = $callable;
$this->arguments = $arguments;
}
abstract protected function generateTimer(LoopInterface $loop): TimerInterface;
protected function createDeferredCall(): callable
{
return fn() => call_user_func_array(
$this->callable,
$this->arguments
);
}
public function enable(LoopInterface $loop): void
{
if (!$this->isBound()) {
$this->timer = $this->generateTimer($loop);
}
}
public function isBound(): bool
{
return !is_null($this->timer);
}
public function cancel(LoopInterface $loop): void
{
if ($this->isBound()) {
$loop->cancelTimer($this->timer);
}
}
}

37
src/Task/Tasks.php Normal file
View File

@ -0,0 +1,37 @@
<?php
namespace Toalett\Multiprocessing\Task;
use React\EventLoop\LoopInterface;
class Tasks
{
/** @var Task[] */
private array $tasks;
private ?LoopInterface $loop = null;
public function __construct(Task ...$tasks)
{
$this->tasks = $tasks;
}
public function enable(LoopInterface $loop): void
{
if (is_null($this->loop)) {
$this->loop = $loop;
foreach ($this->tasks as $task) {
$task->enable($this->loop);
}
}
}
public function cancel(): void
{
if (!is_null($this->loop)) {
foreach ($this->tasks as $task) {
$task->cancel($this->loop);
}
$this->loop = null;
}
}
}

View File

@ -11,17 +11,17 @@ class ConcurrencyLimitTest extends TestCase
{ {
use PropertyInspector; use PropertyInspector;
public function testItDoesNotAllowZeroAsLimit(): void public function testItDoesNotAcceptZero(): void
{ {
$this->expectException(InvalidArgumentException::class); $this->expectException(InvalidArgumentException::class);
$this->expectExceptionMessage('Expected -1 or positive integer, got \'0\''); $this->expectExceptionMessage('Expected -1 or positive integer, got \'0\'');
new ConcurrencyLimit(0); ConcurrencyLimit::atMost(0);
} }
public function testItDoesAllowNegativeOneAsLimit(): void public function testItAcceptsNegativeOneAsUnlimited(): void
{ {
$limit = new ConcurrencyLimit(-1); $limit = ConcurrencyLimit::atMost(-1);
self::assertTrue($limit->isUnlimited()); self::assertTrue($limit->isUnlimited());
} }
@ -30,22 +30,22 @@ class ConcurrencyLimitTest extends TestCase
* @param int $negativeNumber * @param int $negativeNumber
* @dataProvider negativeValueProvider * @dataProvider negativeValueProvider
*/ */
public function testItDoesNotAllowAnyOtherNegativeNumberAsLimitExceptNegativeOne(int $negativeNumber): void public function testItDoesNotAllowAnyOtherNegativeValue(int $negativeNumber): void
{ {
$this->expectException(InvalidArgumentException::class); $this->expectException(InvalidArgumentException::class);
$this->expectExceptionMessage(sprintf('Expected -1 or positive integer, got \'%s\'', $negativeNumber)); $this->expectExceptionMessage(sprintf('Expected -1 or positive integer, got \'%s\'', $negativeNumber));
new ConcurrencyLimit($negativeNumber); ConcurrencyLimit::atMost($negativeNumber);
} }
public function testItCanBeMadeUnlimited(): void public function testTheLimitMayBeUnlimited(): void
{ {
$limit = ConcurrencyLimit::unlimited(); $limit = ConcurrencyLimit::unlimited();
self::assertTrue($limit->isUnlimited()); self::assertTrue($limit->isUnlimited());
} }
public function testItCanLimitToASingleWorker(): void public function testTheLimitMayBeASingleWorker(): void
{ {
$limit = ConcurrencyLimit::singleWorker(); $limit = ConcurrencyLimit::singleWorker();
@ -64,8 +64,8 @@ class ConcurrencyLimitTest extends TestCase
public function testABoundLimitCanBeReached(): void public function testABoundLimitCanBeReached(): void
{ {
$three = new ConcurrencyLimit(3); $three = ConcurrencyLimit::atMost(3);
$seven = new ConcurrencyLimit(7); $seven = ConcurrencyLimit::atMost(7);
self::assertTrue($three->isReachedBy(3)); self::assertTrue($three->isReachedBy(3));
self::assertFalse($three->isReachedBy(2)); self::assertFalse($three->isReachedBy(2));

View File

@ -7,6 +7,7 @@ use React\EventLoop\LoopInterface;
use Toalett\Multiprocessing\ConcurrencyLimit; use Toalett\Multiprocessing\ConcurrencyLimit;
use Toalett\Multiprocessing\ContextBuilder; use Toalett\Multiprocessing\ContextBuilder;
use Toalett\Multiprocessing\Tests\Tools\PropertyInspector; use Toalett\Multiprocessing\Tests\Tools\PropertyInspector;
use Toalett\Multiprocessing\Workers;
class ContextBuilderTest extends TestCase class ContextBuilderTest extends TestCase
{ {
@ -22,14 +23,14 @@ class ContextBuilderTest extends TestCase
self::assertNotSame($builder->withLimit($limit), $builder); self::assertNotSame($builder->withLimit($limit), $builder);
} }
public function testItGivesBackANewContextEachTimeBuildIsInvoked(): void public function testItBuildsANewContextEveryTime(): void
{ {
$builder = ContextBuilder::create(); $builder = ContextBuilder::create();
self::assertNotSame($builder->build(), $builder->build()); self::assertNotSame($builder->build(), $builder->build());
} }
public function testItCreatesANewContextWithUnlimitedConcurrencyWhenSupplyingNoArguments(): void public function testTheDefaultConcurrencyLimitIsUnlimited(): void
{ {
$builder = ContextBuilder::create(); $builder = ContextBuilder::create();
@ -44,7 +45,7 @@ class ContextBuilderTest extends TestCase
self::assertTrue($limit->isUnlimited()); self::assertTrue($limit->isUnlimited());
} }
public function testWhenSuppliedWithACustomEventLoopItUsesThatEventLoop(): void public function testWhenGivenAnEventLoopItUsesThatLoop(): void
{ {
$builder = ContextBuilder::create(); $builder = ContextBuilder::create();
$eventLoop = $this->createMock(LoopInterface::class); $eventLoop = $this->createMock(LoopInterface::class);
@ -55,7 +56,7 @@ class ContextBuilderTest extends TestCase
self::assertSame($eventLoop, $usedEventLoop); self::assertSame($eventLoop, $usedEventLoop);
} }
public function testWhenSuppliedWithACustomConcurrencyLimitItUsesThatLimit(): void public function testWhenGivenAConcurrencyLimitItUsesThatLimit(): void
{ {
$builder = ContextBuilder::create(); $builder = ContextBuilder::create();
$limit = $this->createMock(ConcurrencyLimit::class); $limit = $this->createMock(ConcurrencyLimit::class);
@ -65,4 +66,15 @@ class ContextBuilderTest extends TestCase
self::assertSame($limit, $usedLimit); self::assertSame($limit, $usedLimit);
} }
public function testWhenGivenWorkersItUsesThatWorkers(): void
{
$builder = ContextBuilder::create();
$workers = $this->createMock(Workers::class);
$context = $builder->withWorkers($workers)->build();
$usedWorkers = $this->getProperty($context, 'workers');
self::assertSame($workers, $usedWorkers);
}
} }

View File

@ -5,6 +5,7 @@ namespace Toalett\Multiprocessing\Tests;
use PHPUnit\Framework\TestCase; use PHPUnit\Framework\TestCase;
use React\EventLoop\Factory; use React\EventLoop\Factory;
use React\EventLoop\LoopInterface; use React\EventLoop\LoopInterface;
use React\EventLoop\Timer\Timer;
use Toalett\Multiprocessing\ConcurrencyLimit; use Toalett\Multiprocessing\ConcurrencyLimit;
use Toalett\Multiprocessing\Context; use Toalett\Multiprocessing\Context;
use Toalett\Multiprocessing\Workers; use Toalett\Multiprocessing\Workers;
@ -36,21 +37,24 @@ class ContextTest extends TestCase
$context = new Context($loop, $limit); $context = new Context($loop, $limit);
$limit->method('isReachedBy')->willReturn(true); // trigger congestion $limit->method('isReachedBy')->willReturn(true); // trigger congestion
$loop->futureTick(fn() => $context->stop());
$congestionEventHasTakenPlace = false; $congestionEventHasTakenPlace = false;
$congestionRelievedEventHasTakenPlace = false;
$context->on('congestion', function () use (&$congestionEventHasTakenPlace) { $context->on('congestion', function () use (&$congestionEventHasTakenPlace) {
$congestionEventHasTakenPlace = true; $congestionEventHasTakenPlace = true;
}); });
$congestionRelievedEventHasTakenPlace = false;
$context->on('congestion_relieved', function () use (&$congestionRelievedEventHasTakenPlace) { $context->on('congestion_relieved', function () use (&$congestionRelievedEventHasTakenPlace) {
$congestionRelievedEventHasTakenPlace = true; $congestionRelievedEventHasTakenPlace = true;
}); });
self::assertFalse($congestionEventHasTakenPlace); self::assertFalse($congestionEventHasTakenPlace);
self::assertFalse($congestionRelievedEventHasTakenPlace); self::assertFalse($congestionRelievedEventHasTakenPlace);
$context->submit(fn() => []);
$loop->futureTick(fn() => $context->stop());
$context->submit(static fn() => null);
$context->run(); $context->run();
self::assertTrue($congestionEventHasTakenPlace); self::assertTrue($congestionEventHasTakenPlace);
self::assertTrue($congestionRelievedEventHasTakenPlace); self::assertTrue($congestionRelievedEventHasTakenPlace);
} }
@ -62,35 +66,46 @@ class ContextTest extends TestCase
$context = new Context($loop, $limit); $context = new Context($loop, $limit);
$limit->method('isReachedBy')->willReturn(false); $limit->method('isReachedBy')->willReturn(false);
$loop->expects(self::once())->method('futureTick')->withConsecutive( $loop->expects(self::once())
[fn() => []], ->method('futureTick')
); ->withConsecutive([
static fn() => null,
]);
$context->submit(fn() => []); $context->submit(static fn() => null);
} }
public function testItRegistersMaintenanceCallbacksOnTheEventLoop(): void public function testItRegistersMaintenanceTasksOnTheEventLoop(): void
{ {
$loop = $this->createMock(LoopInterface::class); $loop = $this->createMock(LoopInterface::class);
$limit = $this->createMock(ConcurrencyLimit::class); $limit = $this->createMock(ConcurrencyLimit::class);
$loop->expects(self::exactly(2))->method('addPeriodicTimer')->withConsecutive( $loop->expects(self::exactly(2))
[Context::CLEANUP_INTERVAL, fn() => []], ->method('addPeriodicTimer')
[Context::GC_INTERVAL, fn() => []] ->withConsecutive(
[Context::INTERVAL_CLEANUP, static fn() => null],
[Context::INTERVAL_GC, static fn() => null]
)->willReturnOnConsecutiveCalls(
new Timer(Context::INTERVAL_CLEANUP, static fn() => null),
new Timer(Context::INTERVAL_GC, static fn() => null),
); );
new Context($loop, $limit); $context = new Context($loop, $limit);
$context->run();
} }
public function testItProxiesWorkersEventsToSelf(): void public function testItForwardsWorkersEventsToSelf(): void
{ {
$loop = $this->createMock(LoopInterface::class); $loop = $this->createMock(LoopInterface::class);
$limit = $this->createMock(ConcurrencyLimit::class); $limit = $this->createMock(ConcurrencyLimit::class);
$workers = $this->createMock(Workers::class); $workers = $this->createMock(Workers::class);
$workers->expects(self::atLeast(2))->method('on')->withConsecutive( $workers->expects(self::exactly(3))
['worker_started', fn() => []], ->method('on')
['worker_stopped', fn() => []] ->withConsecutive(
['worker_started', static fn() => null],
['worker_stopped', static fn() => null],
['no_workers_remaining', static fn() => null]
); );
new Context($loop, $limit, $workers); new Context($loop, $limit, $workers);

View File

@ -0,0 +1,58 @@
<?php
namespace Toalett\Multiprocessing\Tests\Task;
use Generator;
use PHPUnit\Framework\TestCase;
use Toalett\Multiprocessing\Exception\InvalidArgumentException;
use Toalett\Multiprocessing\Task\Interval;
class IntervalTest extends TestCase
{
/**
* @param $method
* @param $val
* @param $calculatedVal
* @dataProvider zeroAndDownProvider
*/
public function testItDoesNotAllowLessThanZeroOrZero($method, $val, $calculatedVal): void
{
$this->setName(sprintf('It does not allow %d for %s', $val, $method));
$this->expectException(InvalidArgumentException::class);
$this->expectExceptionMessage(sprintf('Expected positive float, got \'%s\'', $calculatedVal));
Interval::{$method}($val);
}
/**
* @param $method
* @param $val
* @param $expected
* @dataProvider oneAndUpProvider
*/
public function testItCalculatesTheCorrectInterval($method, $val, $expected): void
{
$this->setName('It calculates the correct interval in ' . $method);
$interval = Interval::{$method}($val);
self::assertEquals($expected, $interval->asFloat());
}
public function zeroAndDownProvider(): Generator
{
return $this->createProvider(0, -5, -9000);
}
public function oneAndUpProvider(): Generator
{
return $this->createProvider(1, 5, 7500);
}
public function createProvider(...$args): Generator
{
foreach ($args as $arg) {
yield "$arg seconds" => ['seconds', $arg, $arg];
yield "$arg minutes" => ['minutes', $arg, $arg * 60.0];
yield "$arg hours" => ['hours', $arg, $arg * 3600.0];
}
}
}

View File

@ -0,0 +1,37 @@
<?php
namespace Toalett\Multiprocessing\Tests\Task;
use Generator;
use PHPUnit\Framework\TestCase;
use React\EventLoop\LoopInterface;
use React\EventLoop\Timer\Timer;
use Toalett\Multiprocessing\Task\Interval;
use Toalett\Multiprocessing\Task\RepeatedTask;
class RepeatedTaskTest extends TestCase
{
/**
* @param $interval
* @dataProvider dataProvider
*/
public function testItRegistersWithTheProvidedInterval(Interval $interval): void
{
$loop = $this->createMock(LoopInterface::class);
$loop->expects(self::once())
->method('addPeriodicTimer')
->with($interval->asFloat(), static fn() => null)
->willReturn(new Timer($interval->asFloat(), static fn() => null, true));
$task = new RepeatedTask($interval, static fn() => null);
$task->enable($loop);
}
public function dataProvider(): Generator
{
yield "3 seconds" => [Interval::seconds(3)];
yield "5 minutes" => [Interval::minutes(5)];
yield "half an hour" => [Interval::hours(0.5)];
yield "a day" => [Interval::hours(24)];
}
}

View File

@ -0,0 +1,91 @@
<?php
namespace Toalett\Multiprocessing\Tests\Task;
use PHPUnit\Framework\MockObject\MockObject;
use React\EventLoop\LoopInterface;
use Toalett\Multiprocessing\Task\Task;
use Toalett\Multiprocessing\Task\Tasks;
use PHPUnit\Framework\TestCase;
use Toalett\Multiprocessing\Tests\Tools\PropertyInspector;
class TasksTest extends TestCase
{
use PropertyInspector;
public function testItAcceptsZeroTasks(): void
{
$this->expectNotToPerformAssertions();
new Tasks();
}
public function testItAcceptsMultipleTasks(): void
{
$this->expectNotToPerformAssertions();
new Tasks(
$this->createMock(Task::class),
$this->createMock(Task::class)
);
}
public function testItDoesNotReEnableWhenEnabled(): void
{
$loop = $this->createMock(LoopInterface::class);
$task = $this->createMock(Task::class);
$tasks = new Tasks($task);
$task->expects(self::once())
->method('enable')
->with($loop);
$tasks->enable($loop);
$tasks->enable($loop);
}
public function testItEnablesAllTasksWhenEnableCalled(): void
{
$loop = $this->createMock(LoopInterface::class);
$task1 = $this->createMock(Task::class);
$task2 = $this->createMock(Task::class);
$task3 = $this->createMock(Task::class);
foreach([$task1, $task2, $task3] as $task) {
/** @var MockObject|Task $task */
$task->expects(self::once())->method('enable')->with($loop);
}
(new Tasks($task1, $task2, $task3))->enable($loop);
}
public function testItCancelsAllTasksWhenCancelCalled(): void
{
$loop = $this->createMock(LoopInterface::class);
$task1 = $this->createMock(Task::class);
$task2 = $this->createMock(Task::class);
$task3 = $this->createMock(Task::class);
foreach([$task1, $task2, $task3] as $task) {
/** @var MockObject|Task $task */
$task->expects(self::once())->method('cancel')->with($loop);
}
$tasks = new Tasks($task1, $task2, $task3);
$this->setProperty($tasks, 'loop', $loop);
$tasks->cancel();
}
public function testItDoesNotCancelTasksWhenTheyAreNotEnabled(): void
{
$task1 = $this->createMock(Task::class);
$task2 = $this->createMock(Task::class);
$task3 = $this->createMock(Task::class);
foreach([$task1, $task2, $task3] as $task) {
/** @var MockObject|Task $task */
$task->expects(self::never())->method('cancel');
}
$tasks = new Tasks($task1, $task2, $task3);
$tasks->cancel();
}
}

View File

@ -13,4 +13,12 @@ trait PropertyInspector
$property->setAccessible(true); $property->setAccessible(true);
return $property->getValue($object); return $property->getValue($object);
} }
protected function setProperty(object $object, string $propertyName, $value): void
{
$reflector = new ReflectionObject($object);
$property = $reflector->getProperty($propertyName);
$property->setAccessible(true);
$property->setValue($object, $value);
}
} }

View File

@ -49,8 +49,8 @@ class WorkersTest extends TestCase
$workers->on('worker_started', function () use (&$workerStartedEventHasTakenPlace) { $workers->on('worker_started', function () use (&$workerStartedEventHasTakenPlace) {
$workerStartedEventHasTakenPlace = true; $workerStartedEventHasTakenPlace = true;
}); });
self::assertFalse($workerStartedEventHasTakenPlace);
self::assertFalse($workerStartedEventHasTakenPlace);
$workers->createWorkerFor(fn() => exit(0), []); $workers->createWorkerFor(fn() => exit(0), []);
self::assertTrue($workerStartedEventHasTakenPlace); self::assertTrue($workerStartedEventHasTakenPlace);
} }
@ -72,6 +72,20 @@ class WorkersTest extends TestCase
self::assertTrue($workerStoppedEventHasTakenPlace); self::assertTrue($workerStoppedEventHasTakenPlace);
} }
public function testItEmitsAnEventWhenNoWorkersRemain(): void
{
$workers = new Workers();
$noWorkersRemainingEventHasTakenPlace = false;
$workers->on('no_workers_remaining', function () use (&$noWorkersRemainingEventHasTakenPlace) {
$noWorkersRemainingEventHasTakenPlace = true;
});
self::assertFalse($noWorkersRemainingEventHasTakenPlace);
$workers->cleanup();
self::assertTrue($noWorkersRemainingEventHasTakenPlace);
}
public function testItCallsForkOnProcessControlWhenAskedToCreateAWorker(): void public function testItCallsForkOnProcessControlWhenAskedToCreateAWorker(): void
{ {
$processControl = $this->createMock(ProcessControl::class); $processControl = $this->createMock(ProcessControl::class);
@ -88,7 +102,7 @@ class WorkersTest extends TestCase
$processControl = $this->createMock(ProcessControl::class); $processControl = $this->createMock(ProcessControl::class);
$processControl->expects(self::once()) $processControl->expects(self::once())
->method('wait') ->method('wait')
->with(WNOHANG) ->with(Wait::NO_HANG)
->willReturn(new Wait(0)); ->willReturn(new Wait(0));
$workers = new Workers($processControl); $workers = new Workers($processControl);

View File

@ -9,6 +9,7 @@ use Throwable;
use Toalett\Multiprocessing\Exception\ProcessControlException; use Toalett\Multiprocessing\Exception\ProcessControlException;
use Toalett\Multiprocessing\ProcessControl\PCNTL; use Toalett\Multiprocessing\ProcessControl\PCNTL;
use Toalett\Multiprocessing\ProcessControl\ProcessControl; use Toalett\Multiprocessing\ProcessControl\ProcessControl;
use Toalett\Multiprocessing\ProcessControl\Wait;
class Workers implements Countable, EventEmitterInterface class Workers implements Countable, EventEmitterInterface
{ {
@ -37,7 +38,10 @@ class Workers implements Countable, EventEmitterInterface
public function cleanup(): void public function cleanup(): void
{ {
while (true === $this->wait(WNOHANG)) ; while (true === $this->wait(Wait::NO_HANG)) ;
if (0 === count($this)) {
$this->emit('no_workers_remaining');
}
} }
public function awaitCongestionRelief(): void public function awaitCongestionRelief(): void