Actually stable version.

This commit is contained in:
Joop Schilder 2019-01-17 00:49:53 +01:00
parent 826553a24e
commit be444f3b45
6 changed files with 195 additions and 61 deletions

1
.gitignore vendored
View File

@ -1,3 +1,4 @@
/bin/
/vendor/ /vendor/
composer.lock composer.lock
/.idea/ /.idea/

View File

@ -1,4 +1,74 @@
# php-async # joopschilder/php-async
Asynchronous PHP callable processing with return values via SysV shared memory.<br/> Asynchronous PHP callable processing with return values via SysV shared memory.<br/>
Requires the `php-sysvshm` extension.<br/> Requires the `php-sysvshm` extension.<br/>
Works with PHP from version 5.3 upwards due to `shm_attach(...)` returning a resource instead of an int. Works with PHP >= 5.3 due to `shm_attach(...)` returning a resource instead of an int.<br/>
<br/>
<b>Note:</b> This project is merely an experiment. It is, however, available on packagist.
If you think your project lacks witchcraft combined with black magic, just add this package to your `composer.json`:
```json
{
"require": {
"joopschilder/php-async": "dev-master"
}
}
```
## Examples
### Promises
You can actually return anything that is serializable in PHP: objects, arrays, strings, you name it.
```php
<?php
require_once __DIR__ . '/vendor/autoload.php';
$promise = async(function() {
sleep(random_int(1, 5));
return getmypid();
});
// ... some other work
$promise->resolve();
$pid = $promise->getValue();
printf("Me (%d) and %d have worked very hard!\n", getmypid(), $pid);
```
The shutdown handler and destructors should take care of the rest.
### Asynchronous curl requests
... though you should probably look into curl multi handles for this: <a href="http://php.net/manual/en/function.curl-multi-init.php">curl_multi_init() on PHP.net</a>.
```php
<?php
require_once __DIR__ . '/vendor/autoload.php';
// Create the body for the process
$process = function(string $url) {
$handle = curl_init($url);
curl_setopt($handle, CURLOPT_FOLLOWLOCATION, 1);
curl_setopt($handle, CURLOPT_RETURNTRANSFER, 1);
$response = curl_exec($handle);
curl_close($handle);
file_put_contents(uniqid('download_'), $response);
};
// Define some urls we want to download
$urls = [
'example.com',
'example2.com',
'some.other.domain'
];
// And away we go!
foreach($urls as $url)
async($process, $url);
```
That's all there is to it.
## What's next?
- Refactoring
- More functionality (maybe)
- Improving stability
- Add a diagram that explains this witchcraft

View File

@ -8,6 +8,9 @@ namespace JoopSchilder\Asynchronous;
*/ */
class Asynchronous class Asynchronous
{ {
public const BLOCK_SIZE_MB = 32;
private const BLOCK_SIZE_BYTES = self::BLOCK_SIZE_MB * (1024 ** 2);
/** @var Asynchronous|null */ /** @var Asynchronous|null */
private static $instance; private static $instance;
@ -15,9 +18,6 @@ class Asynchronous
private static $key = 0; private static $key = 0;
/** @var bool */
private $isChild = false;
/** @var int[] */ /** @var int[] */
private $children = []; private $children = [];
@ -35,8 +35,15 @@ class Asynchronous
*/ */
public static function run(callable $function, ...$parameters) public static function run(callable $function, ...$parameters)
{ {
/*
* Prepare for fork
*/
$instance = self::getInstance(); $instance = self::getInstance();
$key = self::generatePromiseKey(); $key = self::generatePromiseKey();
/*
* Fork the parent
*/
$pid = pcntl_fork(); $pid = pcntl_fork();
/* /*
@ -68,18 +75,16 @@ class Asynchronous
* On failure, write a default response to the block in order for * On failure, write a default response to the block in order for
* the Promise to be able to resolve. * the Promise to be able to resolve.
*/ */
$instance->isChild = true; Runtime::setChild();
$instance->attachShm(); $instance->_attachToShm();
try { try {
$response = call_user_func_array($function, $parameters); $response = call_user_func($function, ...$parameters);
if (is_resource($instance->shm))
shm_put_var($instance->shm, $key, $response ?? Promise::RESPONSE_NONE); shm_put_var($instance->shm, $key, $response ?? Promise::RESPONSE_NONE);
exit(0); exit(0);
} catch (\Throwable $throwable) { } catch (\Throwable $throwable) {
if (is_resource($instance->shm))
shm_put_var($instance->shm, $key, Promise::RESPONSE_ERROR); shm_put_var($instance->shm, $key, Promise::RESPONSE_ERROR);
exit(1); exit(1);
@ -89,7 +94,7 @@ class Asynchronous
/** /**
* *
*/ */
public static function cleanup() public static function reap()
{ {
/* /*
* Iterate over all child process PIDs and check * Iterate over all child process PIDs and check
@ -106,7 +111,7 @@ class Asynchronous
/** /**
* *
*/ */
public static function awaitChildren() public static function waitForChildren()
{ {
self::getInstance()->_awaitChildren(); self::getInstance()->_awaitChildren();
} }
@ -142,9 +147,11 @@ class Asynchronous
* Use the filename as an identifier to create the * Use the filename as an identifier to create the
* System V IPC key. * System V IPC key.
*/ */
if ($this->shmKey == null)
$this->shmKey = ftok(__FILE__, 't'); $this->shmKey = ftok(__FILE__, 't');
Promise::__setShmKey($this->shmKey); Promise::__setShmKey($this->shmKey);
$this->attachShm(); $this->_attachToShm();
} }
/** /**
@ -179,12 +186,13 @@ class Asynchronous
return $this; return $this;
} }
/** /**
* @return $this * @return $this
*/ */
private function attachShm() private function _attachToShm()
{ {
$this->shm = shm_attach($this->shmKey); $this->shm = shm_attach($this->shmKey, self::BLOCK_SIZE_BYTES);
return $this; return $this;
} }
@ -224,7 +232,9 @@ class Asynchronous
* 9.999.999 is reached (Windows limit for * 9.999.999 is reached (Windows limit for
* shm keys). * shm keys).
*/ */
self::$key = (++self::$key > 9999999) ? 0 : self::$key; self::$key++;
if (self::$key > 99999999)
self::$key = 0;
return $promiseKey; return $promiseKey;
} }
@ -241,7 +251,7 @@ class Asynchronous
* The shutdown handler * The shutdown handler
*/ */
register_shutdown_function(function () use (&$instance) { register_shutdown_function(function () use (&$instance) {
if ($instance->isChild) if (Runtime::isChild())
return; return;
$instance->_awaitChildren(); $instance->_awaitChildren();
@ -249,12 +259,16 @@ class Asynchronous
}); });
} }
/**
*
*/
public function __destruct() public function __destruct()
{ {
/* if (Runtime::isChild())
* To be sure - add destructor return;
*/
self::removeShmBlock(); $instance = self::getInstance();
$instance->_removeShmBlock();
} }
} }

