eventLoop = $eventLoop; $this->concurrency = $concurrency; $this->workers = $workers ?? new Workers(); $this->setupWorkerEventForwarding(); $this->setupMaintenanceTasks($cleanupInterval); } public function run(): void { $this->eventLoop->futureTick(fn() => $this->emit('booted')); $this->eventLoop->futureTick(fn() => gc_enable()); $this->maintenanceTasks->enable($this->eventLoop); $this->eventLoop->run(); } public function submit(callable $job, ...$args): void { $this->eventLoop->futureTick(function () use ($job, $args) { if ($this->concurrency->isReachedBy(count($this->workers))) { $this->emit('congestion'); $this->workers->awaitCongestionRelief(); $this->emit('congestion_relieved'); } $this->workers->createWorkerFor($job, $args); }); } public function stop(): void { $this->maintenanceTasks->cancel(); $this->workers->stop(); $this->emit('stopped'); } private function setupWorkerEventForwarding(): void { $this->workers->on('worker_started', fn(int $pid) => $this->emit('worker_started', [$pid])); $this->workers->on('worker_stopped', fn(int $pid) => $this->emit('worker_stopped', [$pid])); $this->workers->on('no_workers_remaining', fn() => $this->emit('no_workers_remaining')); } private function setupMaintenanceTasks(?Interval $cleanupInterval): void { $cleanupInterval = $cleanupInterval ?? Interval::seconds(self::INTERVAL_CLEANUP); $gcInterval = Interval::seconds(self::INTERVAL_GC); $this->maintenanceTasks = new Tasks( new RepeatedTask($cleanupInterval, [$this->workers, 'cleanup']), new RepeatedTask($gcInterval, 'gc_collect_cycles') ); } }