processControl = $processControl ?? new PCNTL();
    }

    /**
     * Number of worker PIDs currently tracked in $this->workers.
     */
    public function count(): int
    {
        $tracked = $this->workers;

        return count($tracked);
    }

    /**
     * Forks a worker that runs $job with $args, records the returned PID
     * and emits 'worker_started' carrying that PID.
     *
     * @param callable $job  Work to execute inside the forked child.
     * @param array    $args Arguments handed to $job.
     *
     * @throws RuntimeException When the fork is reported as failed.
     */
    public function createWorkerFor(callable $job, array $args = []): void
    {
        $workerPid = $this->forkWorker($job, $args);

        $this->workers[$workerPid] = $workerPid;
        $this->emit('worker_started', [$workerPid]);
    }

    /**
     * Repeatedly waits with Wait::NO_HANG (presumably non-blocking - confirm
     * against the Wait class) until a wait catches no stopped worker, then
     * emits 'no_workers_remaining' if no worker is tracked any more.
     */
    public function cleanup(): void
    {
        do {
            $caught = $this->wait(Wait::NO_HANG);
        } while (true === $caught);

        // Workers are still tracked: nothing to announce.
        if (count($this) > 0) {
            return;
        }

        $this->emit('no_workers_remaining');
    }

    /**
     * Performs a single wait with default options; the result is discarded.
     * NOTE(review): presumably this blocks until one worker exits - confirm
     * against the process-control implementation.
     */
    public function awaitCongestionRelief(): void
    {
        $this->wait();
    }

    /**
     * Drops $pid from the tracked workers and emits 'worker_stopped' with it.
     */
    private function remove(int $pid): void
    {
        unset($this->workers[$pid]);

        $this->emit('worker_stopped', [$pid]);
    }

    /**
     * Forks via the process-control object. The parent gets the fork's pid
     * back; the child runs $task and terminates without returning.
     *
     * @param callable $task Work to execute in the child.
     * @param array    $args Arguments passed to $task.
     *
     * @return int The pid reported by the fork (parent side only).
     *
     * @throws RuntimeException When the fork is reported as failed.
     */
    private function forkWorker(callable $task, array $args): int
    {
        $fork = $this->processControl->fork();

        if ($fork->failed()) {
            throw new RuntimeException('Could not fork process');
        }

        // Parent side: hand the pid back to the caller.
        if (!$fork->isChild()) {
            return $fork->pid;
        }

        // Child side: run the task, then exit with 0 on success or 1 when
        // the task threw (its message is written to STDERR first).
        $exitCode = 0;

        try {
            call_user_func_array($task, $args);
        } catch (Throwable $error) {
            fwrite(STDERR, $error->getMessage());
            $exitCode = 1;
        }

        exit($exitCode);
    }

    /**
     * Waits on the process-control object and, when a stopped child is
     * reported, removes its pid from the tracked workers.
     *
     * @param int $options Options forwarded to the underlying wait call.
     *
     * @return bool Whether a process was caught
     */
    private function wait(int $options = 0): bool
    {
        $result = $this->processControl->wait($options);

        if (!$result->childStopped()) {
            return false;
        }

        $this->remove($result->pid);

        return true;
    }

    /**
     * Keeps waiting with default options until a wait catches no stopped
     * worker.
     */
    public function stop(): void
    {
        do {
            $caught = $this->wait();
        } while (true === $caught);
    }
}