Update README.md, consistent use of spaces instead of tabs, better examples

This commit is contained in:
Joop Schilder 2020-12-12 13:05:48 +01:00
parent 92bc0ab407
commit db081158d7
30 changed files with 997 additions and 1041 deletions

211
README.md
View File

@ -4,7 +4,7 @@ Welcome to Toalett, a humble initiative based around the idea that all software
Toalett is the Norwegian word for toilet. It feels fancier than plain "toilet". Toalett is the Norwegian word for toilet. It feels fancier than plain "toilet".
## Why `toalett/multiprocessing`? ## Why `toalett/multiprocessing`?
[Multiprocessing](https://nl.wikipedia.org/wiki/Multiprocessing) is a technique that is often used in PHP (CLI-)applications to execute tasks asynchronously. [Multiprocessing](https://nl.wikipedia.org/wiki/Multiprocessing) is a technique that is often used in PHP (cli) applications to execute tasks asynchronously.
Due to the lack of native [multithreading](https://en.wikipedia.org/wiki/Multithreading_(computer_architecture)) in PHP, developers have to rely on Due to the lack of native [multithreading](https://en.wikipedia.org/wiki/Multithreading_(computer_architecture)) in PHP, developers have to rely on
good old multiprocessing to do this. good old multiprocessing to do this.
@ -22,6 +22,56 @@ Workers are a representation of child processes that are working on a task.
The Context uses a [ReactPHP EventLoop](https://reactphp.org/event-loop/) internally The Context uses a [ReactPHP EventLoop](https://reactphp.org/event-loop/) internally
and emits events using the simple (but elegant) [Evenement](https://github.com/igorw/Evenement) library. and emits events using the simple (but elegant) [Evenement](https://github.com/igorw/Evenement) library.
## Events
The context emits events when something of interest happens.
You can react to these events by calling:
`$context->on('name_of_event', fn() => ...);`.
These are the events emitted by the context:
1. `booted`
2. `worker_started`
3. `worker_stopped`
4. `congestion`
5. `congestion_relieved`
6. `no_workers_remaining`
7. `stopped`
#### 1. `booted`
This event is emitted after `$context->run()` is called.
This is the very first event dispatched by the context.
It is dispatched as soon as the event loop has started.
#### 2. `worker_started`
This event is emitted when a worker has been started (the process has been forked).
The PID of the child process is supplied as an argument to a listener.
#### 3. `worker_stopped`
This event is emitted when a worker has been stopped (child process has stopped).
The PID of the child process is supplied as an argument to a listener.
#### 4. `congestion`
This event is emitted when the imposed concurrency limit is reached.
This happens when (for example) the concurrency is set to at most 2 child processes,
and a third task gets submitted while 2 tasks are already running.
The system naively waits for a child to stop before starting another worker.
#### 5. `congestion_relieved`
This event is emitted when congestion is relieved.
This means that a child has stopped, allowing for the execution of a new task.
#### 6. `no_workers_remaining`
This event is emitted when there are no workers left running.
This usually means there is no more work to do.
It's possible to automatically stop the context when this event occurs.
This is shown in the first and last example.
#### 7. `stopped`
The context can be stopped by calling `$context->stop()`.
When the workers and the event loop are succesfully stopped, the context
emits a `stopped` event.
## Examples ## Examples
For most developers, the quickest way to learn something is by looking at examples. For most developers, the quickest way to learn something is by looking at examples.
Three examples are provided. Three examples are provided.
@ -30,181 +80,98 @@ There is a simple example, which demonstrates event emission with the creation o
A counter is incremented every time a job stops. A counter is incremented every time a job stops.
When all jobs are done, the context is stopped. When all jobs are done, the context is stopped.
### [Simple example](bin/simple_example.php) The cleanup interval is the interval at which the context checks for dead
worker processes and reads their exit codes.
It defaults to 5 seconds and is in some examples explicitely set to a low
value to improve example responsiveness.
### [Counting stopped workers using events](bin/counting_stopped_workers.php)
```php ```php
<?php <?php
use Toalett\Multiprocessing\ContextBuilder; use Toalett\Multiprocessing\ContextBuilder;
use Toalett\Multiprocessing\Task\Interval; use Toalett\Multiprocessing\Task\Interval;
require_once '/path/to/autoload.php';
// We will run 50 jobs
const NUM_JOBS = 50; const NUM_JOBS = 50;
$counter = new class {
public int $value = 0;
public function increment(): void
{
$this->value++;
}
};
// Create a context (defaults to unlimited child processes).
// The cleanup interval is the interval at dead processes
// will be read. For this example it's kept low.
// The default value is 5 seconds.
$context = ContextBuilder::create() $context = ContextBuilder::create()
->withCleanupInterval(Interval::seconds(0.5)) ->withCleanupInterval(Interval::seconds(0.5))
->build(); ->build();
$counter = new Counter();
$context->on('worker_stopped', [$counter, 'increment']); $context->on('worker_stopped', [$counter, 'increment']);
$context->on('no_workers_remaining', [$context, 'stop']); $context->on('no_workers_remaining', [$context, 'stop']);
$context->on('stopped', fn() => printf("\nJobs completed: %d\n", $counter->value)); $context->on('stopped', fn() => printf(" %d\n", $counter->value));
// You can submit jobs before the context is running. They will be executed
// in the order in which they are submitted to the context.
// Each job (thus child process) will be sleeping for 3 seconds.
for ($i = 0; $i < NUM_JOBS; $i++) { for ($i = 0; $i < NUM_JOBS; $i++) {
$context->submit(fn() => sleep(3)); $context->submit(fn() => sleep(2));
print('.'); print('.');
} }
$context->run(); $context->run();
``` ```
### [More elaborate example](bin/more_elaborate_example.php) ### [Triggering congestion with 4 workers](bin/triggering_congestion.php)
This example is a bit more elaborate than the previous one. This example is a bit more elaborate than the previous one.
It serves to demonstrate congestion and how it is handled by the context: It serves to demonstrate congestion and how it is handled by the context:
the context simply blocks all execution until a worker stops and a spot becomes available. the context simply blocks all execution until a worker stops and a spot becomes available.
This example shows the usage of events. Watch for the occurence of 'C' in the output.
This denotes congestion: a worker could not be started.
```php ```php
<?php <?php
use React\EventLoop\Factory;
use Toalett\Multiprocessing\ContextBuilder; use Toalett\Multiprocessing\ContextBuilder;
use Toalett\Multiprocessing\ConcurrencyLimit; use Toalett\Multiprocessing\Concurrency;
use React\EventLoop\Factory as EventLoopFactory;
require_once '/path/to/autoload.php'; $loop = Factory::create();
// Create our own EventLoop and limit and supply them to the builder
$loop = EventLoopFactory::create();
$context = ContextBuilder::create() $context = ContextBuilder::create()
->withEventLoop($loop) ->withEventLoop($loop)
->withLimit(ConcurrencyLimit::atMost(4)) ->withConcurrency(Concurrency::atMost(4))
->build(); ->build();
$context->on('booted', fn() => print("🚽 Toalett Multiprocessing Context\n")); $context->on('booted', fn() => print("🚽 toalett context booted\n"));
$context->on('congestion', fn() => print('C')); $context->on('congestion', fn() => print('C'));
$context->on('congestion_relieved', fn() => print('R')); $context->on('congestion_relieved', fn() => print('R'));
$context->on('worker_started', fn() => print('+')); $context->on('worker_started', fn() => print('+'));
$context->on('worker_stopped', fn() => print('-')); $context->on('worker_stopped', fn() => print('-'));
// Submit a fake job every second // A job is submitted to the context every second.
// The job sleeps for a random amount of seconds (0 - 10).
$loop->addPeriodicTimer(1, fn() => $context->submit(fn(int $s) => sleep($s), random_int(0, 10))); $loop->addPeriodicTimer(1, fn() => $context->submit(fn(int $s) => sleep($s), random_int(0, 10)));
print("Press CTRL+C to stop.\n"); print("Press CTRL+C to stop.\n");
$context->run(); $context->run();
``` ```
### [Example with a Job class](bin/example_with_job_class.php) ### [Single worker with a Job class](bin/single_worker_with_job_class.php)
Since the task is defined by a `callable` supplied with arguments, it's also possible to Since a task is really just a `Closure`, it's also possible to submit an object
define a class that implements the magic `__invoke()` method and submit objects of this with an implementation of the `__invoke()` magic method.
class to the Context. Objects implementing the `__invoke()` method can be treated as
closures. They may accept zero or more arguments. In this example, execution is limited to a single worker, and jobs are
instances of the `Job` class.
This idea is demonstrated here, while execution is limited to a single worker.
```php ```php
<?php <?php
use Toalett\Multiprocessing\ConcurrencyLimit; use Toalett\Multiprocessing\Concurrency;
use Toalett\Multiprocessing\ContextBuilder; use Toalett\Multiprocessing\ContextBuilder;
use Toalett\Multiprocessing\Task\Interval; use Toalett\Multiprocessing\Task\Interval;
require_once '/path/to/vendor/autoload.php';
class Job
{
private string $title;
public function __construct(string $title)
{
$this->title = $title;
}
public function __invoke()
{
cli_set_process_title("php {$this->title}");
print("+ {$this->title}");
sleep(1);
print("\r {$this->title}\n");
}
}
$limit = ConcurrencyLimit::singleWorker();
$context = ContextBuilder::create() $context = ContextBuilder::create()
->withLimit(ConcurrencyLimit::singleWorker()) ->withConcurrency(Concurrency::singleWorker())
->withCleanupInterval(Interval::seconds(0.2)) ->withCleanupInterval(Interval::seconds(0.2))
->build(); ->build();
for ($i = 0; $i < 3; $i++) { for ($i = 0; $i < 3; $i++) {
$title = md5(mt_rand()); $title = md5(mt_rand());
$context->submit(new Job($title)); $context->submit(new Job($title));
} }
$context->on('no_workers_remaining', [$context, 'stop']); $context->on('no_workers_remaining', [$context, 'stop']);
$context->run(); $context->run();
``` ```
## Events ## Tests
Tests can be found in the [src/Tests/](src/Tests) directory.
1. `booted`
1. `worker_started`
1. `worker_stopped`
1. `congestion`
1. `congestion_relieved`
1. `no_workers_remaining`
1. `stopped`
These events are emitted by the context.
They can be subscribed to by calling `$context->on('...', fn() => ...);`.
#### `booted`
This event is emitted when `$context->run()` is called.
This is the very first event dispatched by the context.
#### `worker_started`
This event is emitted when a worker has been started (the process has been forked).
The PID of the child process is supplied as an argument to a listener.
#### `worker_stopped`
This event is emitted when a worker has been stopped (child process has stopped).
The PID of the child process is supplied as an argument to a listener.
#### `congestion`
This event is emitted when the imposed concurrency limit is reached, for example,
when the limit is set to at most 2 child processes, and a third task gets submitted
while there are already two tasks running.
The system naively waits for a child to stop before starting another worker.
#### `congestion_relieved`
This event is emitted in case the congestion explained above is relieved.
This means that a child has stopped, allowing for the execution of a new task.
#### `no_workers_remaining`
This event is emitted when there are no workers left running.
This usually means there is no more work to do.
It's possible to automatically stop the context when this event occurs.
This is shown in the first and last example.
#### `stopped`
This event is emitted when `$context->stop()` is called and the eventloop has
succesfully been stopped.
## Why no shared memory?
Shared memory in PHP is hard to manage and quickly becomes a mess. Don't ask.
Feel free to add it yourself though. 😉

11
bin/classes/Counter.php Normal file
View File

@ -0,0 +1,11 @@
<?php
class Counter
{
public int $value = 0;
public function increment(): void
{
$this->value++;
}
}

19
bin/classes/Job.php Normal file
View File

@ -0,0 +1,19 @@
<?php
class Job
{
private string $title;
public function __construct(string $title)
{
$this->title = $title;
}
public function __invoke()
{
cli_set_process_title("php {$this->title}");
print("* {$this->title}");
sleep(1);
print("\r {$this->title}\n");
}
}

View File

@ -4,29 +4,21 @@ use Toalett\Multiprocessing\ContextBuilder;
use Toalett\Multiprocessing\Task\Interval; use Toalett\Multiprocessing\Task\Interval;
require_once __DIR__ . '/../vendor/autoload.php'; require_once __DIR__ . '/../vendor/autoload.php';
require_once __DIR__ . '/classes/Counter.php';
const NUM_JOBS = 50; const NUM_JOBS = 50;
$counter = new class {
public int $value = 0;
public function increment(): void
{
$this->value++;
}
};
$context = ContextBuilder::create() $context = ContextBuilder::create()
->withCleanupInterval(Interval::seconds(0.5)) ->withCleanupInterval(Interval::seconds(0.5))
->build(); ->build();
$counter = new Counter();
$context->on('worker_stopped', [$counter, 'increment']); $context->on('worker_stopped', [$counter, 'increment']);
$context->on('no_workers_remaining', [$context, 'stop']); $context->on('no_workers_remaining', [$context, 'stop']);
$context->on('stopped', fn() => printf("\nJobs completed: %d\n", $counter->value)); $context->on('stopped', fn() => printf(" %d\n", $counter->value));
for ($i = 0; $i < NUM_JOBS; $i++) { for ($i = 0; $i < NUM_JOBS; $i++) {
$context->submit(fn() => sleep(3)); $context->submit(fn() => sleep(2));
print('.'); print('.');
} }
$context->run(); $context->run();

View File

@ -1,39 +0,0 @@
<?php
use Toalett\Multiprocessing\ConcurrencyLimit;
use Toalett\Multiprocessing\ContextBuilder;
use Toalett\Multiprocessing\Task\Interval;
require_once __DIR__ . '/../vendor/autoload.php';
class Job
{
private string $title;
public function __construct(string $title)
{
$this->title = $title;
}
public function __invoke()
{
cli_set_process_title("php {$this->title}");
print("+ {$this->title}");
sleep(1);
print("\r {$this->title}\n");
}
}
$limit = ConcurrencyLimit::singleWorker();
$context = ContextBuilder::create()
->withLimit(ConcurrencyLimit::singleWorker())
->withCleanupInterval(Interval::seconds(0.2))
->build();
for ($i = 0; $i < 3; $i++) {
$title = md5(mt_rand());
$context->submit(new Job($title));
}
$context->on('no_workers_remaining', [$context, 'stop']);
$context->run();

View File

@ -0,0 +1,21 @@
<?php
use Toalett\Multiprocessing\Concurrency;
use Toalett\Multiprocessing\ContextBuilder;
use Toalett\Multiprocessing\Task\Interval;
require_once __DIR__ . '/../vendor/autoload.php';
require_once __DIR__ . '/classes/Job.php';
$context = ContextBuilder::create()
->withConcurrency(Concurrency::singleWorker())
->withCleanupInterval(Interval::seconds(0.2))
->build();
for ($i = 0; $i < 3; $i++) {
$title = md5(mt_rand());
$context->submit(new Job($title));
}
$context->on('no_workers_remaining', [$context, 'stop']);
$context->run();

View File

@ -1,16 +1,16 @@
<?php <?php
use React\EventLoop\Factory;
use Toalett\Multiprocessing\Concurrency;
use Toalett\Multiprocessing\ContextBuilder; use Toalett\Multiprocessing\ContextBuilder;
use Toalett\Multiprocessing\ConcurrencyLimit;
use React\EventLoop\Factory as EventLoopFactory;
require_once __DIR__ . '/../vendor/autoload.php'; require_once __DIR__ . '/../vendor/autoload.php';
$loop = EventLoopFactory::create(); $loop = Factory::create();
$context = ContextBuilder::create() $context = ContextBuilder::create()
->withEventLoop($loop) ->withEventLoop($loop)
->withLimit(ConcurrencyLimit::atMost(4)) ->withConcurrency(Concurrency::atMost(4))
->build(); ->build();
$context->on('booted', fn() => print("🚽 Toalett Multiprocessing Context\n")); $context->on('booted', fn() => print("🚽 Toalett Multiprocessing Context\n"));
$context->on('congestion', fn() => print('C')); $context->on('congestion', fn() => print('C'));

View File

@ -4,7 +4,7 @@ namespace Toalett\Multiprocessing;
use Toalett\Multiprocessing\Exception\InvalidArgumentException; use Toalett\Multiprocessing\Exception\InvalidArgumentException;
class ConcurrencyLimit class Concurrency
{ {
private const VALUE_UNLIMITED = -1; private const VALUE_UNLIMITED = -1;
private int $limit; private int $limit;
@ -24,7 +24,7 @@ class ConcurrencyLimit
public static function atMost(int $limit): self public static function atMost(int $limit): self
{ {
return new self($limit); return new self($limit);
} }
public static function unlimited(): self public static function unlimited(): self

View File

@ -11,75 +11,70 @@ use Toalett\Multiprocessing\Task\Tasks;
class Context implements EventEmitterInterface class Context implements EventEmitterInterface
{ {
public const INTERVAL_GC = 120; public const INTERVAL_GC = 120;
public const INTERVAL_CLEANUP = 5; public const INTERVAL_CLEANUP = 5;
use EventEmitterTrait; use EventEmitterTrait;
private LoopInterface $eventLoop; private LoopInterface $eventLoop;
private ConcurrencyLimit $limit; private Concurrency $concurrency;
private Workers $workers; private Workers $workers;
private Tasks $maintenanceTasks; private Tasks $maintenanceTasks;
public function __construct( public function __construct(
LoopInterface $eventLoop, LoopInterface $eventLoop,
ConcurrencyLimit $limit, Concurrency $concurrency,
?Workers $workers = null, ?Workers $workers = null,
?Interval $cleanupInterval = null, ?Interval $cleanupInterval = null
?Interval $garbageCollectionInterval = null )
) {
{ $this->eventLoop = $eventLoop;
$this->eventLoop = $eventLoop; $this->concurrency = $concurrency;
$this->limit = $limit; $this->workers = $workers ?? new Workers();
$this->workers = $workers ?? new Workers(); $this->setupWorkerEventForwarding();
$this->setupWorkerEventForwarding(); $this->setupMaintenanceTasks($cleanupInterval);
$this->setupMaintenanceTasks($cleanupInterval, $garbageCollectionInterval); }
}
public function run(): void public function run(): void
{ {
$this->eventLoop->futureTick(fn() => $this->emit('booted')); $this->eventLoop->futureTick(fn() => $this->emit('booted'));
$this->eventLoop->futureTick(fn() => gc_enable()); $this->eventLoop->futureTick(fn() => gc_enable());
$this->maintenanceTasks->enable($this->eventLoop); $this->maintenanceTasks->enable($this->eventLoop);
$this->eventLoop->run(); $this->eventLoop->run();
} }
public function submit(callable $task, ...$args): void public function submit(callable $task, ...$args): void
{ {
$this->eventLoop->futureTick(function () use ($task, $args) { $this->eventLoop->futureTick(function () use ($task, $args) {
if ($this->limit->isReachedBy(count($this->workers))) { if ($this->concurrency->isReachedBy(count($this->workers))) {
$this->emit('congestion'); $this->emit('congestion');
$this->workers->awaitCongestionRelief(); $this->workers->awaitCongestionRelief();
$this->emit('congestion_relieved'); $this->emit('congestion_relieved');
} }
$this->workers->createWorkerFor($task, $args); $this->workers->createWorkerFor($task, $args);
}); });
} }
public function stop(): void public function stop(): void
{ {
$this->maintenanceTasks->cancel(); $this->maintenanceTasks->cancel();
$this->workers->stop(); $this->workers->stop();
$this->emit('stopped'); $this->emit('stopped');
} }
private function setupWorkerEventForwarding(): void private function setupWorkerEventForwarding(): void
{ {
$this->workers->on('worker_started', fn(int $pid) => $this->emit('worker_started', [$pid])); $this->workers->on('worker_started', fn(int $pid) => $this->emit('worker_started', [$pid]));
$this->workers->on('worker_stopped', fn(int $pid) => $this->emit('worker_stopped', [$pid])); $this->workers->on('worker_stopped', fn(int $pid) => $this->emit('worker_stopped', [$pid]));
$this->workers->on('no_workers_remaining', fn() => $this->emit('no_workers_remaining')); $this->workers->on('no_workers_remaining', fn() => $this->emit('no_workers_remaining'));
} }
private function setupMaintenanceTasks(?Interval $cleanupInterval, ?Interval $garbageCollectionInterval): void private function setupMaintenanceTasks(?Interval $cleanupInterval): void
{ {
$this->maintenanceTasks = new Tasks( $cleanupInterval = $cleanupInterval ?? Interval::seconds(self::INTERVAL_CLEANUP);
new RepeatedTask( $gcInterval = Interval::seconds(self::INTERVAL_GC);
$cleanupInterval ?? Interval::seconds(self::INTERVAL_CLEANUP), $this->maintenanceTasks = new Tasks(
fn() => $this->workers->cleanup() new RepeatedTask($cleanupInterval, [$this->workers, 'cleanup']),
), new RepeatedTask($gcInterval, 'gc_collect_cycles')
new RepeatedTask( );
$garbageCollectionInterval ?? Interval::seconds(self::INTERVAL_GC), }
fn() => gc_collect_cycles()
)
);
}
} }

View File

@ -8,60 +8,51 @@ use Toalett\Multiprocessing\Task\Interval;
class ContextBuilder class ContextBuilder
{ {
private ?LoopInterface $loop = null; private ?LoopInterface $loop = null;
private ?ConcurrencyLimit $limit = null; private ?Concurrency $concurrency = null;
private ?Workers $workers = null; private ?Workers $workers = null;
private ?Interval $garbageCollectionInterval = null; private ?Interval $cleanupInterval = null;
private ?Interval $cleanupInterval = null;
public static function create(): self public static function create(): self
{ {
return new self(); return new self();
} }
public function withEventLoop(LoopInterface $loop): self public function withEventLoop(LoopInterface $loop): self
{ {
$instance = clone $this; $instance = clone $this;
$instance->loop = $loop; $instance->loop = $loop;
return $instance; return $instance;
} }
public function withLimit(ConcurrencyLimit $limit): self public function withConcurrency(Concurrency $concurrency): self
{ {
$instance = clone $this; $instance = clone $this;
$instance->limit = $limit; $instance->concurrency = $concurrency;
return $instance; return $instance;
} }
public function withWorkers(Workers $workers): self public function withWorkers(Workers $workers): self
{ {
$instance = clone $this; $instance = clone $this;
$instance->workers = $workers; $instance->workers = $workers;
return $instance; return $instance;
} }
public function withGarbageCollectionInterval(Interval $interval): self public function withCleanupInterval(Interval $interval): self
{ {
$instance = clone $this; $instance = clone $this;
$instance->garbageCollectionInterval = $interval; $instance->cleanupInterval = $interval;
return $instance; return $instance;
} }
public function withCleanupInterval(Interval $interval): self public function build(): Context
{ {
$instance = clone $this; return new Context(
$instance->cleanupInterval = $interval; $this->loop ?? Factory::create(),
return $instance; $this->concurrency ?? Concurrency::unlimited(),
} $this->workers,
$this->cleanupInterval
public function build(): Context );
{ }
return new Context(
$this->loop ?? Factory::create(),
$this->limit ?? ConcurrencyLimit::unlimited(),
$this->workers,
$this->cleanupInterval,
$this->garbageCollectionInterval
);
}
} }

View File

@ -4,25 +4,25 @@ namespace Toalett\Multiprocessing\ProcessControl;
class Fork class Fork
{ {
public int $pid; public int $pid;
public function __construct(int $pid) public function __construct(int $pid)
{ {
$this->pid = $pid; $this->pid = $pid;
} }
public function failed(): bool public function failed(): bool
{ {
return $this->pid < 0; return $this->pid < 0;
} }
public function isChild(): bool public function isChild(): bool
{ {
return $this->pid === 0; return $this->pid === 0;
} }
public function isParent(): bool public function isParent(): bool
{ {
return $this->pid !== 0; return $this->pid !== 0;
} }
} }

View File

@ -4,15 +4,15 @@ namespace Toalett\Multiprocessing\ProcessControl;
class PCNTL implements ProcessControl class PCNTL implements ProcessControl
{ {
public function fork(): Fork public function fork(): Fork
{ {
$pid = pcntl_fork(); $pid = pcntl_fork();
return new Fork($pid); return new Fork($pid);
} }
public function wait(int $options = 0): Wait public function wait(int $options = 0): Wait
{ {
$pid = pcntl_wait($status, $options); $pid = pcntl_wait($status, $options);
return new Wait($pid, $status); return new Wait($pid, $status);
} }
} }

View File

@ -4,7 +4,7 @@ namespace Toalett\Multiprocessing\ProcessControl;
interface ProcessControl interface ProcessControl
{ {
public function fork(): Fork; public function fork(): Fork;
public function wait(int $options = 0): Wait; public function wait(int $options = 0): Wait;
} }

View File

@ -4,24 +4,24 @@ namespace Toalett\Multiprocessing\ProcessControl;
class Wait class Wait
{ {
public const NO_HANG = WNOHANG; public const NO_HANG = WNOHANG;
public const UNTRACED = WUNTRACED; public const UNTRACED = WUNTRACED;
public int $pid; public int $pid;
public int $status; public int $status;
public function __construct(int $pid, int $status = 0) public function __construct(int $pid, int $status = 0)
{ {
$this->pid = $pid; $this->pid = $pid;
$this->status = $status; $this->status = $status;
} }
public function childStopped(): bool public function childStopped(): bool
{ {
return $this->pid > 0; return $this->pid > 0;
} }
public function failed(): bool public function failed(): bool
{ {
return $this->pid < 0; return $this->pid < 0;
} }
} }

View File

@ -6,38 +6,38 @@ use Toalett\Multiprocessing\Exception\InvalidArgumentException;
class Interval class Interval
{ {
private float $seconds; private float $seconds;
private function __construct(float $seconds) private function __construct(float $seconds)
{ {
if ($seconds <= 0) { if ($seconds <= 0) {
throw new InvalidArgumentException('positive float', $seconds); throw new InvalidArgumentException('positive float', $seconds);
} }
$this->seconds = $seconds; $this->seconds = $seconds;
} }
public static function seconds(float $seconds): self public static function seconds(float $seconds): self
{ {
return new self($seconds); return new self($seconds);
} }
public static function minutes(float $minutes): self public static function minutes(float $minutes): self
{ {
return new self(60.0 * $minutes); return new self(60.0 * $minutes);
} }
public static function hours(float $hours): self public static function hours(float $hours): self
{ {
return new self(3600.0 * $hours); return new self(3600.0 * $hours);
} }
public function asFloat(): float public function asFloat(): float
{ {
return $this->seconds; return $this->seconds;
} }
public function asInt(): int public function asInt(): int
{ {
return (int)$this->seconds; return (int)$this->seconds;
} }
} }

View File

@ -7,16 +7,16 @@ use React\EventLoop\TimerInterface;
class RepeatedTask extends Task class RepeatedTask extends Task
{ {
public Interval $interval; public Interval $interval;
public function __construct(Interval $interval, callable $callable, ...$arguments) public function __construct(Interval $interval, callable $callable, ...$arguments)
{ {
$this->interval = $interval; $this->interval = $interval;
parent::__construct($callable, $arguments); parent::__construct($callable, $arguments);
} }
protected function generateTimer(LoopInterface $loop): TimerInterface protected function generateTimer(LoopInterface $loop): TimerInterface
{ {
return $loop->addPeriodicTimer($this->interval->asFloat(), $this->createDeferredCall()); return $loop->addPeriodicTimer($this->interval->asFloat(), $this->createDeferredCall());
} }
} }

View File

@ -7,42 +7,42 @@ use React\EventLoop\TimerInterface;
abstract class Task abstract class Task
{ {
public $callable; public $callable;
public array $arguments; public array $arguments;
protected ?TimerInterface $timer = null; protected ?TimerInterface $timer = null;
public function __construct(callable $callable, ...$arguments) public function __construct(callable $callable, ...$arguments)
{ {
$this->callable = $callable; $this->callable = $callable;
$this->arguments = $arguments; $this->arguments = $arguments;
} }
abstract protected function generateTimer(LoopInterface $loop): TimerInterface; abstract protected function generateTimer(LoopInterface $loop): TimerInterface;
protected function createDeferredCall(): callable protected function createDeferredCall(): callable
{ {
return fn() => call_user_func_array( return fn() => call_user_func_array(
$this->callable, $this->callable,
$this->arguments $this->arguments
); );
} }
public function enable(LoopInterface $loop): void public function enable(LoopInterface $loop): void
{ {
if (!$this->isBound()) { if (!$this->isBound()) {
$this->timer = $this->generateTimer($loop); $this->timer = $this->generateTimer($loop);
} }
} }
public function isBound(): bool public function isBound(): bool
{ {
return !is_null($this->timer); return !is_null($this->timer);
} }
public function cancel(LoopInterface $loop): void public function cancel(LoopInterface $loop): void
{ {
if ($this->isBound()) { if ($this->isBound()) {
$loop->cancelTimer($this->timer); $loop->cancelTimer($this->timer);
} }
} }
} }

View File

@ -6,32 +6,32 @@ use React\EventLoop\LoopInterface;
class Tasks class Tasks
{ {
/** @var Task[] */ /** @var Task[] */
private array $tasks; private array $tasks;
private ?LoopInterface $loop = null; private ?LoopInterface $loop = null;
public function __construct(Task ...$tasks) public function __construct(Task ...$tasks)
{ {
$this->tasks = $tasks; $this->tasks = $tasks;
} }
public function enable(LoopInterface $loop): void public function enable(LoopInterface $loop): void
{ {
if (is_null($this->loop)) { if (is_null($this->loop)) {
$this->loop = $loop; $this->loop = $loop;
foreach ($this->tasks as $task) { foreach ($this->tasks as $task) {
$task->enable($this->loop); $task->enable($this->loop);
} }
} }
} }
public function cancel(): void public function cancel(): void
{ {
if (!is_null($this->loop)) { if (!is_null($this->loop)) {
foreach ($this->tasks as $task) { foreach ($this->tasks as $task) {
$task->cancel($this->loop); $task->cancel($this->loop);
} }
$this->loop = null; $this->loop = null;
} }
} }
} }

View File

@ -1,88 +0,0 @@
<?php
namespace Toalett\Multiprocessing\Tests;
use PHPUnit\Framework\TestCase;
use Toalett\Multiprocessing\ConcurrencyLimit;
use Toalett\Multiprocessing\Exception\InvalidArgumentException;
use Toalett\Multiprocessing\Tests\Tools\PropertyInspector;
class ConcurrencyLimitTest extends TestCase
{
use PropertyInspector;
public function testItDoesNotAcceptZero(): void
{
$this->expectException(InvalidArgumentException::class);
$this->expectExceptionMessage('Expected -1 or positive integer, got \'0\'');
ConcurrencyLimit::atMost(0);
}
public function testItAcceptsNegativeOneAsUnlimited(): void
{
$limit = ConcurrencyLimit::atMost(-1);
self::assertTrue($limit->isUnlimited());
}
/**
* @param int $negativeNumber
* @dataProvider negativeValueProvider
*/
public function testItDoesNotAllowAnyOtherNegativeValue(int $negativeNumber): void
{
$this->expectException(InvalidArgumentException::class);
$this->expectExceptionMessage(sprintf('Expected -1 or positive integer, got \'%s\'', $negativeNumber));
ConcurrencyLimit::atMost($negativeNumber);
}
public function testTheLimitMayBeUnlimited(): void
{
$limit = ConcurrencyLimit::unlimited();
self::assertTrue($limit->isUnlimited());
}
public function testTheLimitMayBeASingleWorker(): void
{
$limit = ConcurrencyLimit::singleWorker();
self::assertFalse($limit->isUnlimited());
self::assertEquals(1, $this->getProperty($limit, 'limit'));
}
public function testAnUnlimitedLimitCanNeverBeReached(): void
{
$limit = ConcurrencyLimit::unlimited();
self::assertFalse($limit->isReachedBy(PHP_INT_MIN));
self::assertFalse($limit->isReachedBy(0));
self::assertFalse($limit->isReachedBy(PHP_INT_MAX));
}
public function testABoundLimitCanBeReached(): void
{
$three = ConcurrencyLimit::atMost(3);
$seven = ConcurrencyLimit::atMost(7);
self::assertTrue($three->isReachedBy(3));
self::assertFalse($three->isReachedBy(2));
self::assertFalse($three->isReachedBy(1));
self::assertTrue($seven->isReachedBy(7));
self::assertTrue($seven->isReachedBy(120));
self::assertFalse($seven->isReachedBy(-2));
}
public function negativeValueProvider(): array
{
return [
'-2' => [-2],
'-3' => [-3],
'-10000' => [-10000],
'PHP_INT_MIN' => [PHP_INT_MIN],
];
}
}

View File

@ -0,0 +1,87 @@
<?php
namespace Toalett\Multiprocessing\Tests;
use PHPUnit\Framework\TestCase;
use Toalett\Multiprocessing\Concurrency;
use Toalett\Multiprocessing\Exception\InvalidArgumentException;
use Toalett\Multiprocessing\Tests\Tools\PropertyInspector;
class ConcurrencyTest extends TestCase
{
use PropertyInspector;
public function testItAcceptsNegativeOneAsUnlimited(): void
{
$concurrency = Concurrency::atMost(-1);
self::assertTrue($concurrency->isUnlimited());
}
public function testItDoesNotAcceptZero(): void
{
$this->expectException(InvalidArgumentException::class);
$this->expectExceptionMessage('Expected -1 or positive integer, got \'0\'');
Concurrency::atMost(0);
}
/**
* @param int $negativeNumber
* @dataProvider negativeValueProvider
*/
public function testItDoesNotAllowAnyOtherNegativeValue(int $negativeNumber): void
{
$this->expectException(InvalidArgumentException::class);
$this->expectExceptionMessage(sprintf('Expected -1 or positive integer, got \'%s\'', $negativeNumber));
Concurrency::atMost($negativeNumber);
}
public function testTheLimitMayBeUnlimited(): void
{
$concurrency = Concurrency::unlimited();
self::assertTrue($concurrency->isUnlimited());
}
public function testTheLimitMayBeASingleWorker(): void
{
$concurrency = Concurrency::singleWorker();
self::assertFalse($concurrency->isUnlimited());
self::assertEquals(1, $this->getProperty($concurrency, 'limit'));
}
public function testAnUnlimitedLimitCanNeverBeReached(): void
{
$concurrency = Concurrency::unlimited();
self::assertFalse($concurrency->isReachedBy(PHP_INT_MIN));
self::assertFalse($concurrency->isReachedBy(0));
self::assertFalse($concurrency->isReachedBy(PHP_INT_MAX));
}
public function testABoundLimitCanBeReached(): void
{
$three = Concurrency::atMost(3);
$seven = Concurrency::atMost(7);
self::assertTrue($three->isReachedBy(3));
self::assertFalse($three->isReachedBy(2));
self::assertFalse($three->isReachedBy(1));
self::assertTrue($seven->isReachedBy(7));
self::assertTrue($seven->isReachedBy(120));
self::assertFalse($seven->isReachedBy(-2));
}
public function negativeValueProvider(): array
{
return [
'-2' => [-2],
'-3' => [-3],
'-10000' => [-10000],
'PHP_INT_MIN' => [PHP_INT_MIN],
];
}
}

View File

@ -4,77 +4,77 @@ namespace Toalett\Multiprocessing\Tests;
use PHPUnit\Framework\TestCase; use PHPUnit\Framework\TestCase;
use React\EventLoop\LoopInterface; use React\EventLoop\LoopInterface;
use Toalett\Multiprocessing\ConcurrencyLimit; use Toalett\Multiprocessing\Concurrency;
use Toalett\Multiprocessing\ContextBuilder; use Toalett\Multiprocessing\ContextBuilder;
use Toalett\Multiprocessing\Tests\Tools\PropertyInspector; use Toalett\Multiprocessing\Tests\Tools\PropertyInspector;
use Toalett\Multiprocessing\Workers; use Toalett\Multiprocessing\Workers;
class ContextBuilderTest extends TestCase class ContextBuilderTest extends TestCase
{ {
use PropertyInspector; use PropertyInspector;
public function testItIsImmutable(): void public function testItIsImmutable(): void
{ {
$builder = ContextBuilder::create(); $builder = ContextBuilder::create();
$eventLoop = $this->createMock(LoopInterface::class); $eventLoop = $this->createMock(LoopInterface::class);
$limit = $this->createMock(ConcurrencyLimit::class); $concurrency = $this->createMock(Concurrency::class);
self::assertNotSame($builder->withEventLoop($eventLoop), $builder); self::assertNotSame($builder->withEventLoop($eventLoop), $builder);
self::assertNotSame($builder->withLimit($limit), $builder); self::assertNotSame($builder->withConcurrency($concurrency), $builder);
} }
public function testItBuildsANewContextEveryTime(): void public function testItBuildsANewContextEveryTime(): void
{ {
$builder = ContextBuilder::create(); $builder = ContextBuilder::create();
self::assertNotSame($builder->build(), $builder->build()); self::assertNotSame($builder->build(), $builder->build());
} }
public function testTheDefaultConcurrencyLimitIsUnlimited(): void public function testTheDefaultConcurrencyIsUnlimited(): void
{ {
$builder = ContextBuilder::create(); $builder = ContextBuilder::create();
$context = $builder->build(); $context = $builder->build();
self::assertIsObject($context); self::assertIsObject($context);
self::assertInstanceOf(LoopInterface::class, $this->getProperty($context, 'eventLoop')); self::assertInstanceOf(LoopInterface::class, $this->getProperty($context, 'eventLoop'));
/** @var ConcurrencyLimit|null $limit */ /** @var Concurrency|null $concurrency */
$limit = $this->getProperty($context, 'limit'); $concurrency = $this->getProperty($context, 'concurrency');
self::assertIsObject($limit); self::assertIsObject($concurrency);
self::assertInstanceOf(ConcurrencyLimit::class, $limit); self::assertInstanceOf(Concurrency::class, $concurrency);
self::assertTrue($limit->isUnlimited()); self::assertTrue($concurrency->isUnlimited());
} }
public function testWhenGivenAnEventLoopItUsesThatLoop(): void public function testWhenGivenAnEventLoopItUsesThatLoop(): void
{ {
$builder = ContextBuilder::create(); $builder = ContextBuilder::create();
$eventLoop = $this->createMock(LoopInterface::class); $eventLoop = $this->createMock(LoopInterface::class);
$context = $builder->withEventLoop($eventLoop)->build(); $context = $builder->withEventLoop($eventLoop)->build();
$usedEventLoop = $this->getProperty($context, 'eventLoop'); $usedEventLoop = $this->getProperty($context, 'eventLoop');
self::assertSame($eventLoop, $usedEventLoop); self::assertSame($eventLoop, $usedEventLoop);
} }
public function testWhenGivenAConcurrencyLimitItUsesThatLimit(): void public function testWhenGivenAConcurrencyItUsesThatConcurrency(): void
{ {
$builder = ContextBuilder::create(); $builder = ContextBuilder::create();
$limit = $this->createMock(ConcurrencyLimit::class); $concurrency = $this->createMock(Concurrency::class);
$context = $builder->withLimit($limit)->build(); $context = $builder->withConcurrency($concurrency)->build();
$usedLimit = $this->getProperty($context, 'limit'); $usedConcurrency = $this->getProperty($context, 'concurrency');
self::assertSame($limit, $usedLimit); self::assertSame($concurrency, $usedConcurrency);
} }
public function testWhenGivenWorkersItUsesThatWorkers(): void public function testWhenGivenWorkersItUsesThatWorkers(): void
{ {
$builder = ContextBuilder::create(); $builder = ContextBuilder::create();
$workers = $this->createMock(Workers::class); $workers = $this->createMock(Workers::class);
$context = $builder->withWorkers($workers)->build(); $context = $builder->withWorkers($workers)->build();
$usedWorkers = $this->getProperty($context, 'workers'); $usedWorkers = $this->getProperty($context, 'workers');
self::assertSame($workers, $usedWorkers); self::assertSame($workers, $usedWorkers);
} }
} }

View File

@ -6,108 +6,108 @@ use PHPUnit\Framework\TestCase;
use React\EventLoop\Factory; use React\EventLoop\Factory;
use React\EventLoop\LoopInterface; use React\EventLoop\LoopInterface;
use React\EventLoop\Timer\Timer; use React\EventLoop\Timer\Timer;
use Toalett\Multiprocessing\ConcurrencyLimit; use Toalett\Multiprocessing\Concurrency;
use Toalett\Multiprocessing\Context; use Toalett\Multiprocessing\Context;
use Toalett\Multiprocessing\Workers; use Toalett\Multiprocessing\Workers;
class ContextTest extends TestCase class ContextTest extends TestCase
{ {
public function testItEmitsAnEventWhenBooted(): void public function testItEmitsAnEventWhenBooted(): void
{ {
$limit = $this->createMock(ConcurrencyLimit::class); $concurrency = $this->createMock(Concurrency::class);
$loop = Factory::create(); $loop = Factory::create();
$context = new Context($loop, $limit); $context = new Context($loop, $concurrency);
$loop->futureTick(fn() => $context->stop()); $loop->futureTick(fn() => $context->stop());
$bootEventHasTakenPlace = false; $bootEventHasTakenPlace = false;
$context->on('booted', function () use (&$bootEventHasTakenPlace) { $context->on('booted', function () use (&$bootEventHasTakenPlace) {
$bootEventHasTakenPlace = true; $bootEventHasTakenPlace = true;
}); });
self::assertFalse($bootEventHasTakenPlace); self::assertFalse($bootEventHasTakenPlace);
$context->run(); $context->run();
self::assertTrue($bootEventHasTakenPlace); self::assertTrue($bootEventHasTakenPlace);
} }
public function testItEmitsEventsWhenCongestionOccursAndIsRelieved(): void public function testItEmitsEventsWhenCongestionOccursAndIsRelieved(): void
{ {
$loop = Factory::create(); $loop = Factory::create();
$limit = $this->createMock(ConcurrencyLimit::class); $concurrency = $this->createMock(Concurrency::class);
$context = new Context($loop, $limit); $context = new Context($loop, $concurrency);
$limit->method('isReachedBy')->willReturn(true); // trigger congestion $concurrency->method('isReachedBy')->willReturn(true); // trigger congestion
$congestionEventHasTakenPlace = false; $congestionEventHasTakenPlace = false;
$context->on('congestion', function () use (&$congestionEventHasTakenPlace) { $context->on('congestion', function () use (&$congestionEventHasTakenPlace) {
$congestionEventHasTakenPlace = true; $congestionEventHasTakenPlace = true;
}); });
$congestionRelievedEventHasTakenPlace = false; $congestionRelievedEventHasTakenPlace = false;
$context->on('congestion_relieved', function () use (&$congestionRelievedEventHasTakenPlace) { $context->on('congestion_relieved', function () use (&$congestionRelievedEventHasTakenPlace) {
$congestionRelievedEventHasTakenPlace = true; $congestionRelievedEventHasTakenPlace = true;
}); });
self::assertFalse($congestionEventHasTakenPlace); self::assertFalse($congestionEventHasTakenPlace);
self::assertFalse($congestionRelievedEventHasTakenPlace); self::assertFalse($congestionRelievedEventHasTakenPlace);
$loop->futureTick(fn() => $context->stop()); $loop->futureTick(fn() => $context->stop());
$context->submit(static fn() => null); $context->submit(static fn() => null);
$context->run(); $context->run();
self::assertTrue($congestionEventHasTakenPlace); self::assertTrue($congestionEventHasTakenPlace);
self::assertTrue($congestionRelievedEventHasTakenPlace); self::assertTrue($congestionRelievedEventHasTakenPlace);
} }
public function testItCreatesAWorkerForASubmittedTask(): void public function testItCreatesAWorkerForASubmittedTask(): void
{ {
$limit = $this->createMock(ConcurrencyLimit::class); $concurrency = $this->createMock(Concurrency::class);
$loop = $this->createMock(LoopInterface::class); $loop = $this->createMock(LoopInterface::class);
$context = new Context($loop, $limit); $context = new Context($loop, $concurrency);
$limit->method('isReachedBy')->willReturn(false); $concurrency->method('isReachedBy')->willReturn(false);
$loop->expects(self::once()) $loop->expects(self::once())
->method('futureTick') ->method('futureTick')
->withConsecutive([ ->withConsecutive([
static fn() => null, static fn() => null,
]); ]);
$context->submit(static fn() => null); $context->submit(static fn() => null);
} }
public function testItRegistersMaintenanceTasksOnTheEventLoop(): void public function testItRegistersMaintenanceTasksOnTheEventLoop(): void
{ {
$loop = $this->createMock(LoopInterface::class); $loop = $this->createMock(LoopInterface::class);
$limit = $this->createMock(ConcurrencyLimit::class); $concurrency = $this->createMock(Concurrency::class);
$loop->expects(self::exactly(2)) $loop->expects(self::exactly(2))
->method('addPeriodicTimer') ->method('addPeriodicTimer')
->withConsecutive( ->withConsecutive(
[Context::INTERVAL_CLEANUP, static fn() => null], [Context::INTERVAL_CLEANUP, static fn() => null],
[Context::INTERVAL_GC, static fn() => null] [Context::INTERVAL_GC, static fn() => null]
)->willReturnOnConsecutiveCalls( )->willReturnOnConsecutiveCalls(
new Timer(Context::INTERVAL_CLEANUP, static fn() => null), new Timer(Context::INTERVAL_CLEANUP, static fn() => null),
new Timer(Context::INTERVAL_GC, static fn() => null), new Timer(Context::INTERVAL_GC, static fn() => null),
); );
$context = new Context($loop, $limit); $context = new Context($loop, $concurrency);
$context->run(); $context->run();
} }
public function testItForwardsWorkersEventsToSelf(): void public function testItForwardsWorkersEventsToSelf(): void
{ {
$loop = $this->createMock(LoopInterface::class); $loop = $this->createMock(LoopInterface::class);
$limit = $this->createMock(ConcurrencyLimit::class); $concurrency = $this->createMock(Concurrency::class);
$workers = $this->createMock(Workers::class); $workers = $this->createMock(Workers::class);
$workers->expects(self::exactly(3)) $workers->expects(self::exactly(3))
->method('on') ->method('on')
->withConsecutive( ->withConsecutive(
['worker_started', static fn() => null], ['worker_started', static fn() => null],
['worker_stopped', static fn() => null], ['worker_stopped', static fn() => null],
['no_workers_remaining', static fn() => null] ['no_workers_remaining', static fn() => null]
); );
new Context($loop, $limit, $workers); new Context($loop, $concurrency, $workers);
} }
} }

View File

@ -7,55 +7,55 @@ use Toalett\Multiprocessing\ProcessControl\Fork;
class ForkTest extends TestCase class ForkTest extends TestCase
{ {
/** /**
* @param int $pid * @param int $pid
* @dataProvider positiveIntegerProvider * @dataProvider positiveIntegerProvider
*/ */
public function testItSaysItIsAParentProcessWhenAPositivePidIsProvided(int $pid): void public function testItSaysItIsAParentProcessWhenAPositivePidIsProvided(int $pid): void
{ {
$fork = new Fork($pid); $fork = new Fork($pid);
self::assertTrue($fork->isParent()); self::assertTrue($fork->isParent());
self::assertFalse($fork->isChild()); self::assertFalse($fork->isChild());
self::assertFalse($fork->failed()); self::assertFalse($fork->failed());
} }
/** /**
* @param int $pid * @param int $pid
* @dataProvider negativeIntegerProvider * @dataProvider negativeIntegerProvider
*/ */
public function testItSaysItFailedWhenANegativePidIsProvided(int $pid): void public function testItSaysItFailedWhenANegativePidIsProvided(int $pid): void
{ {
$fork = new Fork($pid); $fork = new Fork($pid);
self::assertTrue($fork->isParent()); self::assertTrue($fork->isParent());
self::assertFalse($fork->isChild()); self::assertFalse($fork->isChild());
self::assertTrue($fork->failed()); self::assertTrue($fork->failed());
} }
public function testItSaysItIsAChildProcessWhenPidZeroIsProvided(): void public function testItSaysItIsAChildProcessWhenPidZeroIsProvided(): void
{ {
$fork = new Fork(0); $fork = new Fork(0);
self::assertFalse($fork->isParent()); self::assertFalse($fork->isParent());
self::assertTrue($fork->isChild()); self::assertTrue($fork->isChild());
self::assertFalse($fork->failed()); self::assertFalse($fork->failed());
} }
public function positiveIntegerProvider(): array public function positiveIntegerProvider(): array
{ {
return [ return [
[1], [1],
[10], [10],
[1000], [1000],
[PHP_INT_MAX], [PHP_INT_MAX],
]; ];
} }
public function negativeIntegerProvider(): array public function negativeIntegerProvider(): array
{ {
return [ return [
[-1], [-1],
[-10], [-10],
[-1000], [-1000],
[PHP_INT_MIN], [PHP_INT_MIN],
]; ];
} }
} }

View File

@ -7,45 +7,45 @@ use Toalett\Multiprocessing\ProcessControl\Wait;
class WaitTest extends TestCase class WaitTest extends TestCase
{ {
/** /**
* @param int $pid * @param int $pid
* @dataProvider positiveIntegerProvider * @dataProvider positiveIntegerProvider
*/ */
public function testItSaysAChildStoppedWhenAPositivePidIsProvided(int $pid): void public function testItSaysAChildStoppedWhenAPositivePidIsProvided(int $pid): void
{ {
$wait = new Wait($pid, 0); $wait = new Wait($pid, 0);
self::assertTrue($wait->childStopped()); self::assertTrue($wait->childStopped());
self::assertFalse($wait->failed()); self::assertFalse($wait->failed());
} }
/** /**
* @param int $pid * @param int $pid
* @dataProvider negativeIntegerProvider * @dataProvider negativeIntegerProvider
*/ */
public function testItSaysItFailedWhenANegativePidIsProvided(int $pid): void public function testItSaysItFailedWhenANegativePidIsProvided(int $pid): void
{ {
$wait = new Wait($pid, 0); $wait = new Wait($pid, 0);
self::assertFalse($wait->childStopped()); self::assertFalse($wait->childStopped());
self::assertTrue($wait->failed()); self::assertTrue($wait->failed());
} }
public function positiveIntegerProvider(): array public function positiveIntegerProvider(): array
{ {
return [ return [
[1], [1],
[10], [10],
[1000], [1000],
[PHP_INT_MAX], [PHP_INT_MAX],
]; ];
} }
public function negativeIntegerProvider(): array public function negativeIntegerProvider(): array
{ {
return [ return [
[-1], [-1],
[-10], [-10],
[-1000], [-1000],
[PHP_INT_MIN], [PHP_INT_MIN],
]; ];
} }
} }

View File

@ -9,50 +9,50 @@ use Toalett\Multiprocessing\Task\Interval;
class IntervalTest extends TestCase class IntervalTest extends TestCase
{ {
/** /**
* @param $method * @param $method
* @param $val * @param $val
* @param $calculatedVal * @param $calculatedVal
* @dataProvider zeroAndDownProvider * @dataProvider zeroAndDownProvider
*/ */
public function testItDoesNotAllowLessThanZeroOrZero($method, $val, $calculatedVal): void public function testItDoesNotAllowLessThanZeroOrZero($method, $val, $calculatedVal): void
{ {
$this->setName(sprintf('It does not allow %d for %s', $val, $method)); $this->setName(sprintf('It does not allow %d for %s', $val, $method));
$this->expectException(InvalidArgumentException::class); $this->expectException(InvalidArgumentException::class);
$this->expectExceptionMessage(sprintf('Expected positive float, got \'%s\'', $calculatedVal)); $this->expectExceptionMessage(sprintf('Expected positive float, got \'%s\'', $calculatedVal));
Interval::{$method}($val); Interval::{$method}($val);
} }
/** /**
* @param $method * @param $method
* @param $val * @param $val
* @param $expected * @param $expected
* @dataProvider oneAndUpProvider * @dataProvider oneAndUpProvider
*/ */
public function testItCalculatesTheCorrectInterval($method, $val, $expected): void public function testItCalculatesTheCorrectInterval($method, $val, $expected): void
{ {
$this->setName('It calculates the correct interval in ' . $method); $this->setName('It calculates the correct interval in ' . $method);
$interval = Interval::{$method}($val); $interval = Interval::{$method}($val);
self::assertEquals($expected, $interval->asFloat()); self::assertEquals($expected, $interval->asFloat());
} }
public function zeroAndDownProvider(): Generator public function zeroAndDownProvider(): Generator
{ {
return $this->createProvider(0, -5, -9000); return $this->createProvider(0, -5, -9000);
} }
public function oneAndUpProvider(): Generator public function oneAndUpProvider(): Generator
{ {
return $this->createProvider(1, 5, 7500); return $this->createProvider(1, 5, 7500);
} }
public function createProvider(...$args): Generator public function createProvider(...$args): Generator
{ {
foreach ($args as $arg) { foreach ($args as $arg) {
yield "$arg seconds" => ['seconds', $arg, $arg]; yield "$arg seconds" => ['seconds', $arg, $arg];
yield "$arg minutes" => ['minutes', $arg, $arg * 60.0]; yield "$arg minutes" => ['minutes', $arg, $arg * 60.0];
yield "$arg hours" => ['hours', $arg, $arg * 3600.0]; yield "$arg hours" => ['hours', $arg, $arg * 3600.0];
} }
} }
} }

View File

@ -11,27 +11,27 @@ use Toalett\Multiprocessing\Task\RepeatedTask;
class RepeatedTaskTest extends TestCase class RepeatedTaskTest extends TestCase
{ {
/** /**
* @param $interval * @param $interval
* @dataProvider dataProvider * @dataProvider dataProvider
*/ */
public function testItRegistersWithTheProvidedInterval(Interval $interval): void public function testItRegistersWithTheProvidedInterval(Interval $interval): void
{ {
$loop = $this->createMock(LoopInterface::class); $loop = $this->createMock(LoopInterface::class);
$loop->expects(self::once()) $loop->expects(self::once())
->method('addPeriodicTimer') ->method('addPeriodicTimer')
->with($interval->asFloat(), static fn() => null) ->with($interval->asFloat(), static fn() => null)
->willReturn(new Timer($interval->asFloat(), static fn() => null, true)); ->willReturn(new Timer($interval->asFloat(), static fn() => null, true));
$task = new RepeatedTask($interval, static fn() => null); $task = new RepeatedTask($interval, static fn() => null);
$task->enable($loop); $task->enable($loop);
} }
public function dataProvider(): Generator public function dataProvider(): Generator
{ {
yield "3 seconds" => [Interval::seconds(3)]; yield "3 seconds" => [Interval::seconds(3)];
yield "5 minutes" => [Interval::minutes(5)]; yield "5 minutes" => [Interval::minutes(5)];
yield "half an hour" => [Interval::hours(0.5)]; yield "half an hour" => [Interval::hours(0.5)];
yield "a day" => [Interval::hours(24)]; yield "a day" => [Interval::hours(24)];
} }
} }

View File

@ -3,89 +3,89 @@
namespace Toalett\Multiprocessing\Tests\Task; namespace Toalett\Multiprocessing\Tests\Task;
use PHPUnit\Framework\MockObject\MockObject; use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
use React\EventLoop\LoopInterface; use React\EventLoop\LoopInterface;
use Toalett\Multiprocessing\Task\Task; use Toalett\Multiprocessing\Task\Task;
use Toalett\Multiprocessing\Task\Tasks; use Toalett\Multiprocessing\Task\Tasks;
use PHPUnit\Framework\TestCase;
use Toalett\Multiprocessing\Tests\Tools\PropertyInspector; use Toalett\Multiprocessing\Tests\Tools\PropertyInspector;
class TasksTest extends TestCase class TasksTest extends TestCase
{ {
use PropertyInspector; use PropertyInspector;
public function testItAcceptsZeroTasks(): void public function testItAcceptsZeroTasks(): void
{ {
$this->expectNotToPerformAssertions(); $this->expectNotToPerformAssertions();
new Tasks(); new Tasks();
} }
public function testItAcceptsMultipleTasks(): void public function testItAcceptsMultipleTasks(): void
{ {
$this->expectNotToPerformAssertions(); $this->expectNotToPerformAssertions();
new Tasks( new Tasks(
$this->createMock(Task::class), $this->createMock(Task::class),
$this->createMock(Task::class) $this->createMock(Task::class)
); );
} }
public function testItDoesNotReEnableWhenEnabled(): void public function testItDoesNotReEnableWhenEnabled(): void
{ {
$loop = $this->createMock(LoopInterface::class); $loop = $this->createMock(LoopInterface::class);
$task = $this->createMock(Task::class); $task = $this->createMock(Task::class);
$tasks = new Tasks($task); $tasks = new Tasks($task);
$task->expects(self::once()) $task->expects(self::once())
->method('enable') ->method('enable')
->with($loop); ->with($loop);
$tasks->enable($loop); $tasks->enable($loop);
$tasks->enable($loop); $tasks->enable($loop);
} }
public function testItEnablesAllTasksWhenEnableCalled(): void public function testItEnablesAllTasksWhenEnableCalled(): void
{ {
$loop = $this->createMock(LoopInterface::class); $loop = $this->createMock(LoopInterface::class);
$task1 = $this->createMock(Task::class); $task1 = $this->createMock(Task::class);
$task2 = $this->createMock(Task::class); $task2 = $this->createMock(Task::class);
$task3 = $this->createMock(Task::class); $task3 = $this->createMock(Task::class);
foreach([$task1, $task2, $task3] as $task) { foreach ([$task1, $task2, $task3] as $task) {
/** @var MockObject|Task $task */ /** @var MockObject|Task $task */
$task->expects(self::once())->method('enable')->with($loop); $task->expects(self::once())->method('enable')->with($loop);
} }
(new Tasks($task1, $task2, $task3))->enable($loop); (new Tasks($task1, $task2, $task3))->enable($loop);
} }
public function testItCancelsAllTasksWhenCancelCalled(): void public function testItCancelsAllTasksWhenCancelCalled(): void
{ {
$loop = $this->createMock(LoopInterface::class); $loop = $this->createMock(LoopInterface::class);
$task1 = $this->createMock(Task::class); $task1 = $this->createMock(Task::class);
$task2 = $this->createMock(Task::class); $task2 = $this->createMock(Task::class);
$task3 = $this->createMock(Task::class); $task3 = $this->createMock(Task::class);
foreach([$task1, $task2, $task3] as $task) { foreach ([$task1, $task2, $task3] as $task) {
/** @var MockObject|Task $task */ /** @var MockObject|Task $task */
$task->expects(self::once())->method('cancel')->with($loop); $task->expects(self::once())->method('cancel')->with($loop);
} }
$tasks = new Tasks($task1, $task2, $task3); $tasks = new Tasks($task1, $task2, $task3);
$this->setProperty($tasks, 'loop', $loop); $this->setProperty($tasks, 'loop', $loop);
$tasks->cancel(); $tasks->cancel();
} }
public function testItDoesNotCancelTasksWhenTheyAreNotEnabled(): void public function testItDoesNotCancelTasksWhenTheyAreNotEnabled(): void
{ {
$task1 = $this->createMock(Task::class); $task1 = $this->createMock(Task::class);
$task2 = $this->createMock(Task::class); $task2 = $this->createMock(Task::class);
$task3 = $this->createMock(Task::class); $task3 = $this->createMock(Task::class);
foreach([$task1, $task2, $task3] as $task) { foreach ([$task1, $task2, $task3] as $task) {
/** @var MockObject|Task $task */ /** @var MockObject|Task $task */
$task->expects(self::never())->method('cancel'); $task->expects(self::never())->method('cancel');
} }
$tasks = new Tasks($task1, $task2, $task3); $tasks = new Tasks($task1, $task2, $task3);
$tasks->cancel(); $tasks->cancel();
} }
} }

View File

@ -6,19 +6,19 @@ use ReflectionObject;
trait PropertyInspector trait PropertyInspector
{ {
protected function getProperty(object $object, string $propertyName) protected function getProperty(object $object, string $propertyName)
{ {
$reflector = new ReflectionObject($object); $reflector = new ReflectionObject($object);
$property = $reflector->getProperty($propertyName); $property = $reflector->getProperty($propertyName);
$property->setAccessible(true); $property->setAccessible(true);
return $property->getValue($object); return $property->getValue($object);
} }
protected function setProperty(object $object, string $propertyName, $value): void protected function setProperty(object $object, string $propertyName, $value): void
{ {
$reflector = new ReflectionObject($object); $reflector = new ReflectionObject($object);
$property = $reflector->getProperty($propertyName); $property = $reflector->getProperty($propertyName);
$property->setAccessible(true); $property->setAccessible(true);
$property->setValue($object, $value); $property->setValue($object, $value);
} }
} }

View File

@ -11,113 +11,113 @@ use Toalett\Multiprocessing\Workers;
class WorkersTest extends TestCase class WorkersTest extends TestCase
{ {
public function testItSaysItIsEmptyWhenNoWorkers(): void public function testItSaysItIsEmptyWhenNoWorkers(): void
{ {
$processControl = $this->createMock(ProcessControl::class); $processControl = $this->createMock(ProcessControl::class);
$workers = new Workers($processControl); $workers = new Workers($processControl);
self::assertEmpty($workers); self::assertEmpty($workers);
} }
public function testItSaysItHasOneWorkerWhenTaskExecutes(): void public function testItSaysItHasOneWorkerWhenTaskExecutes(): void
{ {
$workers = new Workers(); $workers = new Workers();
$workers->createWorkerFor(fn() => exit(0), []); $workers->createWorkerFor(fn() => exit(0), []);
self::assertCount(1, $workers); self::assertCount(1, $workers);
} }
public function testItGivesTheAmountOfActiveWorkersOnCount(): void public function testItGivesTheAmountOfActiveWorkersOnCount(): void
{ {
$workers = new Workers(); $workers = new Workers();
$workers->createWorkerFor(fn() => exit(0), []); $workers->createWorkerFor(fn() => exit(0), []);
$workers->createWorkerFor(fn() => exit(0), []); $workers->createWorkerFor(fn() => exit(0), []);
self::assertCount(2, $workers); self::assertCount(2, $workers);
$workers->createWorkerFor(fn() => exit(0), []); $workers->createWorkerFor(fn() => exit(0), []);
self::assertCount(3, $workers); self::assertCount(3, $workers);
$workers->stop(); $workers->stop();
self::assertEmpty($workers); self::assertEmpty($workers);
} }
public function testItEmitsAnEventWhenAWorkerIsStarted(): void public function testItEmitsAnEventWhenAWorkerIsStarted(): void
{ {
$workers = new Workers(); $workers = new Workers();
$workerStartedEventHasTakenPlace = false; $workerStartedEventHasTakenPlace = false;
$workers->on('worker_started', function () use (&$workerStartedEventHasTakenPlace) { $workers->on('worker_started', function () use (&$workerStartedEventHasTakenPlace) {
$workerStartedEventHasTakenPlace = true; $workerStartedEventHasTakenPlace = true;
}); });
self::assertFalse($workerStartedEventHasTakenPlace); self::assertFalse($workerStartedEventHasTakenPlace);
$workers->createWorkerFor(fn() => exit(0), []); $workers->createWorkerFor(fn() => exit(0), []);
self::assertTrue($workerStartedEventHasTakenPlace); self::assertTrue($workerStartedEventHasTakenPlace);
} }
public function testItEmitsAnEventWhenAWorkerIsRemoved(): void public function testItEmitsAnEventWhenAWorkerIsRemoved(): void
{ {
$workers = new Workers(); $workers = new Workers();
$reflector = new ReflectionObject($workers); $reflector = new ReflectionObject($workers);
$method = $reflector->getMethod('remove'); $method = $reflector->getMethod('remove');
$method->setAccessible(true); $method->setAccessible(true);
$workerStoppedEventHasTakenPlace = false; $workerStoppedEventHasTakenPlace = false;
$workers->on('worker_stopped', function () use (&$workerStoppedEventHasTakenPlace) { $workers->on('worker_stopped', function () use (&$workerStoppedEventHasTakenPlace) {
$workerStoppedEventHasTakenPlace = true; $workerStoppedEventHasTakenPlace = true;
}); });
self::assertFalse($workerStoppedEventHasTakenPlace); self::assertFalse($workerStoppedEventHasTakenPlace);
$method->invoke($workers, 0); $method->invoke($workers, 0);
self::assertTrue($workerStoppedEventHasTakenPlace); self::assertTrue($workerStoppedEventHasTakenPlace);
} }
public function testItEmitsAnEventWhenNoWorkersRemain(): void public function testItEmitsAnEventWhenNoWorkersRemain(): void
{ {
$workers = new Workers(); $workers = new Workers();
$noWorkersRemainingEventHasTakenPlace = false; $noWorkersRemainingEventHasTakenPlace = false;
$workers->on('no_workers_remaining', function () use (&$noWorkersRemainingEventHasTakenPlace) { $workers->on('no_workers_remaining', function () use (&$noWorkersRemainingEventHasTakenPlace) {
$noWorkersRemainingEventHasTakenPlace = true; $noWorkersRemainingEventHasTakenPlace = true;
}); });
self::assertFalse($noWorkersRemainingEventHasTakenPlace); self::assertFalse($noWorkersRemainingEventHasTakenPlace);
$workers->cleanup(); $workers->cleanup();
self::assertTrue($noWorkersRemainingEventHasTakenPlace); self::assertTrue($noWorkersRemainingEventHasTakenPlace);
} }
public function testItCallsForkOnProcessControlWhenAskedToCreateAWorker(): void public function testItCallsForkOnProcessControlWhenAskedToCreateAWorker(): void
{ {
$processControl = $this->createMock(ProcessControl::class); $processControl = $this->createMock(ProcessControl::class);
$processControl->expects(self::once()) $processControl->expects(self::once())
->method('fork') ->method('fork')
->willReturn(new Fork(1)); ->willReturn(new Fork(1));
$workers = new Workers($processControl); $workers = new Workers($processControl);
$workers->createWorkerFor(fn() => []); $workers->createWorkerFor(fn() => []);
} }
public function testItCallsNonBlockingWaitOnProcessControlWhenPerformingCleanup(): void public function testItCallsNonBlockingWaitOnProcessControlWhenPerformingCleanup(): void
{ {
$processControl = $this->createMock(ProcessControl::class); $processControl = $this->createMock(ProcessControl::class);
$processControl->expects(self::once()) $processControl->expects(self::once())
->method('wait') ->method('wait')
->with(Wait::NO_HANG) ->with(Wait::NO_HANG)
->willReturn(new Wait(0)); ->willReturn(new Wait(0));
$workers = new Workers($processControl); $workers = new Workers($processControl);
$workers->cleanup(); $workers->cleanup();
} }
public function testItCallsBlockingWaitOnProcessControlWhenAwaitingCongestionRelief(): void public function testItCallsBlockingWaitOnProcessControlWhenAwaitingCongestionRelief(): void
{ {
$processControl = $this->createMock(ProcessControl::class); $processControl = $this->createMock(ProcessControl::class);
$processControl->expects(self::once()) $processControl->expects(self::once())
->method('wait') ->method('wait')
->with(/* no arguments */) ->with(/* no arguments */)
->willReturn(new Wait(1)); ->willReturn(new Wait(1));
$workers = new Workers($processControl); $workers = new Workers($processControl);
$workers->awaitCongestionRelief(); $workers->awaitCongestionRelief();
} }
} }

View File

@ -13,86 +13,86 @@ use Toalett\Multiprocessing\ProcessControl\Wait;
class Workers implements Countable, EventEmitterInterface class Workers implements Countable, EventEmitterInterface
{ {
use EventEmitterTrait; use EventEmitterTrait;
/** @var int[] */ /** @var int[] */
private array $workers = []; private array $workers = [];
private ProcessControl $processControl; private ProcessControl $processControl;
public function __construct(?ProcessControl $processControl = null) public function __construct(?ProcessControl $processControl = null)
{ {
$this->processControl = $processControl ?? new PCNTL(); $this->processControl = $processControl ?? new PCNTL();
} }
public function count(): int public function count(): int
{ {
return count($this->workers); return count($this->workers);
} }
public function createWorkerFor(callable $task, array $args = []): void public function createWorkerFor(callable $task, array $args = []): void
{ {
$pid = $this->forkWorker($task, $args); $pid = $this->forkWorker($task, $args);
$this->workers[$pid] = $pid; $this->workers[$pid] = $pid;
$this->emit('worker_started', [$pid]); $this->emit('worker_started', [$pid]);
} }
public function cleanup(): void public function cleanup(): void
{ {
while (true === $this->wait(Wait::NO_HANG)) ; while (true === $this->wait(Wait::NO_HANG)) ;
if (0 === count($this)) { if (0 === count($this)) {
$this->emit('no_workers_remaining'); $this->emit('no_workers_remaining');
} }
} }
public function awaitCongestionRelief(): void public function awaitCongestionRelief(): void
{ {
$this->wait(); $this->wait();
} }
private function remove(int $pid): void private function remove(int $pid): void
{ {
unset($this->workers[$pid]); unset($this->workers[$pid]);
$this->emit('worker_stopped', [$pid]); $this->emit('worker_stopped', [$pid]);
} }
private function forkWorker(callable $task, array $args): int private function forkWorker(callable $task, array $args): int
{ {
$fork = $this->processControl->fork(); $fork = $this->processControl->fork();
if ($fork->failed()) { if ($fork->failed()) {
throw ProcessControlException::forkFailed(); throw ProcessControlException::forkFailed();
} }
if ($fork->isChild()) { if ($fork->isChild()) {
try { try {
call_user_func_array($task, $args); call_user_func_array($task, $args);
} catch (Throwable $t) { } catch (Throwable $t) {
fwrite(STDERR, $t->getMessage()); fwrite(STDERR, $t->getMessage());
exit(1); exit(1);
} }
exit(0); exit(0);
} }
return $fork->pid; return $fork->pid;
} }
/** /**
* @param int $options * @param int $options
* @return bool Whether a process was caught * @return bool Whether a process was caught
*/ */
private function wait(int $options = 0): bool private function wait(int $options = 0): bool
{ {
$wait = $this->processControl->wait($options); $wait = $this->processControl->wait($options);
if ($wait->childStopped()) { if ($wait->childStopped()) {
$this->remove($wait->pid); $this->remove($wait->pid);
return true; return true;
} }
// We ignore errors ($pid < 0). This method is called periodically, even if there is // We ignore errors ($pid < 0). This method is called periodically, even if there is
// no child available. pcntl_wait() will return -1. This is expected behavior. // no child available. pcntl_wait() will return -1. This is expected behavior.
return false; return false;
} }
public function stop(): void public function stop(): void
{ {
while (true === $this->wait()) ; while (true === $this->wait()) ;
} }
} }