diff --git a/README.md b/README.md index ae37e99..1dd7c0b 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ Welcome to Toalett, a humble initiative based around the idea that all software Toalett is the Norwegian word for toilet. It feels fancier than plain "toilet". ## Why `toalett/multiprocessing`? -[Multiprocessing](https://nl.wikipedia.org/wiki/Multiprocessing) is a technique that is often used in PHP (CLI-)applications to execute tasks asynchronously. +[Multiprocessing](https://nl.wikipedia.org/wiki/Multiprocessing) is a technique that is often used in PHP (cli) applications to execute tasks asynchronously. Due to the lack of native [multithreading](https://en.wikipedia.org/wiki/Multithreading_(computer_architecture)) in PHP, developers have to rely on good old multiprocessing to do this. @@ -22,6 +22,56 @@ Workers are a representation of child processes that are working on a task. The Context uses a [ReactPHP EventLoop](https://reactphp.org/event-loop/) internally and emits events using the simple (but elegant) [Evenement](https://github.com/igorw/Evenement) library. +## Events + +The context emits events when something of interest happens. +You can react to these events by calling: +`$context->on('name_of_event', fn() => ...);`. + +These are the events emitted by the context: + +1. `booted` +2. `worker_started` +3. `worker_stopped` +4. `congestion` +5. `congestion_relieved` +6. `no_workers_remaining` +7. `stopped` + +#### 1. `booted` +This event is emitted after `$context->run()` is called. +This is the very first event dispatched by the context. +It is dispatched as soon as the event loop has started. + +#### 2. `worker_started` +This event is emitted when a worker has been started (the process has been forked). +The PID of the child process is supplied as an argument to a listener. + +#### 3. `worker_stopped` +This event is emitted when a worker has been stopped (child process has stopped). +The PID of the child process is supplied as an argument to a listener. + +#### 4. `congestion` +This event is emitted when the imposed concurrency limit is reached. +This happens when (for example) the concurrency is set to at most 2 child processes, +and a third task gets submitted while 2 tasks are already running. +The system naively waits for a child to stop before starting another worker. + +#### 5. `congestion_relieved` +This event is emitted when congestion is relieved. +This means that a child has stopped, allowing for the execution of a new task. + +#### 6. `no_workers_remaining` +This event is emitted when there are no workers left running. +This usually means there is no more work to do. +It's possible to automatically stop the context when this event occurs. +This is shown in the first and last example. + +#### 7. `stopped` +The context can be stopped by calling `$context->stop()`. +When the workers and the event loop are succesfully stopped, the context +emits a `stopped` event. + ## Examples For most developers, the quickest way to learn something is by looking at examples. Three examples are provided. @@ -30,181 +80,98 @@ There is a simple example, which demonstrates event emission with the creation o A counter is incremented every time a job stops. When all jobs are done, the context is stopped. -### [Simple example](bin/simple_example.php) +The cleanup interval is the interval at which the context checks for dead +worker processes and reads their exit codes. +It defaults to 5 seconds and is in some examples explicitely set to a low +value to improve example responsiveness. + +### [Counting stopped workers using events](bin/counting_stopped_workers.php) ```php value++; - } -}; - -// Create a context (defaults to unlimited child processes). -// The cleanup interval is the interval at dead processes -// will be read. For this example it's kept low. -// The default value is 5 seconds. $context = ContextBuilder::create() - ->withCleanupInterval(Interval::seconds(0.5)) - ->build(); + ->withCleanupInterval(Interval::seconds(0.5)) + ->build(); +$counter = new Counter(); $context->on('worker_stopped', [$counter, 'increment']); $context->on('no_workers_remaining', [$context, 'stop']); -$context->on('stopped', fn() => printf("\nJobs completed: %d\n", $counter->value)); +$context->on('stopped', fn() => printf(" %d\n", $counter->value)); -// You can submit jobs before the context is running. They will be executed -// in the order in which they are submitted to the context. -// Each job (thus child process) will be sleeping for 3 seconds. for ($i = 0; $i < NUM_JOBS; $i++) { - $context->submit(fn() => sleep(3)); - print('.'); + $context->submit(fn() => sleep(2)); + print('.'); } $context->run(); ``` -### [More elaborate example](bin/more_elaborate_example.php) +### [Triggering congestion with 4 workers](bin/triggering_congestion.php) This example is a bit more elaborate than the previous one. It serves to demonstrate congestion and how it is handled by the context: the context simply blocks all execution until a worker stops and a spot becomes available. -This example shows the usage of events. +Watch for the occurence of 'C' in the output. +This denotes congestion: a worker could not be started. ```php withEventLoop($loop) - ->withLimit(ConcurrencyLimit::atMost(4)) - ->build(); + ->withEventLoop($loop) + ->withConcurrency(Concurrency::atMost(4)) + ->build(); -$context->on('booted', fn() => print("🚽 Toalett Multiprocessing Context\n")); +$context->on('booted', fn() => print("🚽 toalett context booted\n")); $context->on('congestion', fn() => print('C')); $context->on('congestion_relieved', fn() => print('R')); $context->on('worker_started', fn() => print('+')); $context->on('worker_stopped', fn() => print('-')); -// Submit a fake job every second +// A job is submitted to the context every second. +// The job sleeps for a random amount of seconds (0 - 10). $loop->addPeriodicTimer(1, fn() => $context->submit(fn(int $s) => sleep($s), random_int(0, 10))); print("Press CTRL+C to stop.\n"); $context->run(); - ``` -### [Example with a Job class](bin/example_with_job_class.php) -Since the task is defined by a `callable` supplied with arguments, it's also possible to -define a class that implements the magic `__invoke()` method and submit objects of this -class to the Context. Objects implementing the `__invoke()` method can be treated as -closures. They may accept zero or more arguments. +### [Single worker with a Job class](bin/single_worker_with_job_class.php) +Since a task is really just a `Closure`, it's also possible to submit an object +with an implementation of the `__invoke()` magic method. + +In this example, execution is limited to a single worker, and jobs are +instances of the `Job` class. -This idea is demonstrated here, while execution is limited to a single worker. ```php title = $title; - } - - public function __invoke() - { - cli_set_process_title("php {$this->title}"); - print("+ {$this->title}"); - sleep(1); - print("\r {$this->title}\n"); - } -} - -$limit = ConcurrencyLimit::singleWorker(); $context = ContextBuilder::create() - ->withLimit(ConcurrencyLimit::singleWorker()) - ->withCleanupInterval(Interval::seconds(0.2)) - ->build(); + ->withConcurrency(Concurrency::singleWorker()) + ->withCleanupInterval(Interval::seconds(0.2)) + ->build(); for ($i = 0; $i < 3; $i++) { - $title = md5(mt_rand()); - $context->submit(new Job($title)); + $title = md5(mt_rand()); + $context->submit(new Job($title)); } $context->on('no_workers_remaining', [$context, 'stop']); $context->run(); ``` -## Events - -1. `booted` -1. `worker_started` -1. `worker_stopped` -1. `congestion` -1. `congestion_relieved` -1. `no_workers_remaining` -1. `stopped` - -These events are emitted by the context. -They can be subscribed to by calling `$context->on('...', fn() => ...);`. - -#### `booted` -This event is emitted when `$context->run()` is called. -This is the very first event dispatched by the context. - -#### `worker_started` -This event is emitted when a worker has been started (the process has been forked). -The PID of the child process is supplied as an argument to a listener. - -#### `worker_stopped` -This event is emitted when a worker has been stopped (child process has stopped). -The PID of the child process is supplied as an argument to a listener. - -#### `congestion` -This event is emitted when the imposed concurrency limit is reached, for example, -when the limit is set to at most 2 child processes, and a third task gets submitted -while there are already two tasks running. -The system naively waits for a child to stop before starting another worker. - -#### `congestion_relieved` -This event is emitted in case the congestion explained above is relieved. -This means that a child has stopped, allowing for the execution of a new task. - -#### `no_workers_remaining` -This event is emitted when there are no workers left running. -This usually means there is no more work to do. -It's possible to automatically stop the context when this event occurs. -This is shown in the first and last example. - -#### `stopped` -This event is emitted when `$context->stop()` is called and the eventloop has -succesfully been stopped. - -## Why no shared memory? -Shared memory in PHP is hard to manage and quickly becomes a mess. Don't ask. - -Feel free to add it yourself though. 😉 +## Tests +Tests can be found in the [src/Tests/](src/Tests) directory. diff --git a/bin/classes/Counter.php b/bin/classes/Counter.php new file mode 100644 index 0000000..1cd560d --- /dev/null +++ b/bin/classes/Counter.php @@ -0,0 +1,11 @@ +value++; + } +} diff --git a/bin/classes/Job.php b/bin/classes/Job.php new file mode 100644 index 0000000..709c0a3 --- /dev/null +++ b/bin/classes/Job.php @@ -0,0 +1,19 @@ +title = $title; + } + + public function __invoke() + { + cli_set_process_title("php {$this->title}"); + print("* {$this->title}"); + sleep(1); + print("\r {$this->title}\n"); + } +} diff --git a/bin/simple_example.php b/bin/counting_stopped_workers.php similarity index 55% rename from bin/simple_example.php rename to bin/counting_stopped_workers.php index ceda85c..faf60bc 100644 --- a/bin/simple_example.php +++ b/bin/counting_stopped_workers.php @@ -4,29 +4,21 @@ use Toalett\Multiprocessing\ContextBuilder; use Toalett\Multiprocessing\Task\Interval; require_once __DIR__ . '/../vendor/autoload.php'; - +require_once __DIR__ . '/classes/Counter.php'; const NUM_JOBS = 50; -$counter = new class { - public int $value = 0; - - public function increment(): void - { - $this->value++; - } -}; - $context = ContextBuilder::create() - ->withCleanupInterval(Interval::seconds(0.5)) - ->build(); + ->withCleanupInterval(Interval::seconds(0.5)) + ->build(); +$counter = new Counter(); $context->on('worker_stopped', [$counter, 'increment']); $context->on('no_workers_remaining', [$context, 'stop']); -$context->on('stopped', fn() => printf("\nJobs completed: %d\n", $counter->value)); +$context->on('stopped', fn() => printf(" %d\n", $counter->value)); for ($i = 0; $i < NUM_JOBS; $i++) { - $context->submit(fn() => sleep(3)); - print('.'); + $context->submit(fn() => sleep(2)); + print('.'); } $context->run(); diff --git a/bin/example_with_job_class.php b/bin/example_with_job_class.php deleted file mode 100644 index 68b2cc9..0000000 --- a/bin/example_with_job_class.php +++ /dev/null @@ -1,39 +0,0 @@ -title = $title; - } - - public function __invoke() - { - cli_set_process_title("php {$this->title}"); - print("+ {$this->title}"); - sleep(1); - print("\r {$this->title}\n"); - } -} - -$limit = ConcurrencyLimit::singleWorker(); -$context = ContextBuilder::create() - ->withLimit(ConcurrencyLimit::singleWorker()) - ->withCleanupInterval(Interval::seconds(0.2)) - ->build(); - -for ($i = 0; $i < 3; $i++) { - $title = md5(mt_rand()); - $context->submit(new Job($title)); -} - -$context->on('no_workers_remaining', [$context, 'stop']); -$context->run(); diff --git a/bin/single_worker_with_job_class.php b/bin/single_worker_with_job_class.php new file mode 100644 index 0000000..41c74c5 --- /dev/null +++ b/bin/single_worker_with_job_class.php @@ -0,0 +1,21 @@ +withConcurrency(Concurrency::singleWorker()) + ->withCleanupInterval(Interval::seconds(0.2)) + ->build(); + +for ($i = 0; $i < 3; $i++) { + $title = md5(mt_rand()); + $context->submit(new Job($title)); +} + +$context->on('no_workers_remaining', [$context, 'stop']); +$context->run(); diff --git a/bin/more_elaborate_example.php b/bin/triggering_congestion.php similarity index 73% rename from bin/more_elaborate_example.php rename to bin/triggering_congestion.php index 2c50055..2a4e87c 100644 --- a/bin/more_elaborate_example.php +++ b/bin/triggering_congestion.php @@ -1,16 +1,16 @@ withEventLoop($loop) - ->withLimit(ConcurrencyLimit::atMost(4)) - ->build(); + ->withEventLoop($loop) + ->withConcurrency(Concurrency::atMost(4)) + ->build(); $context->on('booted', fn() => print("🚽 Toalett Multiprocessing Context\n")); $context->on('congestion', fn() => print('C')); diff --git a/src/ConcurrencyLimit.php b/src/Concurrency.php similarity index 94% rename from src/ConcurrencyLimit.php rename to src/Concurrency.php index 13b83ca..e2a7908 100644 --- a/src/ConcurrencyLimit.php +++ b/src/Concurrency.php @@ -4,7 +4,7 @@ namespace Toalett\Multiprocessing; use Toalett\Multiprocessing\Exception\InvalidArgumentException; -class ConcurrencyLimit +class Concurrency { private const VALUE_UNLIMITED = -1; private int $limit; @@ -24,7 +24,7 @@ class ConcurrencyLimit public static function atMost(int $limit): self { - return new self($limit); + return new self($limit); } public static function unlimited(): self diff --git a/src/Context.php b/src/Context.php index 52d27e6..4e6823f 100644 --- a/src/Context.php +++ b/src/Context.php @@ -11,75 +11,70 @@ use Toalett\Multiprocessing\Task\Tasks; class Context implements EventEmitterInterface { - public const INTERVAL_GC = 120; - public const INTERVAL_CLEANUP = 5; - use EventEmitterTrait; + public const INTERVAL_GC = 120; + public const INTERVAL_CLEANUP = 5; + use EventEmitterTrait; - private LoopInterface $eventLoop; - private ConcurrencyLimit $limit; - private Workers $workers; - private Tasks $maintenanceTasks; + private LoopInterface $eventLoop; + private Concurrency $concurrency; + private Workers $workers; + private Tasks $maintenanceTasks; - public function __construct( - LoopInterface $eventLoop, - ConcurrencyLimit $limit, - ?Workers $workers = null, - ?Interval $cleanupInterval = null, - ?Interval $garbageCollectionInterval = null - ) - { - $this->eventLoop = $eventLoop; - $this->limit = $limit; - $this->workers = $workers ?? new Workers(); - $this->setupWorkerEventForwarding(); - $this->setupMaintenanceTasks($cleanupInterval, $garbageCollectionInterval); - } + public function __construct( + LoopInterface $eventLoop, + Concurrency $concurrency, + ?Workers $workers = null, + ?Interval $cleanupInterval = null + ) + { + $this->eventLoop = $eventLoop; + $this->concurrency = $concurrency; + $this->workers = $workers ?? new Workers(); + $this->setupWorkerEventForwarding(); + $this->setupMaintenanceTasks($cleanupInterval); + } - public function run(): void - { - $this->eventLoop->futureTick(fn() => $this->emit('booted')); - $this->eventLoop->futureTick(fn() => gc_enable()); - $this->maintenanceTasks->enable($this->eventLoop); - $this->eventLoop->run(); - } + public function run(): void + { + $this->eventLoop->futureTick(fn() => $this->emit('booted')); + $this->eventLoop->futureTick(fn() => gc_enable()); + $this->maintenanceTasks->enable($this->eventLoop); + $this->eventLoop->run(); + } - public function submit(callable $task, ...$args): void - { - $this->eventLoop->futureTick(function () use ($task, $args) { - if ($this->limit->isReachedBy(count($this->workers))) { - $this->emit('congestion'); - $this->workers->awaitCongestionRelief(); - $this->emit('congestion_relieved'); - } - $this->workers->createWorkerFor($task, $args); - }); - } + public function submit(callable $task, ...$args): void + { + $this->eventLoop->futureTick(function () use ($task, $args) { + if ($this->concurrency->isReachedBy(count($this->workers))) { + $this->emit('congestion'); + $this->workers->awaitCongestionRelief(); + $this->emit('congestion_relieved'); + } + $this->workers->createWorkerFor($task, $args); + }); + } - public function stop(): void - { - $this->maintenanceTasks->cancel(); - $this->workers->stop(); - $this->emit('stopped'); - } + public function stop(): void + { + $this->maintenanceTasks->cancel(); + $this->workers->stop(); + $this->emit('stopped'); + } - private function setupWorkerEventForwarding(): void - { - $this->workers->on('worker_started', fn(int $pid) => $this->emit('worker_started', [$pid])); - $this->workers->on('worker_stopped', fn(int $pid) => $this->emit('worker_stopped', [$pid])); - $this->workers->on('no_workers_remaining', fn() => $this->emit('no_workers_remaining')); - } + private function setupWorkerEventForwarding(): void + { + $this->workers->on('worker_started', fn(int $pid) => $this->emit('worker_started', [$pid])); + $this->workers->on('worker_stopped', fn(int $pid) => $this->emit('worker_stopped', [$pid])); + $this->workers->on('no_workers_remaining', fn() => $this->emit('no_workers_remaining')); + } - private function setupMaintenanceTasks(?Interval $cleanupInterval, ?Interval $garbageCollectionInterval): void - { - $this->maintenanceTasks = new Tasks( - new RepeatedTask( - $cleanupInterval ?? Interval::seconds(self::INTERVAL_CLEANUP), - fn() => $this->workers->cleanup() - ), - new RepeatedTask( - $garbageCollectionInterval ?? Interval::seconds(self::INTERVAL_GC), - fn() => gc_collect_cycles() - ) - ); - } + private function setupMaintenanceTasks(?Interval $cleanupInterval): void + { + $cleanupInterval = $cleanupInterval ?? Interval::seconds(self::INTERVAL_CLEANUP); + $gcInterval = Interval::seconds(self::INTERVAL_GC); + $this->maintenanceTasks = new Tasks( + new RepeatedTask($cleanupInterval, [$this->workers, 'cleanup']), + new RepeatedTask($gcInterval, 'gc_collect_cycles') + ); + } } diff --git a/src/ContextBuilder.php b/src/ContextBuilder.php index c6a1d4f..22cd78b 100644 --- a/src/ContextBuilder.php +++ b/src/ContextBuilder.php @@ -8,60 +8,51 @@ use Toalett\Multiprocessing\Task\Interval; class ContextBuilder { - private ?LoopInterface $loop = null; - private ?ConcurrencyLimit $limit = null; - private ?Workers $workers = null; - private ?Interval $garbageCollectionInterval = null; - private ?Interval $cleanupInterval = null; + private ?LoopInterface $loop = null; + private ?Concurrency $concurrency = null; + private ?Workers $workers = null; + private ?Interval $cleanupInterval = null; - public static function create(): self - { - return new self(); - } + public static function create(): self + { + return new self(); + } - public function withEventLoop(LoopInterface $loop): self - { - $instance = clone $this; - $instance->loop = $loop; - return $instance; - } + public function withEventLoop(LoopInterface $loop): self + { + $instance = clone $this; + $instance->loop = $loop; + return $instance; + } - public function withLimit(ConcurrencyLimit $limit): self - { - $instance = clone $this; - $instance->limit = $limit; - return $instance; - } + public function withConcurrency(Concurrency $concurrency): self + { + $instance = clone $this; + $instance->concurrency = $concurrency; + return $instance; + } - public function withWorkers(Workers $workers): self - { - $instance = clone $this; - $instance->workers = $workers; - return $instance; - } + public function withWorkers(Workers $workers): self + { + $instance = clone $this; + $instance->workers = $workers; + return $instance; + } - public function withGarbageCollectionInterval(Interval $interval): self - { - $instance = clone $this; - $instance->garbageCollectionInterval = $interval; - return $instance; - } + public function withCleanupInterval(Interval $interval): self + { + $instance = clone $this; + $instance->cleanupInterval = $interval; + return $instance; + } - public function withCleanupInterval(Interval $interval): self - { - $instance = clone $this; - $instance->cleanupInterval = $interval; - return $instance; - } - - public function build(): Context - { - return new Context( - $this->loop ?? Factory::create(), - $this->limit ?? ConcurrencyLimit::unlimited(), - $this->workers, - $this->cleanupInterval, - $this->garbageCollectionInterval - ); - } + public function build(): Context + { + return new Context( + $this->loop ?? Factory::create(), + $this->concurrency ?? Concurrency::unlimited(), + $this->workers, + $this->cleanupInterval + ); + } } diff --git a/src/ProcessControl/Fork.php b/src/ProcessControl/Fork.php index b953766..0f3b258 100644 --- a/src/ProcessControl/Fork.php +++ b/src/ProcessControl/Fork.php @@ -4,25 +4,25 @@ namespace Toalett\Multiprocessing\ProcessControl; class Fork { - public int $pid; + public int $pid; - public function __construct(int $pid) - { - $this->pid = $pid; - } + public function __construct(int $pid) + { + $this->pid = $pid; + } - public function failed(): bool - { - return $this->pid < 0; - } + public function failed(): bool + { + return $this->pid < 0; + } - public function isChild(): bool - { - return $this->pid === 0; - } + public function isChild(): bool + { + return $this->pid === 0; + } - public function isParent(): bool - { - return $this->pid !== 0; - } + public function isParent(): bool + { + return $this->pid !== 0; + } } diff --git a/src/ProcessControl/PCNTL.php b/src/ProcessControl/PCNTL.php index 8728c52..b74a2e4 100644 --- a/src/ProcessControl/PCNTL.php +++ b/src/ProcessControl/PCNTL.php @@ -4,15 +4,15 @@ namespace Toalett\Multiprocessing\ProcessControl; class PCNTL implements ProcessControl { - public function fork(): Fork - { - $pid = pcntl_fork(); - return new Fork($pid); - } + public function fork(): Fork + { + $pid = pcntl_fork(); + return new Fork($pid); + } - public function wait(int $options = 0): Wait - { - $pid = pcntl_wait($status, $options); - return new Wait($pid, $status); - } + public function wait(int $options = 0): Wait + { + $pid = pcntl_wait($status, $options); + return new Wait($pid, $status); + } } diff --git a/src/ProcessControl/ProcessControl.php b/src/ProcessControl/ProcessControl.php index 184b06f..cb95098 100644 --- a/src/ProcessControl/ProcessControl.php +++ b/src/ProcessControl/ProcessControl.php @@ -4,7 +4,7 @@ namespace Toalett\Multiprocessing\ProcessControl; interface ProcessControl { - public function fork(): Fork; + public function fork(): Fork; - public function wait(int $options = 0): Wait; + public function wait(int $options = 0): Wait; } diff --git a/src/ProcessControl/Wait.php b/src/ProcessControl/Wait.php index 0ea20e8..d9d1a32 100644 --- a/src/ProcessControl/Wait.php +++ b/src/ProcessControl/Wait.php @@ -4,24 +4,24 @@ namespace Toalett\Multiprocessing\ProcessControl; class Wait { - public const NO_HANG = WNOHANG; - public const UNTRACED = WUNTRACED; - public int $pid; - public int $status; + public const NO_HANG = WNOHANG; + public const UNTRACED = WUNTRACED; + public int $pid; + public int $status; - public function __construct(int $pid, int $status = 0) - { - $this->pid = $pid; - $this->status = $status; - } + public function __construct(int $pid, int $status = 0) + { + $this->pid = $pid; + $this->status = $status; + } - public function childStopped(): bool - { - return $this->pid > 0; - } + public function childStopped(): bool + { + return $this->pid > 0; + } - public function failed(): bool - { - return $this->pid < 0; - } + public function failed(): bool + { + return $this->pid < 0; + } } diff --git a/src/Task/Interval.php b/src/Task/Interval.php index cc19e0c..730ee74 100644 --- a/src/Task/Interval.php +++ b/src/Task/Interval.php @@ -6,38 +6,38 @@ use Toalett\Multiprocessing\Exception\InvalidArgumentException; class Interval { - private float $seconds; + private float $seconds; - private function __construct(float $seconds) - { - if ($seconds <= 0) { - throw new InvalidArgumentException('positive float', $seconds); - } - $this->seconds = $seconds; - } + private function __construct(float $seconds) + { + if ($seconds <= 0) { + throw new InvalidArgumentException('positive float', $seconds); + } + $this->seconds = $seconds; + } - public static function seconds(float $seconds): self - { - return new self($seconds); - } + public static function seconds(float $seconds): self + { + return new self($seconds); + } - public static function minutes(float $minutes): self - { - return new self(60.0 * $minutes); - } + public static function minutes(float $minutes): self + { + return new self(60.0 * $minutes); + } - public static function hours(float $hours): self - { - return new self(3600.0 * $hours); - } + public static function hours(float $hours): self + { + return new self(3600.0 * $hours); + } - public function asFloat(): float - { - return $this->seconds; - } + public function asFloat(): float + { + return $this->seconds; + } - public function asInt(): int - { - return (int)$this->seconds; - } + public function asInt(): int + { + return (int)$this->seconds; + } } diff --git a/src/Task/RepeatedTask.php b/src/Task/RepeatedTask.php index 367c867..1dd7f31 100644 --- a/src/Task/RepeatedTask.php +++ b/src/Task/RepeatedTask.php @@ -7,16 +7,16 @@ use React\EventLoop\TimerInterface; class RepeatedTask extends Task { - public Interval $interval; + public Interval $interval; - public function __construct(Interval $interval, callable $callable, ...$arguments) - { - $this->interval = $interval; - parent::__construct($callable, $arguments); - } + public function __construct(Interval $interval, callable $callable, ...$arguments) + { + $this->interval = $interval; + parent::__construct($callable, $arguments); + } - protected function generateTimer(LoopInterface $loop): TimerInterface - { - return $loop->addPeriodicTimer($this->interval->asFloat(), $this->createDeferredCall()); - } + protected function generateTimer(LoopInterface $loop): TimerInterface + { + return $loop->addPeriodicTimer($this->interval->asFloat(), $this->createDeferredCall()); + } } diff --git a/src/Task/Task.php b/src/Task/Task.php index 239c484..2a733fd 100644 --- a/src/Task/Task.php +++ b/src/Task/Task.php @@ -7,42 +7,42 @@ use React\EventLoop\TimerInterface; abstract class Task { - public $callable; - public array $arguments; - protected ?TimerInterface $timer = null; + public $callable; + public array $arguments; + protected ?TimerInterface $timer = null; - public function __construct(callable $callable, ...$arguments) - { - $this->callable = $callable; - $this->arguments = $arguments; - } + public function __construct(callable $callable, ...$arguments) + { + $this->callable = $callable; + $this->arguments = $arguments; + } - abstract protected function generateTimer(LoopInterface $loop): TimerInterface; + abstract protected function generateTimer(LoopInterface $loop): TimerInterface; - protected function createDeferredCall(): callable - { - return fn() => call_user_func_array( - $this->callable, - $this->arguments - ); - } + protected function createDeferredCall(): callable + { + return fn() => call_user_func_array( + $this->callable, + $this->arguments + ); + } - public function enable(LoopInterface $loop): void - { - if (!$this->isBound()) { - $this->timer = $this->generateTimer($loop); - } - } + public function enable(LoopInterface $loop): void + { + if (!$this->isBound()) { + $this->timer = $this->generateTimer($loop); + } + } - public function isBound(): bool - { - return !is_null($this->timer); - } + public function isBound(): bool + { + return !is_null($this->timer); + } - public function cancel(LoopInterface $loop): void - { - if ($this->isBound()) { - $loop->cancelTimer($this->timer); - } - } + public function cancel(LoopInterface $loop): void + { + if ($this->isBound()) { + $loop->cancelTimer($this->timer); + } + } } diff --git a/src/Task/Tasks.php b/src/Task/Tasks.php index ce6a08f..c21ff85 100644 --- a/src/Task/Tasks.php +++ b/src/Task/Tasks.php @@ -6,32 +6,32 @@ use React\EventLoop\LoopInterface; class Tasks { - /** @var Task[] */ - private array $tasks; - private ?LoopInterface $loop = null; + /** @var Task[] */ + private array $tasks; + private ?LoopInterface $loop = null; - public function __construct(Task ...$tasks) - { - $this->tasks = $tasks; - } + public function __construct(Task ...$tasks) + { + $this->tasks = $tasks; + } - public function enable(LoopInterface $loop): void - { - if (is_null($this->loop)) { - $this->loop = $loop; - foreach ($this->tasks as $task) { - $task->enable($this->loop); - } - } - } + public function enable(LoopInterface $loop): void + { + if (is_null($this->loop)) { + $this->loop = $loop; + foreach ($this->tasks as $task) { + $task->enable($this->loop); + } + } + } - public function cancel(): void - { - if (!is_null($this->loop)) { - foreach ($this->tasks as $task) { - $task->cancel($this->loop); - } - $this->loop = null; - } - } + public function cancel(): void + { + if (!is_null($this->loop)) { + foreach ($this->tasks as $task) { + $task->cancel($this->loop); + } + $this->loop = null; + } + } } diff --git a/src/Tests/ConcurrencyLimitTest.php b/src/Tests/ConcurrencyLimitTest.php deleted file mode 100644 index d336d6e..0000000 --- a/src/Tests/ConcurrencyLimitTest.php +++ /dev/null @@ -1,88 +0,0 @@ -expectException(InvalidArgumentException::class); - $this->expectExceptionMessage('Expected -1 or positive integer, got \'0\''); - - ConcurrencyLimit::atMost(0); - } - - public function testItAcceptsNegativeOneAsUnlimited(): void - { - $limit = ConcurrencyLimit::atMost(-1); - - self::assertTrue($limit->isUnlimited()); - } - - /** - * @param int $negativeNumber - * @dataProvider negativeValueProvider - */ - public function testItDoesNotAllowAnyOtherNegativeValue(int $negativeNumber): void - { - $this->expectException(InvalidArgumentException::class); - $this->expectExceptionMessage(sprintf('Expected -1 or positive integer, got \'%s\'', $negativeNumber)); - - ConcurrencyLimit::atMost($negativeNumber); - } - - public function testTheLimitMayBeUnlimited(): void - { - $limit = ConcurrencyLimit::unlimited(); - - self::assertTrue($limit->isUnlimited()); - } - - public function testTheLimitMayBeASingleWorker(): void - { - $limit = ConcurrencyLimit::singleWorker(); - - self::assertFalse($limit->isUnlimited()); - self::assertEquals(1, $this->getProperty($limit, 'limit')); - } - - public function testAnUnlimitedLimitCanNeverBeReached(): void - { - $limit = ConcurrencyLimit::unlimited(); - - self::assertFalse($limit->isReachedBy(PHP_INT_MIN)); - self::assertFalse($limit->isReachedBy(0)); - self::assertFalse($limit->isReachedBy(PHP_INT_MAX)); - } - - public function testABoundLimitCanBeReached(): void - { - $three = ConcurrencyLimit::atMost(3); - $seven = ConcurrencyLimit::atMost(7); - - self::assertTrue($three->isReachedBy(3)); - self::assertFalse($three->isReachedBy(2)); - self::assertFalse($three->isReachedBy(1)); - - self::assertTrue($seven->isReachedBy(7)); - self::assertTrue($seven->isReachedBy(120)); - self::assertFalse($seven->isReachedBy(-2)); - } - - public function negativeValueProvider(): array - { - return [ - '-2' => [-2], - '-3' => [-3], - '-10000' => [-10000], - 'PHP_INT_MIN' => [PHP_INT_MIN], - ]; - } -} diff --git a/src/Tests/ConcurrencyTest.php b/src/Tests/ConcurrencyTest.php new file mode 100644 index 0000000..27fb2ff --- /dev/null +++ b/src/Tests/ConcurrencyTest.php @@ -0,0 +1,87 @@ +isUnlimited()); + } + + public function testItDoesNotAcceptZero(): void + { + $this->expectException(InvalidArgumentException::class); + $this->expectExceptionMessage('Expected -1 or positive integer, got \'0\''); + + Concurrency::atMost(0); + } + + /** + * @param int $negativeNumber + * @dataProvider negativeValueProvider + */ + public function testItDoesNotAllowAnyOtherNegativeValue(int $negativeNumber): void + { + $this->expectException(InvalidArgumentException::class); + $this->expectExceptionMessage(sprintf('Expected -1 or positive integer, got \'%s\'', $negativeNumber)); + + Concurrency::atMost($negativeNumber); + } + + public function testTheLimitMayBeUnlimited(): void + { + $concurrency = Concurrency::unlimited(); + self::assertTrue($concurrency->isUnlimited()); + } + + public function testTheLimitMayBeASingleWorker(): void + { + $concurrency = Concurrency::singleWorker(); + + self::assertFalse($concurrency->isUnlimited()); + self::assertEquals(1, $this->getProperty($concurrency, 'limit')); + } + + public function testAnUnlimitedLimitCanNeverBeReached(): void + { + $concurrency = Concurrency::unlimited(); + + self::assertFalse($concurrency->isReachedBy(PHP_INT_MIN)); + self::assertFalse($concurrency->isReachedBy(0)); + self::assertFalse($concurrency->isReachedBy(PHP_INT_MAX)); + } + + public function testABoundLimitCanBeReached(): void + { + $three = Concurrency::atMost(3); + $seven = Concurrency::atMost(7); + + self::assertTrue($three->isReachedBy(3)); + self::assertFalse($three->isReachedBy(2)); + self::assertFalse($three->isReachedBy(1)); + + self::assertTrue($seven->isReachedBy(7)); + self::assertTrue($seven->isReachedBy(120)); + self::assertFalse($seven->isReachedBy(-2)); + } + + public function negativeValueProvider(): array + { + return [ + '-2' => [-2], + '-3' => [-3], + '-10000' => [-10000], + 'PHP_INT_MIN' => [PHP_INT_MIN], + ]; + } +} diff --git a/src/Tests/ContextBuilderTest.php b/src/Tests/ContextBuilderTest.php index 0d4fd5a..c888e9a 100644 --- a/src/Tests/ContextBuilderTest.php +++ b/src/Tests/ContextBuilderTest.php @@ -4,77 +4,77 @@ namespace Toalett\Multiprocessing\Tests; use PHPUnit\Framework\TestCase; use React\EventLoop\LoopInterface; -use Toalett\Multiprocessing\ConcurrencyLimit; +use Toalett\Multiprocessing\Concurrency; use Toalett\Multiprocessing\ContextBuilder; use Toalett\Multiprocessing\Tests\Tools\PropertyInspector; use Toalett\Multiprocessing\Workers; class ContextBuilderTest extends TestCase { - use PropertyInspector; + use PropertyInspector; - public function testItIsImmutable(): void - { - $builder = ContextBuilder::create(); - $eventLoop = $this->createMock(LoopInterface::class); - $limit = $this->createMock(ConcurrencyLimit::class); + public function testItIsImmutable(): void + { + $builder = ContextBuilder::create(); + $eventLoop = $this->createMock(LoopInterface::class); + $concurrency = $this->createMock(Concurrency::class); - self::assertNotSame($builder->withEventLoop($eventLoop), $builder); - self::assertNotSame($builder->withLimit($limit), $builder); - } + self::assertNotSame($builder->withEventLoop($eventLoop), $builder); + self::assertNotSame($builder->withConcurrency($concurrency), $builder); + } - public function testItBuildsANewContextEveryTime(): void - { - $builder = ContextBuilder::create(); + public function testItBuildsANewContextEveryTime(): void + { + $builder = ContextBuilder::create(); - self::assertNotSame($builder->build(), $builder->build()); - } + self::assertNotSame($builder->build(), $builder->build()); + } - public function testTheDefaultConcurrencyLimitIsUnlimited(): void - { - $builder = ContextBuilder::create(); + public function testTheDefaultConcurrencyIsUnlimited(): void + { + $builder = ContextBuilder::create(); - $context = $builder->build(); - self::assertIsObject($context); - self::assertInstanceOf(LoopInterface::class, $this->getProperty($context, 'eventLoop')); + $context = $builder->build(); + self::assertIsObject($context); + self::assertInstanceOf(LoopInterface::class, $this->getProperty($context, 'eventLoop')); - /** @var ConcurrencyLimit|null $limit */ - $limit = $this->getProperty($context, 'limit'); - self::assertIsObject($limit); - self::assertInstanceOf(ConcurrencyLimit::class, $limit); - self::assertTrue($limit->isUnlimited()); - } + /** @var Concurrency|null $concurrency */ + $concurrency = $this->getProperty($context, 'concurrency'); + self::assertIsObject($concurrency); + self::assertInstanceOf(Concurrency::class, $concurrency); + self::assertTrue($concurrency->isUnlimited()); + } - public function testWhenGivenAnEventLoopItUsesThatLoop(): void - { - $builder = ContextBuilder::create(); - $eventLoop = $this->createMock(LoopInterface::class); + public function testWhenGivenAnEventLoopItUsesThatLoop(): void + { + $builder = ContextBuilder::create(); + $eventLoop = $this->createMock(LoopInterface::class); - $context = $builder->withEventLoop($eventLoop)->build(); - $usedEventLoop = $this->getProperty($context, 'eventLoop'); + $context = $builder->withEventLoop($eventLoop)->build(); + $usedEventLoop = $this->getProperty($context, 'eventLoop'); - self::assertSame($eventLoop, $usedEventLoop); - } + self::assertSame($eventLoop, $usedEventLoop); + } - public function testWhenGivenAConcurrencyLimitItUsesThatLimit(): void - { - $builder = ContextBuilder::create(); - $limit = $this->createMock(ConcurrencyLimit::class); + public function testWhenGivenAConcurrencyItUsesThatConcurrency(): void + { + $builder = ContextBuilder::create(); + $concurrency = $this->createMock(Concurrency::class); - $context = $builder->withLimit($limit)->build(); - $usedLimit = $this->getProperty($context, 'limit'); + $context = $builder->withConcurrency($concurrency)->build(); + $usedConcurrency = $this->getProperty($context, 'concurrency'); - self::assertSame($limit, $usedLimit); - } + self::assertSame($concurrency, $usedConcurrency); + } - public function testWhenGivenWorkersItUsesThatWorkers(): void - { - $builder = ContextBuilder::create(); - $workers = $this->createMock(Workers::class); + public function testWhenGivenWorkersItUsesThatWorkers(): void + { + $builder = ContextBuilder::create(); + $workers = $this->createMock(Workers::class); - $context = $builder->withWorkers($workers)->build(); - $usedWorkers = $this->getProperty($context, 'workers'); + $context = $builder->withWorkers($workers)->build(); + $usedWorkers = $this->getProperty($context, 'workers'); - self::assertSame($workers, $usedWorkers); - } + self::assertSame($workers, $usedWorkers); + } } diff --git a/src/Tests/ContextTest.php b/src/Tests/ContextTest.php index a304942..380b20e 100644 --- a/src/Tests/ContextTest.php +++ b/src/Tests/ContextTest.php @@ -6,108 +6,108 @@ use PHPUnit\Framework\TestCase; use React\EventLoop\Factory; use React\EventLoop\LoopInterface; use React\EventLoop\Timer\Timer; -use Toalett\Multiprocessing\ConcurrencyLimit; +use Toalett\Multiprocessing\Concurrency; use Toalett\Multiprocessing\Context; use Toalett\Multiprocessing\Workers; class ContextTest extends TestCase { - public function testItEmitsAnEventWhenBooted(): void - { - $limit = $this->createMock(ConcurrencyLimit::class); - $loop = Factory::create(); - $context = new Context($loop, $limit); + public function testItEmitsAnEventWhenBooted(): void + { + $concurrency = $this->createMock(Concurrency::class); + $loop = Factory::create(); + $context = new Context($loop, $concurrency); - $loop->futureTick(fn() => $context->stop()); + $loop->futureTick(fn() => $context->stop()); - $bootEventHasTakenPlace = false; - $context->on('booted', function () use (&$bootEventHasTakenPlace) { - $bootEventHasTakenPlace = true; - }); + $bootEventHasTakenPlace = false; + $context->on('booted', function () use (&$bootEventHasTakenPlace) { + $bootEventHasTakenPlace = true; + }); - self::assertFalse($bootEventHasTakenPlace); - $context->run(); - self::assertTrue($bootEventHasTakenPlace); - } + self::assertFalse($bootEventHasTakenPlace); + $context->run(); + self::assertTrue($bootEventHasTakenPlace); + } - public function testItEmitsEventsWhenCongestionOccursAndIsRelieved(): void - { - $loop = Factory::create(); - $limit = $this->createMock(ConcurrencyLimit::class); - $context = new Context($loop, $limit); + public function testItEmitsEventsWhenCongestionOccursAndIsRelieved(): void + { + $loop = Factory::create(); + $concurrency = $this->createMock(Concurrency::class); + $context = new Context($loop, $concurrency); - $limit->method('isReachedBy')->willReturn(true); // trigger congestion + $concurrency->method('isReachedBy')->willReturn(true); // trigger congestion - $congestionEventHasTakenPlace = false; - $context->on('congestion', function () use (&$congestionEventHasTakenPlace) { - $congestionEventHasTakenPlace = true; - }); + $congestionEventHasTakenPlace = false; + $context->on('congestion', function () use (&$congestionEventHasTakenPlace) { + $congestionEventHasTakenPlace = true; + }); - $congestionRelievedEventHasTakenPlace = false; - $context->on('congestion_relieved', function () use (&$congestionRelievedEventHasTakenPlace) { - $congestionRelievedEventHasTakenPlace = true; - }); + $congestionRelievedEventHasTakenPlace = false; + $context->on('congestion_relieved', function () use (&$congestionRelievedEventHasTakenPlace) { + $congestionRelievedEventHasTakenPlace = true; + }); - self::assertFalse($congestionEventHasTakenPlace); - self::assertFalse($congestionRelievedEventHasTakenPlace); + self::assertFalse($congestionEventHasTakenPlace); + self::assertFalse($congestionRelievedEventHasTakenPlace); - $loop->futureTick(fn() => $context->stop()); - $context->submit(static fn() => null); - $context->run(); + $loop->futureTick(fn() => $context->stop()); + $context->submit(static fn() => null); + $context->run(); - self::assertTrue($congestionEventHasTakenPlace); - self::assertTrue($congestionRelievedEventHasTakenPlace); - } + self::assertTrue($congestionEventHasTakenPlace); + self::assertTrue($congestionRelievedEventHasTakenPlace); + } - public function testItCreatesAWorkerForASubmittedTask(): void - { - $limit = $this->createMock(ConcurrencyLimit::class); - $loop = $this->createMock(LoopInterface::class); - $context = new Context($loop, $limit); + public function testItCreatesAWorkerForASubmittedTask(): void + { + $concurrency = $this->createMock(Concurrency::class); + $loop = $this->createMock(LoopInterface::class); + $context = new Context($loop, $concurrency); - $limit->method('isReachedBy')->willReturn(false); - $loop->expects(self::once()) - ->method('futureTick') - ->withConsecutive([ - static fn() => null, - ]); + $concurrency->method('isReachedBy')->willReturn(false); + $loop->expects(self::once()) + ->method('futureTick') + ->withConsecutive([ + static fn() => null, + ]); - $context->submit(static fn() => null); - } + $context->submit(static fn() => null); + } - public function testItRegistersMaintenanceTasksOnTheEventLoop(): void - { - $loop = $this->createMock(LoopInterface::class); - $limit = $this->createMock(ConcurrencyLimit::class); + public function testItRegistersMaintenanceTasksOnTheEventLoop(): void + { + $loop = $this->createMock(LoopInterface::class); + $concurrency = $this->createMock(Concurrency::class); - $loop->expects(self::exactly(2)) - ->method('addPeriodicTimer') - ->withConsecutive( - [Context::INTERVAL_CLEANUP, static fn() => null], - [Context::INTERVAL_GC, static fn() => null] - )->willReturnOnConsecutiveCalls( - new Timer(Context::INTERVAL_CLEANUP, static fn() => null), - new Timer(Context::INTERVAL_GC, static fn() => null), - ); + $loop->expects(self::exactly(2)) + ->method('addPeriodicTimer') + ->withConsecutive( + [Context::INTERVAL_CLEANUP, static fn() => null], + [Context::INTERVAL_GC, static fn() => null] + )->willReturnOnConsecutiveCalls( + new Timer(Context::INTERVAL_CLEANUP, static fn() => null), + new Timer(Context::INTERVAL_GC, static fn() => null), + ); - $context = new Context($loop, $limit); - $context->run(); - } + $context = new Context($loop, $concurrency); + $context->run(); + } - public function testItForwardsWorkersEventsToSelf(): void - { - $loop = $this->createMock(LoopInterface::class); - $limit = $this->createMock(ConcurrencyLimit::class); - $workers = $this->createMock(Workers::class); + public function testItForwardsWorkersEventsToSelf(): void + { + $loop = $this->createMock(LoopInterface::class); + $concurrency = $this->createMock(Concurrency::class); + $workers = $this->createMock(Workers::class); - $workers->expects(self::exactly(3)) - ->method('on') - ->withConsecutive( - ['worker_started', static fn() => null], - ['worker_stopped', static fn() => null], - ['no_workers_remaining', static fn() => null] - ); + $workers->expects(self::exactly(3)) + ->method('on') + ->withConsecutive( + ['worker_started', static fn() => null], + ['worker_stopped', static fn() => null], + ['no_workers_remaining', static fn() => null] + ); - new Context($loop, $limit, $workers); - } + new Context($loop, $concurrency, $workers); + } } diff --git a/src/Tests/ProcessControl/ForkTest.php b/src/Tests/ProcessControl/ForkTest.php index 88297c7..5f22fe8 100644 --- a/src/Tests/ProcessControl/ForkTest.php +++ b/src/Tests/ProcessControl/ForkTest.php @@ -7,55 +7,55 @@ use Toalett\Multiprocessing\ProcessControl\Fork; class ForkTest extends TestCase { - /** - * @param int $pid - * @dataProvider positiveIntegerProvider - */ - public function testItSaysItIsAParentProcessWhenAPositivePidIsProvided(int $pid): void - { - $fork = new Fork($pid); - self::assertTrue($fork->isParent()); - self::assertFalse($fork->isChild()); - self::assertFalse($fork->failed()); - } + /** + * @param int $pid + * @dataProvider positiveIntegerProvider + */ + public function testItSaysItIsAParentProcessWhenAPositivePidIsProvided(int $pid): void + { + $fork = new Fork($pid); + self::assertTrue($fork->isParent()); + self::assertFalse($fork->isChild()); + self::assertFalse($fork->failed()); + } - /** - * @param int $pid - * @dataProvider negativeIntegerProvider - */ - public function testItSaysItFailedWhenANegativePidIsProvided(int $pid): void - { - $fork = new Fork($pid); - self::assertTrue($fork->isParent()); - self::assertFalse($fork->isChild()); - self::assertTrue($fork->failed()); - } + /** + * @param int $pid + * @dataProvider negativeIntegerProvider + */ + public function testItSaysItFailedWhenANegativePidIsProvided(int $pid): void + { + $fork = new Fork($pid); + self::assertTrue($fork->isParent()); + self::assertFalse($fork->isChild()); + self::assertTrue($fork->failed()); + } - public function testItSaysItIsAChildProcessWhenPidZeroIsProvided(): void - { - $fork = new Fork(0); - self::assertFalse($fork->isParent()); - self::assertTrue($fork->isChild()); - self::assertFalse($fork->failed()); - } + public function testItSaysItIsAChildProcessWhenPidZeroIsProvided(): void + { + $fork = new Fork(0); + self::assertFalse($fork->isParent()); + self::assertTrue($fork->isChild()); + self::assertFalse($fork->failed()); + } - public function positiveIntegerProvider(): array - { - return [ - [1], - [10], - [1000], - [PHP_INT_MAX], - ]; - } + public function positiveIntegerProvider(): array + { + return [ + [1], + [10], + [1000], + [PHP_INT_MAX], + ]; + } - public function negativeIntegerProvider(): array - { - return [ - [-1], - [-10], - [-1000], - [PHP_INT_MIN], - ]; - } + public function negativeIntegerProvider(): array + { + return [ + [-1], + [-10], + [-1000], + [PHP_INT_MIN], + ]; + } } diff --git a/src/Tests/ProcessControl/WaitTest.php b/src/Tests/ProcessControl/WaitTest.php index 62585f5..5f2838e 100644 --- a/src/Tests/ProcessControl/WaitTest.php +++ b/src/Tests/ProcessControl/WaitTest.php @@ -7,45 +7,45 @@ use Toalett\Multiprocessing\ProcessControl\Wait; class WaitTest extends TestCase { - /** - * @param int $pid - * @dataProvider positiveIntegerProvider - */ - public function testItSaysAChildStoppedWhenAPositivePidIsProvided(int $pid): void - { - $wait = new Wait($pid, 0); - self::assertTrue($wait->childStopped()); - self::assertFalse($wait->failed()); - } + /** + * @param int $pid + * @dataProvider positiveIntegerProvider + */ + public function testItSaysAChildStoppedWhenAPositivePidIsProvided(int $pid): void + { + $wait = new Wait($pid, 0); + self::assertTrue($wait->childStopped()); + self::assertFalse($wait->failed()); + } - /** - * @param int $pid - * @dataProvider negativeIntegerProvider - */ - public function testItSaysItFailedWhenANegativePidIsProvided(int $pid): void - { - $wait = new Wait($pid, 0); - self::assertFalse($wait->childStopped()); - self::assertTrue($wait->failed()); - } + /** + * @param int $pid + * @dataProvider negativeIntegerProvider + */ + public function testItSaysItFailedWhenANegativePidIsProvided(int $pid): void + { + $wait = new Wait($pid, 0); + self::assertFalse($wait->childStopped()); + self::assertTrue($wait->failed()); + } - public function positiveIntegerProvider(): array - { - return [ - [1], - [10], - [1000], - [PHP_INT_MAX], - ]; - } + public function positiveIntegerProvider(): array + { + return [ + [1], + [10], + [1000], + [PHP_INT_MAX], + ]; + } - public function negativeIntegerProvider(): array - { - return [ - [-1], - [-10], - [-1000], - [PHP_INT_MIN], - ]; - } + public function negativeIntegerProvider(): array + { + return [ + [-1], + [-10], + [-1000], + [PHP_INT_MIN], + ]; + } } diff --git a/src/Tests/Task/IntervalTest.php b/src/Tests/Task/IntervalTest.php index a31ed61..feefa3d 100644 --- a/src/Tests/Task/IntervalTest.php +++ b/src/Tests/Task/IntervalTest.php @@ -9,50 +9,50 @@ use Toalett\Multiprocessing\Task\Interval; class IntervalTest extends TestCase { - /** - * @param $method - * @param $val - * @param $calculatedVal - * @dataProvider zeroAndDownProvider - */ - public function testItDoesNotAllowLessThanZeroOrZero($method, $val, $calculatedVal): void - { - $this->setName(sprintf('It does not allow %d for %s', $val, $method)); - $this->expectException(InvalidArgumentException::class); - $this->expectExceptionMessage(sprintf('Expected positive float, got \'%s\'', $calculatedVal)); - Interval::{$method}($val); - } + /** + * @param $method + * @param $val + * @param $calculatedVal + * @dataProvider zeroAndDownProvider + */ + public function testItDoesNotAllowLessThanZeroOrZero($method, $val, $calculatedVal): void + { + $this->setName(sprintf('It does not allow %d for %s', $val, $method)); + $this->expectException(InvalidArgumentException::class); + $this->expectExceptionMessage(sprintf('Expected positive float, got \'%s\'', $calculatedVal)); + Interval::{$method}($val); + } - /** - * @param $method - * @param $val - * @param $expected - * @dataProvider oneAndUpProvider - */ - public function testItCalculatesTheCorrectInterval($method, $val, $expected): void - { - $this->setName('It calculates the correct interval in ' . $method); - $interval = Interval::{$method}($val); + /** + * @param $method + * @param $val + * @param $expected + * @dataProvider oneAndUpProvider + */ + public function testItCalculatesTheCorrectInterval($method, $val, $expected): void + { + $this->setName('It calculates the correct interval in ' . $method); + $interval = Interval::{$method}($val); - self::assertEquals($expected, $interval->asFloat()); - } + self::assertEquals($expected, $interval->asFloat()); + } - public function zeroAndDownProvider(): Generator - { - return $this->createProvider(0, -5, -9000); - } + public function zeroAndDownProvider(): Generator + { + return $this->createProvider(0, -5, -9000); + } - public function oneAndUpProvider(): Generator - { - return $this->createProvider(1, 5, 7500); - } + public function oneAndUpProvider(): Generator + { + return $this->createProvider(1, 5, 7500); + } - public function createProvider(...$args): Generator - { - foreach ($args as $arg) { - yield "$arg seconds" => ['seconds', $arg, $arg]; - yield "$arg minutes" => ['minutes', $arg, $arg * 60.0]; - yield "$arg hours" => ['hours', $arg, $arg * 3600.0]; - } - } + public function createProvider(...$args): Generator + { + foreach ($args as $arg) { + yield "$arg seconds" => ['seconds', $arg, $arg]; + yield "$arg minutes" => ['minutes', $arg, $arg * 60.0]; + yield "$arg hours" => ['hours', $arg, $arg * 3600.0]; + } + } } diff --git a/src/Tests/Task/RepeatedTaskTest.php b/src/Tests/Task/RepeatedTaskTest.php index 0e5e8c7..21c32a9 100644 --- a/src/Tests/Task/RepeatedTaskTest.php +++ b/src/Tests/Task/RepeatedTaskTest.php @@ -11,27 +11,27 @@ use Toalett\Multiprocessing\Task\RepeatedTask; class RepeatedTaskTest extends TestCase { - /** - * @param $interval - * @dataProvider dataProvider - */ - public function testItRegistersWithTheProvidedInterval(Interval $interval): void - { - $loop = $this->createMock(LoopInterface::class); - $loop->expects(self::once()) - ->method('addPeriodicTimer') - ->with($interval->asFloat(), static fn() => null) - ->willReturn(new Timer($interval->asFloat(), static fn() => null, true)); + /** + * @param $interval + * @dataProvider dataProvider + */ + public function testItRegistersWithTheProvidedInterval(Interval $interval): void + { + $loop = $this->createMock(LoopInterface::class); + $loop->expects(self::once()) + ->method('addPeriodicTimer') + ->with($interval->asFloat(), static fn() => null) + ->willReturn(new Timer($interval->asFloat(), static fn() => null, true)); - $task = new RepeatedTask($interval, static fn() => null); - $task->enable($loop); - } + $task = new RepeatedTask($interval, static fn() => null); + $task->enable($loop); + } - public function dataProvider(): Generator - { - yield "3 seconds" => [Interval::seconds(3)]; - yield "5 minutes" => [Interval::minutes(5)]; - yield "half an hour" => [Interval::hours(0.5)]; - yield "a day" => [Interval::hours(24)]; - } + public function dataProvider(): Generator + { + yield "3 seconds" => [Interval::seconds(3)]; + yield "5 minutes" => [Interval::minutes(5)]; + yield "half an hour" => [Interval::hours(0.5)]; + yield "a day" => [Interval::hours(24)]; + } } diff --git a/src/Tests/Task/TasksTest.php b/src/Tests/Task/TasksTest.php index ac49d15..fbb4152 100644 --- a/src/Tests/Task/TasksTest.php +++ b/src/Tests/Task/TasksTest.php @@ -3,89 +3,89 @@ namespace Toalett\Multiprocessing\Tests\Task; use PHPUnit\Framework\MockObject\MockObject; +use PHPUnit\Framework\TestCase; use React\EventLoop\LoopInterface; use Toalett\Multiprocessing\Task\Task; use Toalett\Multiprocessing\Task\Tasks; -use PHPUnit\Framework\TestCase; use Toalett\Multiprocessing\Tests\Tools\PropertyInspector; class TasksTest extends TestCase { - use PropertyInspector; + use PropertyInspector; - public function testItAcceptsZeroTasks(): void - { - $this->expectNotToPerformAssertions(); - new Tasks(); - } + public function testItAcceptsZeroTasks(): void + { + $this->expectNotToPerformAssertions(); + new Tasks(); + } - public function testItAcceptsMultipleTasks(): void - { - $this->expectNotToPerformAssertions(); - new Tasks( - $this->createMock(Task::class), - $this->createMock(Task::class) - ); - } + public function testItAcceptsMultipleTasks(): void + { + $this->expectNotToPerformAssertions(); + new Tasks( + $this->createMock(Task::class), + $this->createMock(Task::class) + ); + } - public function testItDoesNotReEnableWhenEnabled(): void - { - $loop = $this->createMock(LoopInterface::class); - $task = $this->createMock(Task::class); - $tasks = new Tasks($task); + public function testItDoesNotReEnableWhenEnabled(): void + { + $loop = $this->createMock(LoopInterface::class); + $task = $this->createMock(Task::class); + $tasks = new Tasks($task); - $task->expects(self::once()) - ->method('enable') - ->with($loop); + $task->expects(self::once()) + ->method('enable') + ->with($loop); - $tasks->enable($loop); - $tasks->enable($loop); - } + $tasks->enable($loop); + $tasks->enable($loop); + } - public function testItEnablesAllTasksWhenEnableCalled(): void - { - $loop = $this->createMock(LoopInterface::class); - $task1 = $this->createMock(Task::class); - $task2 = $this->createMock(Task::class); - $task3 = $this->createMock(Task::class); + public function testItEnablesAllTasksWhenEnableCalled(): void + { + $loop = $this->createMock(LoopInterface::class); + $task1 = $this->createMock(Task::class); + $task2 = $this->createMock(Task::class); + $task3 = $this->createMock(Task::class); - foreach([$task1, $task2, $task3] as $task) { - /** @var MockObject|Task $task */ - $task->expects(self::once())->method('enable')->with($loop); - } + foreach ([$task1, $task2, $task3] as $task) { + /** @var MockObject|Task $task */ + $task->expects(self::once())->method('enable')->with($loop); + } - (new Tasks($task1, $task2, $task3))->enable($loop); - } + (new Tasks($task1, $task2, $task3))->enable($loop); + } - public function testItCancelsAllTasksWhenCancelCalled(): void - { - $loop = $this->createMock(LoopInterface::class); - $task1 = $this->createMock(Task::class); - $task2 = $this->createMock(Task::class); - $task3 = $this->createMock(Task::class); + public function testItCancelsAllTasksWhenCancelCalled(): void + { + $loop = $this->createMock(LoopInterface::class); + $task1 = $this->createMock(Task::class); + $task2 = $this->createMock(Task::class); + $task3 = $this->createMock(Task::class); - foreach([$task1, $task2, $task3] as $task) { - /** @var MockObject|Task $task */ - $task->expects(self::once())->method('cancel')->with($loop); - } + foreach ([$task1, $task2, $task3] as $task) { + /** @var MockObject|Task $task */ + $task->expects(self::once())->method('cancel')->with($loop); + } - $tasks = new Tasks($task1, $task2, $task3); - $this->setProperty($tasks, 'loop', $loop); - $tasks->cancel(); - } + $tasks = new Tasks($task1, $task2, $task3); + $this->setProperty($tasks, 'loop', $loop); + $tasks->cancel(); + } - public function testItDoesNotCancelTasksWhenTheyAreNotEnabled(): void - { - $task1 = $this->createMock(Task::class); - $task2 = $this->createMock(Task::class); - $task3 = $this->createMock(Task::class); + public function testItDoesNotCancelTasksWhenTheyAreNotEnabled(): void + { + $task1 = $this->createMock(Task::class); + $task2 = $this->createMock(Task::class); + $task3 = $this->createMock(Task::class); - foreach([$task1, $task2, $task3] as $task) { - /** @var MockObject|Task $task */ - $task->expects(self::never())->method('cancel'); - } + foreach ([$task1, $task2, $task3] as $task) { + /** @var MockObject|Task $task */ + $task->expects(self::never())->method('cancel'); + } - $tasks = new Tasks($task1, $task2, $task3); - $tasks->cancel(); - } + $tasks = new Tasks($task1, $task2, $task3); + $tasks->cancel(); + } } diff --git a/src/Tests/Tools/PropertyInspector.php b/src/Tests/Tools/PropertyInspector.php index 3b136f6..504cb3f 100644 --- a/src/Tests/Tools/PropertyInspector.php +++ b/src/Tests/Tools/PropertyInspector.php @@ -6,19 +6,19 @@ use ReflectionObject; trait PropertyInspector { - protected function getProperty(object $object, string $propertyName) - { - $reflector = new ReflectionObject($object); - $property = $reflector->getProperty($propertyName); - $property->setAccessible(true); - return $property->getValue($object); - } + protected function getProperty(object $object, string $propertyName) + { + $reflector = new ReflectionObject($object); + $property = $reflector->getProperty($propertyName); + $property->setAccessible(true); + return $property->getValue($object); + } - protected function setProperty(object $object, string $propertyName, $value): void - { - $reflector = new ReflectionObject($object); - $property = $reflector->getProperty($propertyName); - $property->setAccessible(true); - $property->setValue($object, $value); - } + protected function setProperty(object $object, string $propertyName, $value): void + { + $reflector = new ReflectionObject($object); + $property = $reflector->getProperty($propertyName); + $property->setAccessible(true); + $property->setValue($object, $value); + } } diff --git a/src/Tests/WorkersTest.php b/src/Tests/WorkersTest.php index aeb10a4..cb7a669 100644 --- a/src/Tests/WorkersTest.php +++ b/src/Tests/WorkersTest.php @@ -11,113 +11,113 @@ use Toalett\Multiprocessing\Workers; class WorkersTest extends TestCase { - public function testItSaysItIsEmptyWhenNoWorkers(): void - { - $processControl = $this->createMock(ProcessControl::class); - $workers = new Workers($processControl); - self::assertEmpty($workers); - } + public function testItSaysItIsEmptyWhenNoWorkers(): void + { + $processControl = $this->createMock(ProcessControl::class); + $workers = new Workers($processControl); + self::assertEmpty($workers); + } - public function testItSaysItHasOneWorkerWhenTaskExecutes(): void - { - $workers = new Workers(); + public function testItSaysItHasOneWorkerWhenTaskExecutes(): void + { + $workers = new Workers(); - $workers->createWorkerFor(fn() => exit(0), []); - self::assertCount(1, $workers); - } + $workers->createWorkerFor(fn() => exit(0), []); + self::assertCount(1, $workers); + } - public function testItGivesTheAmountOfActiveWorkersOnCount(): void - { - $workers = new Workers(); + public function testItGivesTheAmountOfActiveWorkersOnCount(): void + { + $workers = new Workers(); - $workers->createWorkerFor(fn() => exit(0), []); - $workers->createWorkerFor(fn() => exit(0), []); - self::assertCount(2, $workers); + $workers->createWorkerFor(fn() => exit(0), []); + $workers->createWorkerFor(fn() => exit(0), []); + self::assertCount(2, $workers); - $workers->createWorkerFor(fn() => exit(0), []); - self::assertCount(3, $workers); + $workers->createWorkerFor(fn() => exit(0), []); + self::assertCount(3, $workers); - $workers->stop(); - self::assertEmpty($workers); - } + $workers->stop(); + self::assertEmpty($workers); + } - public function testItEmitsAnEventWhenAWorkerIsStarted(): void - { - $workers = new Workers(); + public function testItEmitsAnEventWhenAWorkerIsStarted(): void + { + $workers = new Workers(); - $workerStartedEventHasTakenPlace = false; - $workers->on('worker_started', function () use (&$workerStartedEventHasTakenPlace) { - $workerStartedEventHasTakenPlace = true; - }); + $workerStartedEventHasTakenPlace = false; + $workers->on('worker_started', function () use (&$workerStartedEventHasTakenPlace) { + $workerStartedEventHasTakenPlace = true; + }); - self::assertFalse($workerStartedEventHasTakenPlace); - $workers->createWorkerFor(fn() => exit(0), []); - self::assertTrue($workerStartedEventHasTakenPlace); - } + self::assertFalse($workerStartedEventHasTakenPlace); + $workers->createWorkerFor(fn() => exit(0), []); + self::assertTrue($workerStartedEventHasTakenPlace); + } - public function testItEmitsAnEventWhenAWorkerIsRemoved(): void - { - $workers = new Workers(); - $reflector = new ReflectionObject($workers); - $method = $reflector->getMethod('remove'); - $method->setAccessible(true); + public function testItEmitsAnEventWhenAWorkerIsRemoved(): void + { + $workers = new Workers(); + $reflector = new ReflectionObject($workers); + $method = $reflector->getMethod('remove'); + $method->setAccessible(true); - $workerStoppedEventHasTakenPlace = false; - $workers->on('worker_stopped', function () use (&$workerStoppedEventHasTakenPlace) { - $workerStoppedEventHasTakenPlace = true; - }); + $workerStoppedEventHasTakenPlace = false; + $workers->on('worker_stopped', function () use (&$workerStoppedEventHasTakenPlace) { + $workerStoppedEventHasTakenPlace = true; + }); - self::assertFalse($workerStoppedEventHasTakenPlace); - $method->invoke($workers, 0); - self::assertTrue($workerStoppedEventHasTakenPlace); - } + self::assertFalse($workerStoppedEventHasTakenPlace); + $method->invoke($workers, 0); + self::assertTrue($workerStoppedEventHasTakenPlace); + } - public function testItEmitsAnEventWhenNoWorkersRemain(): void - { - $workers = new Workers(); + public function testItEmitsAnEventWhenNoWorkersRemain(): void + { + $workers = new Workers(); - $noWorkersRemainingEventHasTakenPlace = false; - $workers->on('no_workers_remaining', function () use (&$noWorkersRemainingEventHasTakenPlace) { - $noWorkersRemainingEventHasTakenPlace = true; - }); + $noWorkersRemainingEventHasTakenPlace = false; + $workers->on('no_workers_remaining', function () use (&$noWorkersRemainingEventHasTakenPlace) { + $noWorkersRemainingEventHasTakenPlace = true; + }); - self::assertFalse($noWorkersRemainingEventHasTakenPlace); - $workers->cleanup(); - self::assertTrue($noWorkersRemainingEventHasTakenPlace); - } + self::assertFalse($noWorkersRemainingEventHasTakenPlace); + $workers->cleanup(); + self::assertTrue($noWorkersRemainingEventHasTakenPlace); + } - public function testItCallsForkOnProcessControlWhenAskedToCreateAWorker(): void - { - $processControl = $this->createMock(ProcessControl::class); - $processControl->expects(self::once()) - ->method('fork') - ->willReturn(new Fork(1)); + public function testItCallsForkOnProcessControlWhenAskedToCreateAWorker(): void + { + $processControl = $this->createMock(ProcessControl::class); + $processControl->expects(self::once()) + ->method('fork') + ->willReturn(new Fork(1)); - $workers = new Workers($processControl); - $workers->createWorkerFor(fn() => []); - } + $workers = new Workers($processControl); + $workers->createWorkerFor(fn() => []); + } - public function testItCallsNonBlockingWaitOnProcessControlWhenPerformingCleanup(): void - { - $processControl = $this->createMock(ProcessControl::class); - $processControl->expects(self::once()) - ->method('wait') - ->with(Wait::NO_HANG) - ->willReturn(new Wait(0)); + public function testItCallsNonBlockingWaitOnProcessControlWhenPerformingCleanup(): void + { + $processControl = $this->createMock(ProcessControl::class); + $processControl->expects(self::once()) + ->method('wait') + ->with(Wait::NO_HANG) + ->willReturn(new Wait(0)); - $workers = new Workers($processControl); - $workers->cleanup(); - } + $workers = new Workers($processControl); + $workers->cleanup(); + } - public function testItCallsBlockingWaitOnProcessControlWhenAwaitingCongestionRelief(): void - { - $processControl = $this->createMock(ProcessControl::class); - $processControl->expects(self::once()) - ->method('wait') - ->with(/* no arguments */) - ->willReturn(new Wait(1)); + public function testItCallsBlockingWaitOnProcessControlWhenAwaitingCongestionRelief(): void + { + $processControl = $this->createMock(ProcessControl::class); + $processControl->expects(self::once()) + ->method('wait') + ->with(/* no arguments */) + ->willReturn(new Wait(1)); - $workers = new Workers($processControl); - $workers->awaitCongestionRelief(); - } + $workers = new Workers($processControl); + $workers->awaitCongestionRelief(); + } } diff --git a/src/Workers.php b/src/Workers.php index c3501e3..5c05d8c 100644 --- a/src/Workers.php +++ b/src/Workers.php @@ -13,86 +13,86 @@ use Toalett\Multiprocessing\ProcessControl\Wait; class Workers implements Countable, EventEmitterInterface { - use EventEmitterTrait; + use EventEmitterTrait; - /** @var int[] */ - private array $workers = []; - private ProcessControl $processControl; + /** @var int[] */ + private array $workers = []; + private ProcessControl $processControl; - public function __construct(?ProcessControl $processControl = null) - { - $this->processControl = $processControl ?? new PCNTL(); - } + public function __construct(?ProcessControl $processControl = null) + { + $this->processControl = $processControl ?? new PCNTL(); + } - public function count(): int - { - return count($this->workers); - } + public function count(): int + { + return count($this->workers); + } - public function createWorkerFor(callable $task, array $args = []): void - { - $pid = $this->forkWorker($task, $args); - $this->workers[$pid] = $pid; - $this->emit('worker_started', [$pid]); - } + public function createWorkerFor(callable $task, array $args = []): void + { + $pid = $this->forkWorker($task, $args); + $this->workers[$pid] = $pid; + $this->emit('worker_started', [$pid]); + } - public function cleanup(): void - { - while (true === $this->wait(Wait::NO_HANG)) ; - if (0 === count($this)) { - $this->emit('no_workers_remaining'); - } - } + public function cleanup(): void + { + while (true === $this->wait(Wait::NO_HANG)) ; + if (0 === count($this)) { + $this->emit('no_workers_remaining'); + } + } - public function awaitCongestionRelief(): void - { - $this->wait(); - } + public function awaitCongestionRelief(): void + { + $this->wait(); + } - private function remove(int $pid): void - { - unset($this->workers[$pid]); - $this->emit('worker_stopped', [$pid]); - } + private function remove(int $pid): void + { + unset($this->workers[$pid]); + $this->emit('worker_stopped', [$pid]); + } - private function forkWorker(callable $task, array $args): int - { - $fork = $this->processControl->fork(); - if ($fork->failed()) { - throw ProcessControlException::forkFailed(); - } + private function forkWorker(callable $task, array $args): int + { + $fork = $this->processControl->fork(); + if ($fork->failed()) { + throw ProcessControlException::forkFailed(); + } - if ($fork->isChild()) { - try { - call_user_func_array($task, $args); - } catch (Throwable $t) { - fwrite(STDERR, $t->getMessage()); - exit(1); - } - exit(0); - } + if ($fork->isChild()) { + try { + call_user_func_array($task, $args); + } catch (Throwable $t) { + fwrite(STDERR, $t->getMessage()); + exit(1); + } + exit(0); + } - return $fork->pid; - } + return $fork->pid; + } - /** - * @param int $options - * @return bool Whether a process was caught - */ - private function wait(int $options = 0): bool - { - $wait = $this->processControl->wait($options); - if ($wait->childStopped()) { - $this->remove($wait->pid); - return true; - } - // We ignore errors ($pid < 0). This method is called periodically, even if there is - // no child available. pcntl_wait() will return -1. This is expected behavior. - return false; - } + /** + * @param int $options + * @return bool Whether a process was caught + */ + private function wait(int $options = 0): bool + { + $wait = $this->processControl->wait($options); + if ($wait->childStopped()) { + $this->remove($wait->pid); + return true; + } + // We ignore errors ($pid < 0). This method is called periodically, even if there is + // no child available. pcntl_wait() will return -1. This is expected behavior. + return false; + } - public function stop(): void - { - while (true === $this->wait()) ; - } + public function stop(): void + { + while (true === $this->wait()) ; + } }