Made the code more robust.

Extended functionality with helper functions.
This commit is contained in:
Joop Schilder 2019-01-16 16:22:02 +01:00
parent 1466c2eaef
commit c0cfaadcc3
8 changed files with 372 additions and 93 deletions

View File

@ -3,33 +3,42 @@
require_once __DIR__ . '/../vendor/autoload.php'; require_once __DIR__ . '/../vendor/autoload.php';
use function Joop\Asynchronous\async;
use Joop\Asynchronous\Promise;
// Create a function we want to run asynchronously /**
$process = function ($i) { * @param Promise[] $promises
$delayMicroseconds = (10 - $i) * 1000000; */
usleep($delayMicroseconds); function awaitPromises(array &$promises)
{
return getmypid();
};
// Execute the functions asynchronously - each returning a Promise
$promises = [];
foreach (range(0, 10) as $i)
$promises[] = Asynchronous::run($process, $i);
// Wait for all promises to resolve
while (count($promises) > 0) {
foreach ($promises as $index => $promise) { foreach ($promises as $index => $promise) {
if ($promise->isResolved() && !$promise->isEmpty()) { if ($promise->isResolved()) {
print("Response retrieved: " . $promise->getValue() . PHP_EOL);
unset($promises[$index]); unset($promises[$index]);
if (!$promise->isEmpty() && !$promise->isError())
print($promise->getValue() . PHP_EOL);
} }
} }
} }
exit(0); /*
* Example of asynchronous processing in PHP
*/
$process = function ($i) {
$delayMicroseconds = (5 - $i) * 1000000;
usleep($delayMicroseconds);
return sprintf(
'PID %-5d slept for %.1f seconds',
getmypid(), $delayMicroseconds / 1000000
);
};
$promises = [];
foreach (range(0, 5) as $i)
$promises[] = async($process, $i);
while (count($promises) > 0)
awaitPromises($promises);

15
bin/example/arrays.php Executable file
View File

@ -0,0 +1,15 @@
#!/usr/bin/php
<?php
use function Joop\Asynchronous\async;
require_once __DIR__ . '/../../vendor/autoload.php';
$promise = async(function () {
sleep(1);
return range(random_int(0, 10), random_int(20, 60));
});
$array = $promise->resolve()->getValue();
var_dump($array);

37
bin/example/objects.php Executable file
View File

@ -0,0 +1,37 @@
#!/usr/bin/php
<?php
use function Joop\Asynchronous\async;
require_once __DIR__ . '/../../vendor/autoload.php';
// Example class
class Sample
{
private $data;
public function __construct()
{
$this->data = [1, 2, 3];
}
public function getData()
{
return $this->data;
}
}
// Create the process
$promise = async(function () {
sleep(2);
return new Sample();
});
// We can do some other stuff here while the process is running
// Resolve the promise
/** @var Sample $sample */
$sample = $promise->resolve()->getValue();
var_dump($sample->getData());

14
bin/example/sample.php Executable file
View File

@ -0,0 +1,14 @@
#!/usr/bin/php
<?php
use function Joop\Asynchronous\async;
require_once __DIR__ . '/../../vendor/autoload.php';
$process = function ($number) {
sleep($number);
return $number;
};
async($process, 2);
// Do stuff...

View File

@ -7,13 +7,13 @@
} }
], ],
"autoload": { "autoload": {
"psr-0": { "files": ["src/functions.php"],
"": ["src"] "psr-4": {
"Joop\\Asynchronous\\": ["src"]
} }
}, },
"require": { "require": {
"ext-pcntl": "*", "ext-pcntl": "*",
"ext-curl": "*",
"ext-sysvshm": "*" "ext-sysvshm": "*"
} }
} }

View File

