diff --git a/composer.json b/composer.json index 4cb6b38..cce41ef 100644 --- a/composer.json +++ b/composer.json @@ -15,9 +15,7 @@ "psr-4": { "Toalett\\Multiprocessing\\": ["src"] }, - "exclude-from-classmap": [ - "**/Tests/" - ] + "exclude-from-classmap": ["src/Tests/"] }, "autoload-dev": { "psr-4": { @@ -33,6 +31,6 @@ "extra": { }, "require-dev": { - "phpunit/phpunit": "^9.5.0" + "phpunit/phpunit": "^9.5" } } diff --git a/composer.lock b/composer.lock index 9bd941f..9386521 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "e6e2d2021fce4df2143d382fe26302e7", + "content-hash": "431bed10aaf05c9691db445c588f6262", "packages": [ { "name": "evenement/evenement", diff --git a/src/Context.php b/src/Context.php index d0e29c1..b947e96 100644 --- a/src/Context.php +++ b/src/Context.php @@ -29,7 +29,7 @@ class Context implements EventEmitterInterface $this->workers->on('worker_started', fn(int $pid) => $this->emit('worker_started', [$pid])); $this->workers->on('worker_stopped', fn(int $pid) => $this->emit('worker_stopped', [$pid])); - $this->workers->on('worker_stopped', fn() => $this->emitIf($this->workers->empty(), 'no_workers_remaining')); + $this->workers->on('worker_stopped', fn() => $this->emitIf(empty($this->workers), 'no_workers_remaining')); } public function submit(callable $task, ...$args): void diff --git a/src/ProcessControl/Fork.php b/src/ProcessControl/Fork.php new file mode 100644 index 0000000..b953766 --- /dev/null +++ b/src/ProcessControl/Fork.php @@ -0,0 +1,28 @@ +pid = $pid; + } + + public function failed(): bool + { + return $this->pid < 0; + } + + public function isChild(): bool + { + return $this->pid === 0; + } + + public function isParent(): bool + { + return $this->pid !== 0; + } +} diff --git a/src/ProcessControl/PCNTL.php b/src/ProcessControl/PCNTL.php new file mode 100644 index 0000000..8728c52 --- /dev/null +++ b/src/ProcessControl/PCNTL.php @@ -0,0 +1,18 @@ +pid = $pid; + $this->status = $status; + } + + public function childStopped(): bool + { + return $this->pid > 0; + } + + public function failed(): bool + { + return $this->pid < 0; + } +} diff --git a/src/Tests/ProcessControl/ForkTest.php b/src/Tests/ProcessControl/ForkTest.php new file mode 100644 index 0000000..88297c7 --- /dev/null +++ b/src/Tests/ProcessControl/ForkTest.php @@ -0,0 +1,61 @@ +isParent()); + self::assertFalse($fork->isChild()); + self::assertFalse($fork->failed()); + } + + /** + * @param int $pid + * @dataProvider negativeIntegerProvider + */ + public function testItSaysItFailedWhenANegativePidIsProvided(int $pid): void + { + $fork = new Fork($pid); + self::assertTrue($fork->isParent()); + self::assertFalse($fork->isChild()); + self::assertTrue($fork->failed()); + } + + public function testItSaysItIsAChildProcessWhenPidZeroIsProvided(): void + { + $fork = new Fork(0); + self::assertFalse($fork->isParent()); + self::assertTrue($fork->isChild()); + self::assertFalse($fork->failed()); + } + + public function positiveIntegerProvider(): array + { + return [ + [1], + [10], + [1000], + [PHP_INT_MAX], + ]; + } + + public function negativeIntegerProvider(): array + { + return [ + [-1], + [-10], + [-1000], + [PHP_INT_MIN], + ]; + } +} diff --git a/src/Tests/ProcessControl/WaitTest.php b/src/Tests/ProcessControl/WaitTest.php new file mode 100644 index 0000000..62585f5 --- /dev/null +++ b/src/Tests/ProcessControl/WaitTest.php @@ -0,0 +1,51 @@ +childStopped()); + self::assertFalse($wait->failed()); + } + + /** + * @param int $pid + * @dataProvider negativeIntegerProvider + */ + public function testItSaysItFailedWhenANegativePidIsProvided(int $pid): void + { + $wait = new Wait($pid, 0); + self::assertFalse($wait->childStopped()); + self::assertTrue($wait->failed()); + } + + public function positiveIntegerProvider(): array + { + return [ + [1], + [10], + [1000], + [PHP_INT_MAX], + ]; + } + + public function negativeIntegerProvider(): array + { + return [ + [-1], + [-10], + [-1000], + [PHP_INT_MIN], + ]; + } +} diff --git a/src/Tests/WorkersTest.php b/src/Tests/WorkersTest.php index 61d7c7f..91da5a9 100644 --- a/src/Tests/WorkersTest.php +++ b/src/Tests/WorkersTest.php @@ -2,22 +2,55 @@ namespace Toalett\Multiprocessing\Tests; -use ReflectionObject; -use Toalett\Multiprocessing\Workers; use PHPUnit\Framework\TestCase; +use ReflectionObject; +use Toalett\Multiprocessing\ProcessControl\Fork; +use Toalett\Multiprocessing\ProcessControl\ProcessControl; +use Toalett\Multiprocessing\ProcessControl\Wait; +use Toalett\Multiprocessing\Workers; class WorkersTest extends TestCase { + public function testItSaysItIsEmptyWhenNoWorkers(): void + { + $processControl = $this->createMock(ProcessControl::class); + $workers = new Workers($processControl); + self::assertEmpty($workers); + } + + public function testItSaysItHasOneWorkerWhenTaskExecutes(): void + { + $workers = new Workers(); + + $workers->createWorkerFor(fn() => exit(0), []); + self::assertCount(1, $workers); + } + + public function testItGivesTheAmountOfActiveWorkersOnCount(): void + { + $workers = new Workers(); + + $workers->createWorkerFor(fn() => exit(0), []); + $workers->createWorkerFor(fn() => exit(0), []); + self::assertCount(2, $workers); + + $workers->createWorkerFor(fn() => exit(0), []); + self::assertCount(3, $workers); + + $workers->stop(); + self::assertEmpty($workers); + } + public function testItEmitsAnEventWhenAWorkerIsStarted(): void { $workers = new Workers(); $workerStartedEventHasTakenPlace = false; - $workers->on('worker_started', function() use (&$workerStartedEventHasTakenPlace) { + $workers->on('worker_started', function () use (&$workerStartedEventHasTakenPlace) { $workerStartedEventHasTakenPlace = true; }); - self::assertFalse($workerStartedEventHasTakenPlace); + $workers->createWorkerFor(fn() => exit(0), []); self::assertTrue($workerStartedEventHasTakenPlace); } @@ -30,7 +63,7 @@ class WorkersTest extends TestCase $method->setAccessible(true); $workerStoppedEventHasTakenPlace = false; - $workers->on('worker_stopped', function() use (&$workerStoppedEventHasTakenPlace) { + $workers->on('worker_stopped', function () use (&$workerStoppedEventHasTakenPlace) { $workerStoppedEventHasTakenPlace = true; }); @@ -38,4 +71,39 @@ class WorkersTest extends TestCase $method->invoke($workers, 0); self::assertTrue($workerStoppedEventHasTakenPlace); } + + public function testItCallsForkOnProcessControlWhenAskedToCreateAWorker(): void + { + $processControl = $this->createMock(ProcessControl::class); + $processControl->expects(self::once()) + ->method('fork') + ->willReturn(new Fork(1)); + + $workers = new Workers($processControl); + $workers->createWorkerFor(fn() => []); + } + + public function testItCallsNonBlockingWaitOnProcessControlWhenPerformingCleanup(): void + { + $processControl = $this->createMock(ProcessControl::class); + $processControl->expects(self::once()) + ->method('wait') + ->with(WNOHANG) + ->willReturn(new Wait(0)); + + $workers = new Workers($processControl); + $workers->cleanup(); + } + + public function testItCallsBlockingWaitOnProcessControlWhenAwaitingCongestionRelief(): void + { + $processControl = $this->createMock(ProcessControl::class); + $processControl->expects(self::once()) + ->method('wait') + ->with(/* no arguments */) + ->willReturn(new Wait(1)); + + $workers = new Workers($processControl); + $workers->awaitCongestionRelief(); + } } diff --git a/src/Workers.php b/src/Workers.php index a7af96f..f88d9ff 100644 --- a/src/Workers.php +++ b/src/Workers.php @@ -7,6 +7,8 @@ use Evenement\EventEmitterInterface; use Evenement\EventEmitterTrait; use Throwable; use Toalett\Multiprocessing\Exception\ProcessControlException; +use Toalett\Multiprocessing\ProcessControl\PCNTL; +use Toalett\Multiprocessing\ProcessControl\ProcessControl; class Workers implements Countable, EventEmitterInterface { @@ -14,18 +16,19 @@ class Workers implements Countable, EventEmitterInterface /** @var int[] */ private array $workers = []; + private ProcessControl $processControl; + + public function __construct(?ProcessControl $processControl = null) + { + $this->processControl = $processControl ?? new PCNTL(); + } public function count(): int { return count($this->workers); } - public function empty(): bool - { - return count($this->workers) === 0; - } - - public function createWorkerFor(callable $task, array $args): void + public function createWorkerFor(callable $task, array $args = []): void { $pid = $this->forkWorker($task, $args); $this->workers[$pid] = $pid; @@ -50,12 +53,12 @@ class Workers implements Countable, EventEmitterInterface private function forkWorker(callable $task, array $args): int { - $pid = pcntl_fork(); - if ($pid === -1) { + $fork = $this->processControl->fork(); + if ($fork->failed()) { throw ProcessControlException::forkFailed(); } - if ($pid === 0) { + if ($fork->isChild()) { try { call_user_func_array($task, $args); } catch (Throwable $t) { @@ -65,7 +68,7 @@ class Workers implements Countable, EventEmitterInterface exit(0); } - return $pid; + return $fork->pid; } /** @@ -74,9 +77,9 @@ class Workers implements Countable, EventEmitterInterface */ private function wait(int $options = 0): bool { - $pid = pcntl_wait($status, $options); - if ($pid > 0) { - $this->remove($pid); + $wait = $this->processControl->wait($options); + if ($wait->childStopped()) { + $this->remove($wait->pid); return true; } // We ignore errors ($pid < 0). This method is called periodically, even if there is @@ -86,6 +89,6 @@ class Workers implements Countable, EventEmitterInterface public function stop(): void { - while (true === $this->wait()); + while (true === $this->wait()) ; } }