View File

@ -27,7 +27,6 @@ class Promise
/** @var mixed|null */ /** @var mixed|null */
private $value; private $value;
/** /**
* @param int $shmKey * @param int $shmKey
*/ */
@ -53,11 +52,14 @@ class Promise
} }
/** /**
* @return bool * @return $this
*/ */
public function shmValid() private function attempt()
{ {
return is_resource($this->shm); if (shm_has_var($this->shm, $this->key))
$this->value = shm_get_var($this->shm, $this->key);
return $this;
} }
/** /**
@ -65,21 +67,18 @@ class Promise
*/ */
public function isResolved() public function isResolved()
{ {
if ($this->shmValid()) $this->attempt();
return shm_has_var($this->shm, $this->key);
return true; return !is_null($this->value);
} }
/** /**
* @return bool * @return bool
*/ */
public function isEmpty() public function isVoid()
{ {
$value = $this->getValue(); return $this->getValue() === self::RESPONSE_NONE;
return $value === self::RESPONSE_NONE || $value === null;
} }
@ -97,12 +96,7 @@ class Promise
*/ */
public function getValue() public function getValue()
{ {
if ($this->shmValid()) return $this->isResolved() ? $this->value : null;
return $this->isResolved() ? $this->resolve()->value : null;
$this->value = self::RESPONSE_ERROR;
return $this->value;
} }
@ -113,14 +107,11 @@ class Promise
{ {
/* /*
* Actually block execution until a value is written to * Actually block execution until a value is written to
* the expected location of this Promise. * the expected memory location of this Promise.
*/ */
while (!$this->isResolved()) while (!$this->isResolved())
usleep(1000); usleep(1000);
if (is_null($this->value) && $this->shmValid())
$this->value = shm_get_var($this->shm, $this->key);
return $this; return $this;
} }
@ -137,11 +128,13 @@ class Promise
* garbage collector has noticed that there are no more * garbage collector has noticed that there are no more
* references to this Promise instance. * references to this Promise instance.
*/ */
if ($this->shmValid()) { if (Runtime::isChild())
return;
if (shm_has_var($this->shm, $this->key)) if (shm_has_var($this->shm, $this->key))
shm_remove_var($this->shm, $this->key); shm_remove_var($this->shm, $this->key);
shm_detach($this->shm); shm_detach($this->shm);
}
} }
} }

55
src/Runtime.php Normal file
View File

@ -0,0 +1,55 @@
<?php
namespace JoopSchilder\Asynchronous;
/**
* Class Runtime
* @package JoopSchilder\Asynchronous
*
* Created to keep track of the current runtime situation.
* This is critical in order for the application to know
* which destructors and handlers to call.
*/
class Runtime
{
/** @var bool */
private static $inParentRuntime = true;
/**
*
*/
public static function setChild()
{
self::$inParentRuntime = false;
}
/**
*
*/
public static function setParent()
{
self::$inParentRuntime = true;
}
/**
* @return bool
*/
public static function isParent()
{
return self::$inParentRuntime;
}
/**
* @return bool
*/
public static function isChild()
{
return !self::$inParentRuntime;
}
}

View File

@ -1,14 +1,13 @@
<?php <?php
use JoopSchilder\Asynchronous\Asynchronous;
use JoopSchilder\Asynchronous\Promise;
namespace JoopSchilder\Asynchronous; if (!function_exists('async')) {
if (!function_exists('async_run')) {
/** /**
* @param callable $function * @param callable $function
* @param mixed ...$parameters * @param mixed ...$parameters
* @return Promise|null * @return Promise
*/ */
function async(callable $function, ...$parameters) function async(callable $function, ...$parameters)
{ {
@ -17,23 +16,25 @@ if (!function_exists('async_run')) {
} }
if (!function_exists('async_cleanup')) { if (!function_exists('async_reap_zombies')) {
/** /**
* *
*/ */
function async_cleanup() function async_reap_zombies()
{ {
Asynchronous::cleanup(); Asynchronous::reap();
} }
} }
if (!function_exists('async_await_all')) { if (!function_exists('async_wait_all')) {
/** /**
* *
*/ */
function async_await_all() function async_wait_all()
{ {
Asynchronous::awaitChildren(); Asynchronous::waitForChildren();
} }
} }