@ -1,19 +1,23 @@
<?php <?php
namespace Joop\Asynchronous;
/** /**
* Class Asynchronous * Class Asynchronous
* Responsible for management of child processes and shared memory.
*/ */
class Asynchronous class Asynchronous
{ {
/** @var Asynchronous|null */
private static $instance; private static $instance;
/** @var int */
private static $key = 0;
/** @var bool */ /** @var bool */
private $isChild = false; private $isChild = false;
private static $key = 0;
/** @var int[] */ /** @var int[] */
private $children = []; private $children = [];
@ -23,41 +27,6 @@ class Asynchronous
/** @var int */ /** @var int */
private $shmKey; private $shmKey;
/** @var string */
private $tempFile;
/**
* Asynchronous constructor.
*/
private function __construct()
{
$this->tempFile = tempnam(__DIR__ . '/../temp', 'PHP');
$this->shmKey = ftok($this->tempFile, 'a');
Promise::_setShmKey($this->shmKey);
$this->attach();
}
/**
* @return $this
*/
private function attach()
{
$this->shm = shm_attach($this->shmKey);
return $this;
}
/**
* @return Asynchronous
*/
private static function getInstance()
{
if (is_null(self::$instance))
self::$instance = new static();
return self::$instance;
}
/** /**
* @param callable $function * @param callable $function
@ -67,41 +36,142 @@ class Asynchronous
public static function run(callable $function, ...$parameters) public static function run(callable $function, ...$parameters)
{ {
$instance = self::getInstance(); $instance = self::getInstance();
$key = self::generatePromiseKey();
$pid = pcntl_fork(); $pid = pcntl_fork();
if ($pid === false) /*
* The fork failed. Instead of returning a promise, we return null.
*/
if ($pid == -1)
return null; return null;
$key = self::generatePromiseKey(); /*
* Parent process. We keep track of the PID of the child process
* in order for us to read out it's status later on.
* A Promise instance is returned that corresponds to the key in
* memory to which the child process will write sometime.
*/
if ($pid > 0) { if ($pid > 0) {
$instance->children[] = $pid; $instance->children[] = $pid;
return new Promise($key); return new Promise($key);
} }
/*
* Child process. Mark the (copied) instance of this class as a child
* to prevent unneeded shutdown handler execution.
* Reattach to the shared memory block (the $shm member variable is a
* resource since PHP > 5.3 and is thus not shared with the child)
* and execute the function.
* On a successful execution, write the result to the shared memory
* block to which the parent is attached.
* On failure, write a default response to the block in order for
* the Promise to be able to resolve.
*/
$instance->isChild = true; $instance->isChild = true;
$instance->attach(); $instance->attachShm();
try { try {
$response = call_user_func($function, ...$parameters); $response = call_user_func_array($function, $parameters);
shm_put_var($instance->shm, $key, $response ?? Promise::RESPONSE_NONE); shm_put_var($instance->shm, $key, $response ?? Promise::RESPONSE_NONE);
exit(0); exit(0);
} catch (Throwable $throwable) { } catch (\Throwable $throwable) {
shm_put_var($instance->shm, $key, Promise::RESPONSE_ERROR);
exit(1); exit(1);
} }
} }
/**
*
*/
public static function cleanup()
{
/*
* Iterate over all child process PIDs and check
* if one or more of them has stopped.
*/
$instance = self::getInstance();
foreach ($instance->children as $index => $pid) {
$response = pcntl_waitpid($pid, $status, WNOHANG);
if ($response === $pid)
unset($instance->children[$index]);
}
}
/**
* @return int
*/
public static function childCount()
{
return count(self::getInstance()->children);
}
/*
* Private methods below
*/
/**
* Asynchronous constructor.
*/
private function __construct()
{
/*
* Use the filename as an identifier to create the
* System V IPC key.
*/
$this->shmKey = ftok(__FILE__, 't');
Promise::__setShmKey($this->shmKey);
$this->attachShm();
}
/**
* @return $this
*/
private function attachShm()
{
$this->shm = shm_attach($this->shmKey);
return $this;
}
/**
* @return Asynchronous
*/
private static function getInstance()
{
if (is_null(self::$instance)) {
/*
* This is executed once during runtime;
* when a functionality from this class
* is used for the first time.
*/
self::$instance = new static();
self::registerHandlers();
}
return self::$instance;
}
/** /**
* @return int * @return int
*/ */
private static function generatePromiseKey() private static function generatePromiseKey()
{ {
/*
* Get the current key.
*/
$promiseKey = self::$key; $promiseKey = self::$key;
self::$key++;
if (self::$key > 9999999) /*
self::$key = 0; * Reset the key to 0 if the upper bound of
* 9.999.999 is reached (Windows limit for
* shm keys).
*/
self::$key = (++self::$key > 9999999) ? 0 : self::$key;
return $promiseKey; return $promiseKey;
} }
@ -110,18 +180,48 @@ class Asynchronous
/** /**
* *
*/ */
public function __destruct() private static function registerHandlers()
{ {
if ($this->isChild) $instance = self::getInstance();
return;
while (count($this->children) > 0) { /*
pcntl_wait($status); * The shutdown handler
array_shift($this->children); */
} $handler = function () use (&$instance) {
shm_remove($this->shm); /*
shm_detach($this->shm); * A child process has no business here.
unlink($this->tempFile); */
if ($instance->isChild)
return;
/*
* Wait for all children to finish to
* ensure that all writing to the shared
* memory block is finished.
*/
while (count($instance->children) > 0) {
pcntl_wait($status);
array_shift($instance->children);
}
/*
* Ask the kernel to mark the shared memory
* block for removal and detach from it to
* actually allow for removal.
*/
if (is_resource($instance->shm)) {
shm_remove($instance->shm);
shm_detach($instance->shm);
}
};
/*
* Actually register the handler as shutdown
* handler and signal handler for SIGINT, SIGTERM
*/
register_shutdown_function($handler);
foreach ([SIGINT, SIGTERM] as $SIGNAL)
pcntl_signal($SIGNAL, $handler);
} }
} }

