Multiprocessing library for PHP7.4+
Go to file
2020-12-14 12:56:41 +01:00
bin Update README.md, consistent use of spaces instead of tabs, better examples 2020-12-12 13:05:48 +01:00
src Add CallableProvider trait for tests 2020-12-14 12:56:41 +01:00
.gitignore Implement Unit Tests, add Readme, add examples, stronger implementation 2020-12-11 01:25:38 +01:00
composer.json Extracted Process Control to separate interface to allow better tests 2020-12-11 13:23:57 +01:00
composer.lock Extracted Process Control to separate interface to allow better tests 2020-12-11 13:23:57 +01:00
phpunit.xml.dist Extract components and add more tests 2020-12-12 02:11:05 +01:00
README.md Update README.md, consistent use of spaces instead of tabs, better examples 2020-12-12 13:05:48 +01:00

🚽 Toalett

Welcome to Toalett, a humble initiative based around the idea that all software is 💩.
Toalett is the Norwegian word for toilet. It feels fancier than plain "toilet".

Why toalett/multiprocessing?

Multiprocessing is a technique that is often used in PHP (cli) applications to execute tasks asynchronously. Due to the lack of native multithreading in PHP, developers have to rely on good old multiprocessing to do this.

We often see code that's written in a quick and dirty way to accomplish this task, with calls to pcntl_fork() hidden somewhere, leading to ugly implementations.

Toalett has nothing against quick and dirty PHP code. Toalett lives it. It breathes it. But since multiprocessing so common, it might be nice to use this library.

Okay, cool, but... How?

toalett/multiprocessing comes with the handy-dandy ContextBuilder class which is used to build a Context. A Context is the central component of this library. It schedules tasks to the Workers. Workers are a representation of child processes that are working on a task.

The Context uses a ReactPHP EventLoop internally and emits events using the simple (but elegant) Evenement library.

Events

The context emits events when something of interest happens. You can react to these events by calling:
$context->on('name_of_event', fn() => ...);.

These are the events emitted by the context:

  1. booted
  2. worker_started
  3. worker_stopped
  4. congestion
  5. congestion_relieved
  6. no_workers_remaining
  7. stopped

1. booted

This event is emitted after $context->run() is called. This is the very first event dispatched by the context. It is dispatched as soon as the event loop has started.

2. worker_started

This event is emitted when a worker has been started (the process has been forked). The PID of the child process is supplied as an argument to a listener.

3. worker_stopped

This event is emitted when a worker has been stopped (child process has stopped). The PID of the child process is supplied as an argument to a listener.

4. congestion

This event is emitted when the imposed concurrency limit is reached. This happens when (for example) the concurrency is set to at most 2 child processes, and a third task gets submitted while 2 tasks are already running. The system naively waits for a child to stop before starting another worker.

5. congestion_relieved

This event is emitted when congestion is relieved. This means that a child has stopped, allowing for the execution of a new task.

6. no_workers_remaining

This event is emitted when there are no workers left running. This usually means there is no more work to do. It's possible to automatically stop the context when this event occurs. This is shown in the first and last example.

7. stopped

The context can be stopped by calling $context->stop(). When the workers and the event loop are succesfully stopped, the context emits a stopped event.

Examples

For most developers, the quickest way to learn something is by looking at examples. Three examples are provided.

There is a simple example, which demonstrates event emission with the creation of 50 jobs. A counter is incremented every time a job stops. When all jobs are done, the context is stopped.

The cleanup interval is the interval at which the context checks for dead worker processes and reads their exit codes. It defaults to 5 seconds and is in some examples explicitely set to a low value to improve example responsiveness.

Counting stopped workers using events

<?php

use Toalett\Multiprocessing\ContextBuilder;
use Toalett\Multiprocessing\Task\Interval;

const NUM_JOBS = 50;

$context = ContextBuilder::create()
    ->withCleanupInterval(Interval::seconds(0.5))
    ->build();

$counter = new Counter();
$context->on('worker_stopped', [$counter, 'increment']);
$context->on('no_workers_remaining', [$context, 'stop']);
$context->on('stopped', fn() => printf(" %d\n", $counter->value));

for ($i = 0; $i < NUM_JOBS; $i++) {
    $context->submit(fn() => sleep(2));
    print('.');
}

$context->run();

Triggering congestion with 4 workers

This example is a bit more elaborate than the previous one. It serves to demonstrate congestion and how it is handled by the context: the context simply blocks all execution until a worker stops and a spot becomes available.

Watch for the occurence of 'C' in the output. This denotes congestion: a worker could not be started.

<?php

use React\EventLoop\Factory;
use Toalett\Multiprocessing\ContextBuilder;
use Toalett\Multiprocessing\Concurrency;

$loop = Factory::create();
$context = ContextBuilder::create()
    ->withEventLoop($loop)
    ->withConcurrency(Concurrency::atMost(4))
    ->build();

$context->on('booted', fn() => print("🚽 toalett context booted\n"));
$context->on('congestion', fn() => print('C'));
$context->on('congestion_relieved', fn() => print('R'));
$context->on('worker_started', fn() => print('+'));
$context->on('worker_stopped', fn() => print('-'));

// A job is submitted to the context every second.
// The job sleeps for a random amount of seconds (0 - 10).
$loop->addPeriodicTimer(1, fn() => $context->submit(fn(int $s) => sleep($s), random_int(0, 10)));

print("Press CTRL+C to stop.\n");
$context->run();

Single worker with a Job class

Since a task is really just a Closure, it's also possible to submit an object with an implementation of the __invoke() magic method.

In this example, execution is limited to a single worker, and jobs are instances of the Job class.

<?php

use Toalett\Multiprocessing\Concurrency;
use Toalett\Multiprocessing\ContextBuilder;
use Toalett\Multiprocessing\Task\Interval;

$context = ContextBuilder::create()
    ->withConcurrency(Concurrency::singleWorker())
    ->withCleanupInterval(Interval::seconds(0.2))
    ->build();

for ($i = 0; $i < 3; $i++) {
    $title = md5(mt_rand());
    $context->submit(new Job($title));
}

$context->on('no_workers_remaining', [$context, 'stop']);
$context->run();

Tests

Tests can be found in the src/Tests/ directory.