2020-12-11 01:25:38 +01:00
|
|
|
<?php
|
|
|
|
|
|
|
|
namespace Toalett\Multiprocessing;
|
|
|
|
|
|
|
|
use Evenement\EventEmitterInterface;
|
|
|
|
use Evenement\EventEmitterTrait;
|
|
|
|
use React\EventLoop\LoopInterface;
|
2020-12-12 02:11:05 +01:00
|
|
|
use Toalett\Multiprocessing\Task\Interval;
|
|
|
|
use Toalett\Multiprocessing\Task\RepeatedTask;
|
|
|
|
use Toalett\Multiprocessing\Task\Tasks;
|
2020-12-11 01:25:38 +01:00
|
|
|
|
|
|
|
class Context implements EventEmitterInterface
|
|
|
|
{
|
2020-12-12 13:05:48 +01:00
|
|
|
public const INTERVAL_GC = 120;
|
|
|
|
public const INTERVAL_CLEANUP = 5;
|
|
|
|
use EventEmitterTrait;
|
2020-12-11 01:25:38 +01:00
|
|
|
|
2020-12-12 13:05:48 +01:00
|
|
|
private LoopInterface $eventLoop;
|
|
|
|
private Concurrency $concurrency;
|
|
|
|
private Workers $workers;
|
|
|
|
private Tasks $maintenanceTasks;
|
2020-12-11 01:25:38 +01:00
|
|
|
|
2020-12-12 13:05:48 +01:00
|
|
|
public function __construct(
|
|
|
|
LoopInterface $eventLoop,
|
|
|
|
Concurrency $concurrency,
|
|
|
|
?Workers $workers = null,
|
|
|
|
?Interval $cleanupInterval = null
|
|
|
|
)
|
|
|
|
{
|
|
|
|
$this->eventLoop = $eventLoop;
|
|
|
|
$this->concurrency = $concurrency;
|
|
|
|
$this->workers = $workers ?? new Workers();
|
|
|
|
$this->setupWorkerEventForwarding();
|
|
|
|
$this->setupMaintenanceTasks($cleanupInterval);
|
|
|
|
}
|
2020-12-11 01:25:38 +01:00
|
|
|
|
2020-12-12 13:05:48 +01:00
|
|
|
public function run(): void
|
|
|
|
{
|
|
|
|
$this->eventLoop->futureTick(fn() => $this->emit('booted'));
|
|
|
|
$this->eventLoop->futureTick(fn() => gc_enable());
|
|
|
|
$this->maintenanceTasks->enable($this->eventLoop);
|
|
|
|
$this->eventLoop->run();
|
|
|
|
}
|
2020-12-11 01:25:38 +01:00
|
|
|
|
2020-12-12 13:05:48 +01:00
|
|
|
public function submit(callable $task, ...$args): void
|
|
|
|
{
|
|
|
|
$this->eventLoop->futureTick(function () use ($task, $args) {
|
|
|
|
if ($this->concurrency->isReachedBy(count($this->workers))) {
|
|
|
|
$this->emit('congestion');
|
|
|
|
$this->workers->awaitCongestionRelief();
|
|
|
|
$this->emit('congestion_relieved');
|
|
|
|
}
|
|
|
|
$this->workers->createWorkerFor($task, $args);
|
|
|
|
});
|
|
|
|
}
|
2020-12-11 01:25:38 +01:00
|
|
|
|
2020-12-12 13:05:48 +01:00
|
|
|
public function stop(): void
|
|
|
|
{
|
|
|
|
$this->maintenanceTasks->cancel();
|
|
|
|
$this->workers->stop();
|
|
|
|
$this->emit('stopped');
|
|
|
|
}
|
2020-12-11 01:25:38 +01:00
|
|
|
|
2020-12-12 13:05:48 +01:00
|
|
|
private function setupWorkerEventForwarding(): void
|
|
|
|
{
|
|
|
|
$this->workers->on('worker_started', fn(int $pid) => $this->emit('worker_started', [$pid]));
|
|
|
|
$this->workers->on('worker_stopped', fn(int $pid) => $this->emit('worker_stopped', [$pid]));
|
|
|
|
$this->workers->on('no_workers_remaining', fn() => $this->emit('no_workers_remaining'));
|
|
|
|
}
|
2020-12-11 01:25:38 +01:00
|
|
|
|
2020-12-12 13:05:48 +01:00
|
|
|
private function setupMaintenanceTasks(?Interval $cleanupInterval): void
|
|
|
|
{
|
|
|
|
$cleanupInterval = $cleanupInterval ?? Interval::seconds(self::INTERVAL_CLEANUP);
|
|
|
|
$gcInterval = Interval::seconds(self::INTERVAL_GC);
|
|
|
|
$this->maintenanceTasks = new Tasks(
|
|
|
|
new RepeatedTask($cleanupInterval, [$this->workers, 'cleanup']),
|
|
|
|
new RepeatedTask($gcInterval, 'gc_collect_cycles')
|
|
|
|
);
|
|
|
|
}
|
2020-12-11 01:25:38 +01:00
|
|
|
}
|