View File

@ -1,22 +1,46 @@
<?php <?php
namespace Joop\Asynchronous;
class Promise class Promise
{ {
/*
* Define some default responses that will make it easy for us
* to check if the promise resulted in an error or if the promise
* was fulfilled by a void function.
* The '_' characters are arbitrary but ensure a higher entropy to
* minimize the chances of result collision.
*/
public const RESPONSE_NONE = '__PROMISE_RESPONSE_NONE__'; public const RESPONSE_NONE = '__PROMISE_RESPONSE_NONE__';
public const RESPONSE_ERROR = '__PROMISE_RESPONSE_ERROR__';
private $shm;
private $key;
private $value;
/** @var int */ /** @var int */
private static $shmKey; private static $shmKey;
public static function _setShmKey(int $shmKey)
/** @var resource */
private $shm;
/** @var int */
private $key;
/** @var mixed|null */
private $value;
/**
* @param int $shmKey
*/
public static function __setShmKey(int $shmKey)
{ {
/*
* Should be done only once: when the Asynchronous class
* has created a key that will be used for IPC.
*/
self::$shmKey = $shmKey; self::$shmKey = $shmKey;
} }
/** /**
* Promise constructor. * Promise constructor.
* @param int $key * @param int $key
@ -28,45 +52,96 @@ class Promise
$this->shm = shm_attach(self::$shmKey); $this->shm = shm_attach(self::$shmKey);
} }
/**
* @return bool
*/
public function shmValid()
{
return is_resource($this->shm);
}
/** /**
* @return bool * @return bool
*/ */
public function isResolved() public function isResolved()
{ {
return shm_has_var($this->shm, $this->key); if ($this->shmValid())
return shm_has_var($this->shm, $this->key);
return true;
} }
/** /**
* @return bool * @return bool
*/ */
public function isEmpty() public function isEmpty()
{ {
return $this->getValue() === self::RESPONSE_NONE; $value = $this->getValue();
return $value === self::RESPONSE_NONE || $value === null;
} }
/**
* @return bool
*/
public function isError()
{
return $this->getValue() === self::RESPONSE_ERROR;
}
/** /**
* @return mixed|null * @return mixed|null
*/ */
public function getValue() public function getValue()
{ {
return $this->isResolved() ? $this->resolve()->value : null; if ($this->shmValid())
return $this->isResolved() ? $this->resolve()->value : null;
$this->value = self::RESPONSE_ERROR;
return $this->value;
} }
/** /**
* @return $this * @return $this
*/ */
public function resolve() public function resolve()
{ {
/*
* Actually block execution until a value is written to
* the expected location of this Promise.
*/
while (!$this->isResolved()) while (!$this->isResolved())
usleep(1000); // 1ms usleep(1000);
if (is_null($this->value) && $this->shmValid())
$this->value = shm_get_var($this->shm, $this->key);
$this->value = shm_get_var($this->shm, $this->key);
return $this; return $this;
} }
/**
*
*/
public function __destruct() public function __destruct()
{ {
if (is_resource($this->shm)) /*
* Clean up our mess - the variable that we stored in the
* shared memory block - and detach from the block.
* Note: this destructor is only called after the
* garbage collector has noticed that there are no more
* references to this Promise instance.
*/
if ($this->shmValid()) {
if (shm_has_var($this->shm, $this->key))
shm_remove_var($this->shm, $this->key);
shm_detach($this->shm); shm_detach($this->shm);
}
} }
} }

29
src/functions.php Normal file
View File

@ -0,0 +1,29 @@
<?php
namespace Joop\Asynchronous;
require_once __DIR__ . '/../vendor/autoload.php';
if (!function_exists('async_run')) {
/**
* @param callable $function
* @param mixed ...$parameters
* @return Promise|null
*/
function async(callable $function, ...$parameters)
{
return Asynchronous::run($function, ...$parameters);
}
}
if (!function_exists('async_cleanup')) {
/**
*
*/
function async_cleanup()
{
Asynchronous::cleanup();
}
}