source = $source; $this->loop = $loop; $this->pollingInterval = $pollingInterval; $this->resume(); } public function isReadable(): bool { return !$this->closed; } public function pause(): void { if (!$this->listening) { return; } $this->source->close(); $this->loop->cancelTimer($this->timer); $this->listening = false; } public function resume(): void { if ($this->listening || $this->closed) { return; } $this->source->open(); $this->timer = $this->loop->addPeriodicTimer($this->pollingInterval, function () { while (!is_null($data = $this->read())) { $this->emit('data', [$data]); } if ($this->source->eof()) { $this->emit('end'); $this->close(); } }); $this->listening = true; } public function pipe(WritableStreamInterface $dest, array $options = []): WritableStreamInterface { return Util::pipe($this, $dest, $options); } public function close(): void { if ($this->closed) { return; } $this->closed = true; $this->emit('close'); $this->pause(); $this->removeAllListeners(); $this->source->close(); } /** * @return mixed|null */ private function read() { try { return $this->source->select(); } catch (Throwable $t) { $this->emit('error', [new RuntimeException('Unable to read data from source: ' . $t->getMessage(), 0, $t)]); $this->close(); } return null; } }