Implement Unit Tests, add Readme, add examples, stronger implementation

This commit is contained in:
Joop Schilder 2020-12-11 01:25:38 +01:00
commit ffebc74a7d
19 changed files with 3149 additions and 0 deletions

5
.gitignore vendored Normal file
View File

@ -0,0 +1,5 @@
/vendor/
/.vscode/
/.idea/
/tags
/.*.cache

200
README.md Normal file
View File

@ -0,0 +1,200 @@
# 🚽 Toalett
Welcome to Toalett, a new initiative, based on the idea that all software is 💩. Toalett is the Norwegian word for toilet. It feels fancier than plain "toilet".
## Why `toalett/multiprocessing`?
[Multiprocessing](https://nl.wikipedia.org/wiki/Multiprocessing) is a technique that is often used in PHP applications to execute tasks asynchronously.
Due to the lack of native [multithreading](https://en.wikipedia.org/wiki/Multithreading_(computer_architecture)) in PHP, developers have to rely on
good old multiprocessing to do this.
We often see code that's written in a quick and dirty way to accomplish this task, with calls to
`pcntl_fork()` hidden somewhere, leading to ugly implementations.
Now, I from Toalett have nothing against quick and dirty PHP code. I live it. I breathe it.
But since multiprocessing so common, it might be nice to use this library.
## Okay, cool, but... How?
`toalett/multiprocessing` comes with the handy-dandy `ContextBuilder` class which is used to, well, _build_ a _Context_.
The Context is the central component of this library. It schedules tasks to the _Workers_.
Workers are a representation of child processes that are working on a task.
The Context uses a [ReactPHP EventLoop](https://reactphp.org/event-loop/) internally and emits events using the simple (but quite elegant) [Evenement](https://github.com/igorw/Evenement) library.
## Examples
For most developers, the quickest way to learn something is by looking at examples.
Three examples are provided.
There is a simple example, which
### [Simple example](bin/simple_example.php)
```php
<?php
use Toalett\Multiprocessing\ContextBuilder;
require_once 'path/to/autoload.php';
// We will run 50 jobs
const NUM_JOBS = 50;
$counter = new class {
public int $value = 0;
public function increment(): void
{
$this->value++;
}
};
// Create a default context with unlimited concurrency
$context = ContextBuilder::create()->build();
// Each time a worker stops, a job is finished
$context->on('worker_stopped', fn() => $counter->increment());
// Automatically stop the context when there are no workers left
$context->on('no_workers_remaining', fn() => $context->stop());
$context->on('stopped', fn() => printf("Jobs completed: %d\n", $counter->value));
// You can submit jobs before the context is running. They will be executed
// in the order in which they are submitted to the context. They are
// scheduled on a future tick of the underlying event loop.
// Each job will involve sleeping for ~3 seconds in this example.
for ($i = 0; $i < NUM_JOBS; $i++) {
$context->submit(fn() => sleep(3));
}
$context->run();
```
### [More elaborate example](bin/more_elaborate_example.php)
This example is a bit more elaborate than the previous one.
It serves to demonstrate congestion and how it is handled by the context:
the context simply blocks all execution until a worker stops and a spot becomes available.
This example also makes more use of the events (described [here](## Events)).
```php
<?php
use Toalett\Multiprocessing\ContextBuilder;
use Toalett\Multiprocessing\ConcurrencyLimit;
use React\EventLoop\Factory as EventLoopFactory;
require_once 'path/to/autoload.php';
// Create our own EventLoop and limit and supply them to the builder
$loop = EventLoopFactory::create();
$limit = new ConcurrencyLimit(4);
$context = ContextBuilder::create()->withEventLoop($loop)->withLimit($limit)->build();
$context->on('booted', fn() => print("🚽 Toalett Multiprocessing Context\n"));
$context->on('congestion', fn() => print('C'));
$context->on('congestion_relieved', fn() => print('R'));
$context->on('worker_started', fn() => print('+'));
$context->on('worker_stopped', fn() => print('-'));
// Submit a fake job every second
$loop->addPeriodicTimer(1, fn() => $context->submit(fn(int $s) => sleep($s), random_int(0, 10)));
print("Press CTRL+C to stop.\n");
$context->run();
```
### [Example with a Job class](bin/example_with_job_class.php)
Since the task is defined by a `callable` supplied with arguments, it's also possible to
define a class that implements the magic `__invoke()` method and submit objects of this
class to the Context. Objects implementing the `__invoke()` method can be treated as
closures. They may accept zero or more arguments.
This idea is demonstrated here, while execution is limited to a single worker.
```php
<?php
use Toalett\Multiprocessing\ConcurrencyLimit;
use Toalett\Multiprocessing\ContextBuilder;
require_once 'path/to/autoload.php';
class Job
{
private string $title;
public function __construct(string $title)
{
$this->title = $title;
}
public function __invoke()
{
cli_set_process_title('php ' . $this->title);
print("start:{$this->title}\n");
sleep(3);
print("stop :{$this->title}\n");
}
}
$limit = ConcurrencyLimit::singleWorker();
$context = ContextBuilder::create()->withLimit($limit)->build();
for ($i = 0; $i < 3; $i++) {
$title = md5(mt_rand());
$context->submit(new Job($title));
}
$context->on('no_workers_remaining', fn() => $context->stop());
$context->run();
```
## Events
1. `booted`
1. `worker_started`
1. `worker_stopped`
1. `congestion`
1. `congestion_relieved`
1. `no_workers_remaining`
1. `stopped`
These events are emitted by the `Context`.
The `worker_started` and `worker_stopped` events are emitted by the `Workers` under the hood,
but they are proxied through the `Context` in order to unify access to them.
#### `booted`
This event is emitted when `$context->run()` is called.
This is the first event dispatched by the `Context`.
#### `worker_started`
This event is emitted when a worker has been started (the process has been forked).
The PID of the child process is supplied as an argument to a listener.
#### `worker_stopped`
This event is emitted when a worker has been stopped (child process has stopped).
The PID of the child process is supplied as an argument to a listener.
#### `congestion`
This event is emitted when the imposed concurrency limit is reached, for example,
when the limit is set to at most 2 child processes, and a third task gets submitted
while there are already two tasks running.
The system naively waits for a child to stop before starting another worker.
#### `congestion_relieved`
This event is emitted in case the congestion explained above is relieved.
This means that a child has stopped, allowing the execution of a new task.
#### `no_workers_remaining`
This event is emitted when there are no workers left running.
This usually means there is no more work to do.
It's possible to automatically stop the context when this event occurs.
This is shown in the second and last example.
#### `stopped`
This event is emitted when `$context->stop()` is called and the eventloop has
succesfully been stopped.
## Why no shared memory?
Shared memory in PHP is hard to manage and quickly becomes a mess.
Don't ask.

View File

@ -0,0 +1,35 @@
<?php
use Toalett\Multiprocessing\ConcurrencyLimit;
use Toalett\Multiprocessing\ContextBuilder;
require_once __DIR__ . '/../vendor/autoload.php';
class Job
{
private string $title;
public function __construct(string $title)
{
$this->title = $title;
}
public function __invoke()
{
cli_set_process_title('php ' . $this->title);
print("start:{$this->title}\n");
sleep(3);
print("stop :{$this->title}\n");
}
}
$limit = ConcurrencyLimit::singleWorker();
$context = ContextBuilder::create()->withLimit($limit)->build();
for ($i = 0; $i < 3; $i++) {
$title = md5(mt_rand());
$context->submit(new Job($title));
}
$context->on('no_workers_remaining', fn() => $context->stop());
$context->run();

View File

@ -0,0 +1,24 @@
<?php
use Toalett\Multiprocessing\ContextBuilder;
use Toalett\Multiprocessing\ConcurrencyLimit;
use React\EventLoop\Factory as EventLoopFactory;
require_once __DIR__ . '/../vendor/autoload.php';
// Create our own EventLoop and limit and supply them to the builder
$loop = EventLoopFactory::create();
$limit = new ConcurrencyLimit(4);
$context = ContextBuilder::create()->withEventLoop($loop)->withLimit($limit)->build();
$context->on('booted', fn() => print("🚽 Toalett Multiprocessing Context\n"));
$context->on('congestion', fn() => print('C'));
$context->on('congestion_relieved', fn() => print('R'));
$context->on('worker_started', fn() => print('+'));
$context->on('worker_stopped', fn() => print('-'));
// Submit a fake job every second
$loop->addPeriodicTimer(1, fn() => $context->submit(fn(int $s) => sleep($s), random_int(0, 10)));
print("Press CTRL+C to stop.\n");
$context->run();

37
bin/simple_example.php Normal file
View File

@ -0,0 +1,37 @@
<?php
use Toalett\Multiprocessing\ContextBuilder;
require_once __DIR__ . '/../vendor/autoload.php';
// We will run 50 jobs
const NUM_JOBS = 50;
$counter = new class {
public int $value = 0;
public function increment(): void
{
$this->value++;
}
};
// Create a default context with unlimited concurrency
$context = ContextBuilder::create()->build();
// Each time a worker stops, a job is finished
$context->on('worker_stopped', fn() => $counter->increment());
// Automatically stop the context when there are no workers left
$context->on('no_workers_remaining', fn() => $context->stop());
$context->on('stopped', fn() => printf("Jobs completed: %d\n", $counter->value));
// You can submit jobs before the context is running. They will be executed
// in the order in which they are submitted to the context. They are
// scheduled on a future tick of the underlying event loop.
// Each job will involve sleeping for ~3 seconds in this example.
for ($i = 0; $i < NUM_JOBS; $i++) {
$context->submit(fn() => sleep(3));
}
$context->run();

38
composer.json Normal file
View File

@ -0,0 +1,38 @@
{
"name": "toalett/multiprocessing",
"type": "library",
"description": "Context for easy multiprocessing",
"keywords": ["multiprocessing", "threading", "task", "job", "multicore"],
"homepage": "https://github.com/toalett/php-multiprocessing",
"license": "MIT",
"authors": [
{
"name": "Joop Schilder",
"homepage": "https://joopschilder.nl"
}
],
"autoload": {
"psr-4": {
"Toalett\\Multiprocessing\\": ["src"]
},
"exclude-from-classmap": [
"**/Tests/"
]
},
"autoload-dev": {
"psr-4": {
"Toalett\\Multiprocessing\\Tests\\": ["src/Tests"]
}
},
"require": {
"php": ">=7.4",
"ext-pcntl": "*",
"react/event-loop": "^1.1",
"evenement/evenement": "^3.0"
},
"extra": {
},
"require-dev": {
"phpunit/phpunit": "^9.5.0"
}
}

2202
composer.lock generated Normal file

File diff suppressed because it is too large Load Diff

24
phpunit.xml.dist Normal file
View File

@ -0,0 +1,24 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- https://phpunit.de/manual/current/en/appendixes.configuration.html -->
<phpunit xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/9.5/phpunit.xsd"
backupGlobals="true"
colors="true"
stopOnError="true"
stopOnFailure="true"
stopOnIncomplete="true"
stopOnSkipped="true"
stopOnRisky="true">
<testsuites>
<testsuite name="Toalett Multiprocessing Test Suite">
<directory>src/Tests</directory>
</testsuite>
</testsuites>
<php>
<ini name="error_reporting" value="-1"/>
</php>
</phpunit>

42
src/ConcurrencyLimit.php Normal file
View File

@ -0,0 +1,42 @@
<?php
namespace Toalett\Multiprocessing;
use Toalett\Multiprocessing\Exception\InvalidArgumentException;
class ConcurrencyLimit
{
private const VALUE_UNLIMITED = -1;
private int $limit;
public function __construct(int $limit)
{
if ($limit === 0 || $limit < self::VALUE_UNLIMITED) {
throw new InvalidArgumentException('-1 or positive integer', $limit);
}
$this->limit = $limit;
}
public static function singleWorker(): self
{
return new self(1);
}
public static function unlimited(): self
{
return new self(self::VALUE_UNLIMITED);
}
public function isUnlimited(): bool
{
return $this->limit === self::VALUE_UNLIMITED;
}
public function isReachedBy(int $amount): bool
{
if ($this->isUnlimited()) {
return false;
}
return $amount >= $this->limit;
}
}

67
src/Context.php Normal file
View File

@ -0,0 +1,67 @@
<?php
namespace Toalett\Multiprocessing;
use Evenement\EventEmitterInterface;
use Evenement\EventEmitterTrait;
use React\EventLoop\LoopInterface;
class Context implements EventEmitterInterface
{
public const GC_INTERVAL = 120;
public const CLEANUP_INTERVAL = 5;
use EventEmitterTrait;
private LoopInterface $eventLoop;
private ConcurrencyLimit $limit;
private Workers $workers;
public function __construct(LoopInterface $eventLoop, ConcurrencyLimit $limit, ?Workers $workers = null)
{
$this->eventLoop = $eventLoop;
$this->limit = $limit;
$this->workers = $workers ?? new Workers();
$this->eventLoop->futureTick(fn() => $this->emit('booted'));
$this->eventLoop->futureTick(fn() => gc_enable());
$this->eventLoop->addPeriodicTimer(self::CLEANUP_INTERVAL, fn() => $this->workers->cleanup());
$this->eventLoop->addPeriodicTimer(self::GC_INTERVAL, fn() => gc_collect_cycles());
$this->workers->on('worker_started', fn(int $pid) => $this->emit('worker_started', [$pid]));
$this->workers->on('worker_stopped', fn(int $pid) => $this->emit('worker_stopped', [$pid]));
$this->workers->on('worker_stopped', fn() => $this->emitIf($this->workers->empty(), 'no_workers_remaining'));
}
public function submit(callable $task, ...$args): void
{
$this->eventLoop->futureTick(function () use ($task, $args) {
if ($this->limit->isReachedBy(count($this->workers))) {
$this->emit('congestion');
$this->workers->awaitCongestionRelief();
$this->emit('congestion_relieved');
}
$this->workers->createWorkerFor($task, $args);
});
}
public function run(): void
{
$this->eventLoop->run();
}
public function stop(): void
{
$this->eventLoop->futureTick(function() {
$this->eventLoop->stop();
$this->workers->stop();
$this->emit('stopped');
});
}
public function emitIf(bool $condition, string $event, ...$args): void
{
if ($condition) {
$this->emit($event, $args);
}
}
}

39
src/ContextBuilder.php Normal file
View File

@ -0,0 +1,39 @@
<?php
namespace Toalett\Multiprocessing;
use React\EventLoop\Factory;
use React\EventLoop\LoopInterface;
class ContextBuilder
{
private ?LoopInterface $loop = null;
private ?ConcurrencyLimit $limit = null;
public static function create(): self
{
return new self();
}
public function withEventLoop(LoopInterface $loop): self
{
$instance = clone $this;
$instance->loop = $loop;
return $instance;
}
public function withLimit(ConcurrencyLimit $limit): self
{
$instance = clone $this;
$instance->limit = $limit;
return $instance;
}
public function build(): Context
{
return new Context(
$this->loop ?? Factory::create(),
$this->limit ?? ConcurrencyLimit::unlimited()
);
}
}

View File

@ -0,0 +1,11 @@
<?php
namespace Toalett\Multiprocessing\Exception;
class InvalidArgumentException extends \InvalidArgumentException
{
public function __construct(string $expected, string $actual)
{
parent::__construct(sprintf('Expected %s, got \'%s\'', $expected, $actual));
}
}

View File

@ -0,0 +1,23 @@
<?php
namespace Toalett\Multiprocessing\Exception;
use RuntimeException;
class ProcessControlException extends RuntimeException
{
private function __construct(string $functionName)
{
parent::__construct(sprintf('call to \'%s\' failed', $functionName));
}
public static function forkFailed(): self
{
return new self('pcntl_fork');
}
public static function waitFailed(): self
{
return new self('pcntl_wait');
}
}

View File

@ -0,0 +1,88 @@
<?php
namespace Toalett\Multiprocessing\Tests;
use PHPUnit\Framework\TestCase;
use Toalett\Multiprocessing\ConcurrencyLimit;
use Toalett\Multiprocessing\Exception\InvalidArgumentException;
use Toalett\Multiprocessing\Tests\Tools\PropertyInspector;
class ConcurrencyLimitTest extends TestCase
{
use PropertyInspector;
public function testItDoesNotAllowZeroAsLimit(): void
{
$this->expectException(InvalidArgumentException::class);
$this->expectExceptionMessage('Expected -1 or positive integer, got \'0\'');
new ConcurrencyLimit(0);
}
public function testItDoesAllowNegativeOneAsLimit(): void
{
$limit = new ConcurrencyLimit(-1);
self::assertTrue($limit->isUnlimited());
}
/**
* @param int $negativeNumber
* @dataProvider negativeValueProvider
*/
public function testItDoesNotAllowAnyOtherNegativeNumberAsLimitExceptNegativeOne(int $negativeNumber): void
{
$this->expectException(InvalidArgumentException::class);
$this->expectExceptionMessage(sprintf('Expected -1 or positive integer, got \'%s\'', $negativeNumber));
new ConcurrencyLimit($negativeNumber);
}
public function testItCanBeMadeUnlimited(): void
{
$limit = ConcurrencyLimit::unlimited();
self::assertTrue($limit->isUnlimited());
}
public function testItCanLimitToASingleWorker(): void
{
$limit = ConcurrencyLimit::singleWorker();
self::assertFalse($limit->isUnlimited());
self::assertEquals(1, $this->getProperty($limit, 'limit'));
}
public function testAnUnlimitedLimitCanNeverBeReached(): void
{
$limit = ConcurrencyLimit::unlimited();
self::assertFalse($limit->isReachedBy(PHP_INT_MIN));
self::assertFalse($limit->isReachedBy(0));
self::assertFalse($limit->isReachedBy(PHP_INT_MAX));
}
public function testABoundLimitCanBeReached(): void
{
$three = new ConcurrencyLimit(3);
$seven = new ConcurrencyLimit(7);
self::assertTrue($three->isReachedBy(3));
self::assertFalse($three->isReachedBy(2));
self::assertFalse($three->isReachedBy(1));
self::assertTrue($seven->isReachedBy(7));
self::assertTrue($seven->isReachedBy(120));
self::assertFalse($seven->isReachedBy(-2));
}
public function negativeValueProvider(): array
{
return [
'-2' => [-2],
'-3' => [-3],
'-10000' => [-10000],
'PHP_INT_MIN' => [PHP_INT_MIN],
];
}
}

View File

@ -0,0 +1,68 @@
<?php
namespace Toalett\Multiprocessing\Tests;
use PHPUnit\Framework\TestCase;
use React\EventLoop\LoopInterface;
use Toalett\Multiprocessing\ConcurrencyLimit;
use Toalett\Multiprocessing\ContextBuilder;
use Toalett\Multiprocessing\Tests\Tools\PropertyInspector;
class ContextBuilderTest extends TestCase
{
use PropertyInspector;
public function testItIsImmutable(): void
{
$builder = ContextBuilder::create();
$eventLoop = $this->createMock(LoopInterface::class);
$limit = $this->createMock(ConcurrencyLimit::class);
self::assertNotSame($builder->withEventLoop($eventLoop), $builder);
self::assertNotSame($builder->withLimit($limit), $builder);
}
public function testItGivesBackANewContextEachTimeBuildIsInvoked(): void
{
$builder = ContextBuilder::create();
self::assertNotSame($builder->build(), $builder->build());
}
public function testItCreatesANewContextWithUnlimitedConcurrencyWhenSupplyingNoArguments(): void
{
$builder = ContextBuilder::create();
$context = $builder->build();
self::assertIsObject($context);
self::assertInstanceOf(LoopInterface::class, $this->getProperty($context, 'eventLoop'));
/** @var ConcurrencyLimit|null $limit */
$limit = $this->getProperty($context, 'limit');
self::assertIsObject($limit);
self::assertInstanceOf(ConcurrencyLimit::class, $limit);
self::assertTrue($limit->isUnlimited());
}
public function testWhenSuppliedWithACustomEventLoopItUsesThatEventLoop(): void
{
$builder = ContextBuilder::create();
$eventLoop = $this->createMock(LoopInterface::class);
$context = $builder->withEventLoop($eventLoop)->build();
$usedEventLoop = $this->getProperty($context, 'eventLoop');
self::assertSame($eventLoop, $usedEventLoop);
}
public function testWhenSuppliedWithACustomConcurrencyLimitItUsesThatLimit(): void
{
$builder = ContextBuilder::create();
$limit = $this->createMock(ConcurrencyLimit::class);
$context = $builder->withLimit($limit)->build();
$usedLimit = $this->getProperty($context, 'limit');
self::assertSame($limit, $usedLimit);
}
}

98
src/Tests/ContextTest.php Normal file
View File

@ -0,0 +1,98 @@
<?php
namespace Toalett\Multiprocessing\Tests;
use PHPUnit\Framework\TestCase;
use React\EventLoop\Factory;
use React\EventLoop\LoopInterface;
use Toalett\Multiprocessing\ConcurrencyLimit;
use Toalett\Multiprocessing\Context;
use Toalett\Multiprocessing\Workers;
class ContextTest extends TestCase
{
public function testItEmitsAnEventWhenBooted(): void
{
$limit = $this->createMock(ConcurrencyLimit::class);
$loop = Factory::create();
$context = new Context($loop, $limit);
$loop->futureTick(fn() => $context->stop());
$bootEventHasTakenPlace = false;
$context->on('booted', function () use (&$bootEventHasTakenPlace) {
$bootEventHasTakenPlace = true;
});
self::assertFalse($bootEventHasTakenPlace);
$context->run();
self::assertTrue($bootEventHasTakenPlace);
}
public function testItEmitsEventsWhenCongestionOccursAndIsRelieved(): void
{
$loop = Factory::create();
$limit = $this->createMock(ConcurrencyLimit::class);
$context = new Context($loop, $limit);
$limit->method('isReachedBy')->willReturn(true); // trigger congestion
$loop->futureTick(fn() => $context->stop());
$congestionEventHasTakenPlace = false;
$congestionRelievedEventHasTakenPlace = false;
$context->on('congestion', function () use (&$congestionEventHasTakenPlace) {
$congestionEventHasTakenPlace = true;
});
$context->on('congestion_relieved', function () use (&$congestionRelievedEventHasTakenPlace) {
$congestionRelievedEventHasTakenPlace = true;
});
self::assertFalse($congestionEventHasTakenPlace);
self::assertFalse($congestionRelievedEventHasTakenPlace);
$context->submit(fn() => []);
$context->run();
self::assertTrue($congestionEventHasTakenPlace);
self::assertTrue($congestionRelievedEventHasTakenPlace);
}
public function testItCreatesAWorkerForASubmittedTask(): void
{
$limit = $this->createMock(ConcurrencyLimit::class);
$loop = $this->createMock(LoopInterface::class);
$context = new Context($loop, $limit);
$limit->method('isReachedBy')->willReturn(false);
$loop->expects(self::once())->method('futureTick')->withConsecutive(
[fn() => []],
);
$context->submit(fn() => []);
}
public function testItRegistersMaintenanceCallbacksOnTheEventLoop(): void
{
$loop = $this->createMock(LoopInterface::class);
$limit = $this->createMock(ConcurrencyLimit::class);
$loop->expects(self::exactly(2))->method('addPeriodicTimer')->withConsecutive(
[Context::CLEANUP_INTERVAL, fn() => []],
[Context::GC_INTERVAL, fn() => []]
);
new Context($loop, $limit);
}
public function testItProxiesWorkersEventsToSelf(): void
{
$loop = $this->createMock(LoopInterface::class);
$limit = $this->createMock(ConcurrencyLimit::class);
$workers = $this->createMock(Workers::class);
$workers->expects(self::atLeast(2))->method('on')->withConsecutive(
['worker_started', fn() => []],
['worker_stopped', fn() => []]
);
new Context($loop, $limit, $workers);
}
}

View File

@ -0,0 +1,16 @@
<?php
namespace Toalett\Multiprocessing\Tests\Tools;
use ReflectionObject;
trait PropertyInspector
{
protected function getProperty(object $object, string $propertyName)
{
$reflector = new ReflectionObject($object);
$property = $reflector->getProperty($propertyName);
$property->setAccessible(true);
return $property->getValue($object);
}
}

41
src/Tests/WorkersTest.php Normal file
View File

@ -0,0 +1,41 @@
<?php
namespace Toalett\Multiprocessing\Tests;
use ReflectionObject;
use Toalett\Multiprocessing\Workers;
use PHPUnit\Framework\TestCase;
class WorkersTest extends TestCase
{
public function testItEmitsAnEventWhenAWorkerIsStarted(): void
{
$workers = new Workers();
$workerStartedEventHasTakenPlace = false;
$workers->on('worker_started', function() use (&$workerStartedEventHasTakenPlace) {
$workerStartedEventHasTakenPlace = true;
});
self::assertFalse($workerStartedEventHasTakenPlace);
$workers->createWorkerFor(fn() => exit(0), []);
self::assertTrue($workerStartedEventHasTakenPlace);
}
public function testItEmitsAnEventWhenAWorkerIsRemoved(): void
{
$workers = new Workers();
$reflector = new ReflectionObject($workers);
$method = $reflector->getMethod('remove');
$method->setAccessible(true);
$workerStoppedEventHasTakenPlace = false;
$workers->on('worker_stopped', function() use (&$workerStoppedEventHasTakenPlace) {
$workerStoppedEventHasTakenPlace = true;
});
self::assertFalse($workerStoppedEventHasTakenPlace);
$method->invoke($workers, 0);
self::assertTrue($workerStoppedEventHasTakenPlace);
}
}

91
src/Workers.php Normal file
View File

@ -0,0 +1,91 @@
<?php
namespace Toalett\Multiprocessing;
use Countable;
use Evenement\EventEmitterInterface;
use Evenement\EventEmitterTrait;
use Throwable;
use Toalett\Multiprocessing\Exception\ProcessControlException;
class Workers implements Countable, EventEmitterInterface
{
use EventEmitterTrait;
/** @var int[] */
private array $workers = [];
public function count(): int
{
return count($this->workers);
}
public function empty(): bool
{
return count($this->workers) === 0;
}
public function createWorkerFor(callable $task, array $args): void
{
$pid = $this->forkWorker($task, $args);
$this->workers[$pid] = $pid;
$this->emit('worker_started', [$pid]);
}
public function cleanup(): void
{
while (true === $this->wait(WNOHANG)) ;
}
public function awaitCongestionRelief(): void
{
$this->wait();
}
private function remove(int $pid): void
{
unset($this->workers[$pid]);
$this->emit('worker_stopped', [$pid]);
}
private function forkWorker(callable $task, array $args): int
{
$pid = pcntl_fork();
if ($pid === -1) {
throw ProcessControlException::forkFailed();
}
if ($pid === 0) {
try {
call_user_func_array($task, $args);
} catch (Throwable $t) {
fwrite(STDERR, $t->getMessage());
exit(1);
}
exit(0);
}
return $pid;
}
/**
* @param int $options
* @return bool Whether a process was caught
*/
private function wait(int $options = 0): bool
{
$pid = pcntl_wait($status, $options);
if ($pid > 0) {
$this->remove($pid);
return true;
}
// We ignore errors ($pid < 0). This method is called periodically, even if there is
// no child available. pcntl_wait() will return -1. This is expected behavior.
return false;
}
public function stop(): void
{
while (true === $this->wait());
}
}