diff --git a/bin/app.php b/bin/app.php index 6f1a672..82b35ff 100755 --- a/bin/app.php +++ b/bin/app.php @@ -3,33 +3,42 @@ require_once __DIR__ . '/../vendor/autoload.php'; +use function Joop\Asynchronous\async; +use Joop\Asynchronous\Promise; -// Create a function we want to run asynchronously -$process = function ($i) { - $delayMicroseconds = (10 - $i) * 1000000; - usleep($delayMicroseconds); - - return getmypid(); -}; - - -// Execute the functions asynchronously - each returning a Promise -$promises = []; -foreach (range(0, 10) as $i) - $promises[] = Asynchronous::run($process, $i); - - -// Wait for all promises to resolve -while (count($promises) > 0) { +/** + * @param Promise[] $promises + */ +function awaitPromises(array &$promises) +{ foreach ($promises as $index => $promise) { - if ($promise->isResolved() && !$promise->isEmpty()) { - print("Response retrieved: " . $promise->getValue() . PHP_EOL); + if ($promise->isResolved()) { unset($promises[$index]); + + if (!$promise->isEmpty() && !$promise->isError()) + print($promise->getValue() . PHP_EOL); } } } -exit(0); +/* + * Example of asynchronous processing in PHP + */ +$process = function ($i) { + $delayMicroseconds = (5 - $i) * 1000000; + usleep($delayMicroseconds); + return sprintf( + 'PID %-5d slept for %.1f seconds', + getmypid(), $delayMicroseconds / 1000000 + ); +}; + +$promises = []; +foreach (range(0, 5) as $i) + $promises[] = async($process, $i); + +while (count($promises) > 0) + awaitPromises($promises); diff --git a/bin/example/arrays.php b/bin/example/arrays.php new file mode 100755 index 0000000..efb393a --- /dev/null +++ b/bin/example/arrays.php @@ -0,0 +1,15 @@ +#!/usr/bin/php +resolve()->getValue(); +var_dump($array); diff --git a/bin/example/objects.php b/bin/example/objects.php new file mode 100755 index 0000000..0833d0d --- /dev/null +++ b/bin/example/objects.php @@ -0,0 +1,37 @@ +#!/usr/bin/php +data = [1, 2, 3]; + } + + public function getData() + { + return $this->data; + } +} + +// Create the process +$promise = async(function () { + sleep(2); + + return new Sample(); +}); + +// We can do some other stuff here while the process is running + +// Resolve the promise +/** @var Sample $sample */ +$sample = $promise->resolve()->getValue(); +var_dump($sample->getData()); diff --git a/bin/example/sample.php b/bin/example/sample.php new file mode 100755 index 0000000..ab35bf8 --- /dev/null +++ b/bin/example/sample.php @@ -0,0 +1,14 @@ +#!/usr/bin/php +tempFile = tempnam(__DIR__ . '/../temp', 'PHP'); - $this->shmKey = ftok($this->tempFile, 'a'); - Promise::_setShmKey($this->shmKey); - $this->attach(); - } - - /** - * @return $this - */ - private function attach() - { - $this->shm = shm_attach($this->shmKey); - - return $this; - } - - /** - * @return Asynchronous - */ - private static function getInstance() - { - if (is_null(self::$instance)) - self::$instance = new static(); - - return self::$instance; - } - /** * @param callable $function @@ -67,41 +36,142 @@ class Asynchronous public static function run(callable $function, ...$parameters) { $instance = self::getInstance(); + $key = self::generatePromiseKey(); $pid = pcntl_fork(); - if ($pid === false) + /* + * The fork failed. Instead of returning a promise, we return null. + */ + if ($pid == -1) return null; - $key = self::generatePromiseKey(); - + /* + * Parent process. We keep track of the PID of the child process + * in order for us to read out it's status later on. + * A Promise instance is returned that corresponds to the key in + * memory to which the child process will write sometime. + */ if ($pid > 0) { $instance->children[] = $pid; return new Promise($key); } + /* + * Child process. Mark the (copied) instance of this class as a child + * to prevent unneeded shutdown handler execution. + * Reattach to the shared memory block (the $shm member variable is a + * resource since PHP > 5.3 and is thus not shared with the child) + * and execute the function. + * On a successful execution, write the result to the shared memory + * block to which the parent is attached. + * On failure, write a default response to the block in order for + * the Promise to be able to resolve. + */ $instance->isChild = true; - $instance->attach(); + $instance->attachShm(); try { - $response = call_user_func($function, ...$parameters); + $response = call_user_func_array($function, $parameters); shm_put_var($instance->shm, $key, $response ?? Promise::RESPONSE_NONE); exit(0); - } catch (Throwable $throwable) { + } catch (\Throwable $throwable) { + shm_put_var($instance->shm, $key, Promise::RESPONSE_ERROR); exit(1); } - - } + /** + * + */ + public static function cleanup() + { + /* + * Iterate over all child process PIDs and check + * if one or more of them has stopped. + */ + $instance = self::getInstance(); + foreach ($instance->children as $index => $pid) { + $response = pcntl_waitpid($pid, $status, WNOHANG); + if ($response === $pid) + unset($instance->children[$index]); + } + } + + /** + * @return int + */ + public static function childCount() + { + return count(self::getInstance()->children); + } + + + + /* + * Private methods below + */ + + /** + * Asynchronous constructor. + */ + private function __construct() + { + /* + * Use the filename as an identifier to create the + * System V IPC key. + */ + $this->shmKey = ftok(__FILE__, 't'); + Promise::__setShmKey($this->shmKey); + $this->attachShm(); + } + + + /** + * @return $this + */ + private function attachShm() + { + $this->shm = shm_attach($this->shmKey); + + return $this; + } + + + /** + * @return Asynchronous + */ + private static function getInstance() + { + if (is_null(self::$instance)) { + /* + * This is executed once during runtime; + * when a functionality from this class + * is used for the first time. + */ + self::$instance = new static(); + self::registerHandlers(); + } + + return self::$instance; + } + + /** * @return int */ private static function generatePromiseKey() { + /* + * Get the current key. + */ $promiseKey = self::$key; - self::$key++; - if (self::$key > 9999999) - self::$key = 0; + + /* + * Reset the key to 0 if the upper bound of + * 9.999.999 is reached (Windows limit for + * shm keys). + */ + self::$key = (++self::$key > 9999999) ? 0 : self::$key; return $promiseKey; } @@ -110,18 +180,48 @@ class Asynchronous /** * */ - public function __destruct() + private static function registerHandlers() { - if ($this->isChild) - return; + $instance = self::getInstance(); - while (count($this->children) > 0) { - pcntl_wait($status); - array_shift($this->children); - } - shm_remove($this->shm); - shm_detach($this->shm); - unlink($this->tempFile); + /* + * The shutdown handler + */ + $handler = function () use (&$instance) { + /* + * A child process has no business here. + */ + if ($instance->isChild) + return; + + /* + * Wait for all children to finish to + * ensure that all writing to the shared + * memory block is finished. + */ + while (count($instance->children) > 0) { + pcntl_wait($status); + array_shift($instance->children); + } + + /* + * Ask the kernel to mark the shared memory + * block for removal and detach from it to + * actually allow for removal. + */ + if (is_resource($instance->shm)) { + shm_remove($instance->shm); + shm_detach($instance->shm); + } + }; + + /* + * Actually register the handler as shutdown + * handler and signal handler for SIGINT, SIGTERM + */ + register_shutdown_function($handler); + foreach ([SIGINT, SIGTERM] as $SIGNAL) + pcntl_signal($SIGNAL, $handler); } } \ No newline at end of file diff --git a/src/Promise.php b/src/Promise.php index ba4be81..e477453 100644 --- a/src/Promise.php +++ b/src/Promise.php @@ -1,22 +1,46 @@ shm = shm_attach(self::$shmKey); } + /** + * @return bool + */ + public function shmValid() + { + return is_resource($this->shm); + } + /** * @return bool */ public function isResolved() { - return shm_has_var($this->shm, $this->key); + if ($this->shmValid()) + return shm_has_var($this->shm, $this->key); + + return true; } + /** * @return bool */ public function isEmpty() { - return $this->getValue() === self::RESPONSE_NONE; + $value = $this->getValue(); + + return $value === self::RESPONSE_NONE || $value === null; } + + /** + * @return bool + */ + public function isError() + { + return $this->getValue() === self::RESPONSE_ERROR; + } + + /** * @return mixed|null */ public function getValue() { - return $this->isResolved() ? $this->resolve()->value : null; + if ($this->shmValid()) + return $this->isResolved() ? $this->resolve()->value : null; + + $this->value = self::RESPONSE_ERROR; + + return $this->value; } + /** * @return $this */ public function resolve() { + /* + * Actually block execution until a value is written to + * the expected location of this Promise. + */ while (!$this->isResolved()) - usleep(1000); // 1ms + usleep(1000); + + if (is_null($this->value) && $this->shmValid()) + $this->value = shm_get_var($this->shm, $this->key); - $this->value = shm_get_var($this->shm, $this->key); return $this; } + + /** + * + */ public function __destruct() { - if (is_resource($this->shm)) + /* + * Clean up our mess - the variable that we stored in the + * shared memory block - and detach from the block. + * Note: this destructor is only called after the + * garbage collector has noticed that there are no more + * references to this Promise instance. + */ + if ($this->shmValid()) { + if (shm_has_var($this->shm, $this->key)) + shm_remove_var($this->shm, $this->key); + shm_detach($this->shm); + } } } \ No newline at end of file diff --git a/src/functions.php b/src/functions.php new file mode 100644 index 0000000..86a13c9 --- /dev/null +++ b/src/functions.php @@ -0,0 +1,29 @@ +