Updated README and fixed responsibility mixup. Also, added stability by cleaning the memory block on startup.

This commit is contained in:
Joop Schilder 2019-01-22 17:44:32 +01:00
parent a5b5a58d06
commit f29d1a0d7a
5 changed files with 165 additions and 97 deletions

View File

@ -1,52 +1,67 @@
# joopschilder/php-async # joopschilder/php-async
Asynchronous PHP callable processing with return values via SysV shared memory.<br/> ## Introduction
Requires the `php-sysvshm` extension.<br/> This package provides functions to run callables asynchronously in PHP. Return values are shared via System-V shared memory.<br/>
Works with PHP >= 5.3 due to `shm_attach(...)` returning a resource instead of an int.<br/><br/> To use this package, you'll need PHP >= 7.0.0 with `ext-sysvshm` and `ext-pcntl`.<br/>
<b>Note:</b> This package should not be used in a CGI environment, as each PHP runtime that makes a call on the async functions WILL You should consider the state of this package to be <i>experimental</i>.<br/><br/>
create an 8MB shared memory block in RAM. If you run out of memory: you have been warned. <b>Note:</b> This package should not be used in a CGI environment.
<br/> The key that is used to access the block of shared memory is created based on the inode information of one of the source files.
<b>Note:</b> This project is an experiment. It is, however, available on packagist. This means that, whenever multiple instances (processes) from the same project source are created, they will try to use the same block of memory and collisions will occur.
If you think your project lacks witchcraft and black magic, just add this package to your `composer.json`: I might swap the `ftok()` call for a random string generator somewhere down the road.<br/><br/>
<b>Note:</b> It is possible (but discouraged) to change the amount of available shared memory.
If you wish to do so, it's as simple as calling either `Runtime::_setSharedMemorySizeMB(<amount of MB>);` or `Runtime::_setSharedMemorySizeB(<amount of bytes>);`.<br/>
If you want to use 32MB for example, call `Runtime::_setSharedMemorySizeMB(32);`.<br/>
Be sure to make this call before using any of the asynchronous functionalities.
## Installation
This package is available on <a href="https://packagist.org/packages/joopschilder/php-async">Packagist</a>
and can be installed using <a href="https://getcomposer.org/">Composer</a>:<br/>
```bash
$ composer require joopschilder/async-php
```
It's also possible to manually add it to your `composer.json`:
```json ```json
{ {
"require": { "require": {
"joopschilder/php-async": "dev-master" "joopschilder/php-async": "dev-master"
} }
} }
``` ```
## Usage
#### Functions
The library exposes three functions in the global namespace that provide indirect access to the class `Asynchronous`:
* `async(callable $function, ...$parameters)` to run something asynchronously, giving back a `Promise`;
* `async_wait_all()` to wait for all currently running jobs to finish;
* `async_reap_zombies()` to clean up any zombie processes during runtime if any exist;
## Examples #### Promises
Whenever you call `async(...)`, a `Promise` instance is returned.<br/>
### Promises A `Promise` is considered to be resolved when the function it belongs to returned a value or finished execution.
To block execution until a promise is resolved, simply call the `resolve()` method on the promise.
It's possible to check whether the promise has been resolved in a non-blocking way by calling the `isResolved()` method.<br/>
You can actually return anything that is serializable in PHP: objects, arrays, strings, you name it. You can actually return anything that is serializable in PHP: objects, arrays, strings, you name it.
```php ```php
<?php <?php
require_once __DIR__ . '/vendor/autoload.php'; require_once __DIR__ . '/vendor/autoload.php';
$promise = async(function() { $promise = async(function() {
sleep(random_int(1, 5)); sleep(random_int(1, 5));
return getmypid(); return getmypid();
}); });
// ... some other work // ... do some other work
$promise->resolve(); $promise->resolve();
$pid = $promise->getValue(); $pid = $promise->getValue();
printf("Me (%d) and %d have worked very hard!\n", getmypid(), $pid);
``` ```
The shutdown handler and destructors should take care of the rest. The shutdown handler and destructors should take care of the cleanup.<br/>
#### Asynchronous curl requests
... though you should probably look into curl multi handles for this: <a href="http://php.net/manual/en/function.curl-multi-init.php">curl_multi_init()</a>.
### Asynchronous curl requests
... though you should probably look into curl multi handles for this: <a href="http://php.net/manual/en/function.curl-multi-init.php">curl_multi_init() on PHP.net</a>.
```php ```php
<?php <?php
require_once __DIR__ . '/vendor/autoload.php'; require_once __DIR__ . '/vendor/autoload.php';
// Create the body for the process // Create the body for the process...
$process = function(string $url) { $process = function(string $url) {
$handle = curl_init($url); $handle = curl_init($url);
curl_setopt($handle, CURLOPT_FOLLOWLOCATION, 1); curl_setopt($handle, CURLOPT_FOLLOWLOCATION, 1);
@ -56,21 +71,30 @@ $process = function(string $url) {
file_put_contents(uniqid('download_'), $response); file_put_contents(uniqid('download_'), $response);
}; };
// Define some urls we want to download // Define some urls we want to download...
$urls = [ $urls = [
'example.com', 'example.com',
'example2.com', 'example2.com',
'some.other.domain' 'some.other.domain'
]; ];
// And away we go! // And there we go.
foreach($urls as $url) foreach($urls as $url)
async($process, $url); async($process, $url);
``` ```
That's all there is to it. That's all there is to it.
## Tips
If you're on a UNIX system, you can make use of the tools `ipcs` and `ipcrm` to monitor and manage the shared memory blocks.<br/>
To track what's happening in real time, I like to use:<br/>
```bash
$ watch -n 1 "ipcs -m --human && ipcs -m -p && ipcs -m -t && ipcs -m -u"
```
<br/>To clean all 'unused' shared memory blocks (they might remain resident in RAM if your program terminated unexpectedly):<br/>
```bash
$ ipcrm -a
```
## What's next? ## What's next?
- Refactoring
- More functionality (maybe)
- Improving stability - Improving stability
- Add a diagram that explains this witchcraft - Add an explaining diagram

View File

@ -14,7 +14,6 @@
}, },
"require": { "require": {
"ext-pcntl": "*", "ext-pcntl": "*",
"ext-sysvshm": "*", "ext-sysvshm": "*"
"ext-posix": "*"
} }
} }

View File

@ -8,25 +8,15 @@ namespace JoopSchilder\Asynchronous;
*/ */
class Asynchronous class Asynchronous
{ {
public const BLOCK_SIZE_MB = 8;
private const BLOCK_SIZE_BYTES = self::BLOCK_SIZE_MB * (1024 ** 2);
/** @var Asynchronous|null */ /** @var Asynchronous|null */
private static $instance; private static $instance;
/** @var int */
private static $key = 0;
/** @var int[] */ /** @var int[] */
private $children = []; private $children = [];
/** @var resource */ /** @var resource */
private $shm; private $shm;
/** @var int */
private $shmKey;
/** /**
* @param callable $function * @param callable $function
@ -39,7 +29,7 @@ class Asynchronous
* Prepare for fork * Prepare for fork
*/ */
$instance = self::getInstance(); $instance = self::getInstance();
$key = self::generatePromiseKey(); $promiseKey = Promise::generatePromiseKey();
/* /*
* Fork the parent * Fork the parent
@ -61,7 +51,7 @@ class Asynchronous
if ($pid > 0) { if ($pid > 0) {
$instance->children[] = $pid; $instance->children[] = $pid;
return new Promise($key); return new Promise($promiseKey);
} }
/* /*
@ -75,17 +65,17 @@ class Asynchronous
* On failure, write a default response to the block in order for * On failure, write a default response to the block in order for
* the Promise to be able to resolve. * the Promise to be able to resolve.
*/ */
Runtime::markChild(); Runtime::markAsChild();
$instance->_attachToShm(); $instance->_attachToShm();
try { try {
$response = call_user_func($function, ...$parameters); $response = call_user_func($function, ...$parameters);
shm_put_var($instance->shm, $key, $response ?? Promise::RESPONSE_NONE); shm_put_var($instance->shm, $promiseKey, $response ?? Promise::RESPONSE_NONE);
exit(0); exit(0);
} catch (\Throwable $throwable) { } catch (\Throwable $throwable) {
shm_put_var($instance->shm, $key, Promise::RESPONSE_ERROR); shm_put_var($instance->shm, $promiseKey, Promise::RESPONSE_ERROR);
exit(1); exit(1);
} }
@ -144,14 +134,11 @@ class Asynchronous
private function __construct() private function __construct()
{ {
/* /*
* Use the filename as an identifier to create the * The reason we do this is for when the shm block
* System V IPC key. * already exists. We attach, remove, detach and reattach
* to ensure a clean state.
*/ */
if ($this->shmKey == null) $this->_attachToShm(); // Attach
$this->shmKey = ftok(__FILE__, 't');
Promise::__setShmKey($this->shmKey);
$this->_attachToShm();
} }
/** /**
@ -192,7 +179,7 @@ class Asynchronous
*/ */
private function _attachToShm() private function _attachToShm()
{ {
$this->shm = shm_attach($this->shmKey, self::BLOCK_SIZE_BYTES); $this->shm = shm_attach(Runtime::getSharedMemoryKey(), Runtime::getSharedMemorySize());
return $this; return $this;
} }
@ -217,29 +204,6 @@ class Asynchronous
} }
/**
* @return int
*/
private static function generatePromiseKey()
{
/*
* Get the current key.
*/
$promiseKey = self::$key;
/*
* Reset the key to 0 if the upper bound of
* 9.999.999 is reached (Windows limit for
* shm keys).
*/
self::$key++;
if (self::$key > 99999999)
self::$key = 0;
return $promiseKey;
}
/** /**
* *
*/ */

View File

@ -15,8 +15,7 @@ class Promise
public const RESPONSE_ERROR = '__PROMISE_RESPONSE_ERROR__'; public const RESPONSE_ERROR = '__PROMISE_RESPONSE_ERROR__';
/** @var int */ /** @var int */
private static $shmKey; private static $generatedKey = 0;
/** @var resource */ /** @var resource */
private $shm; private $shm;
@ -27,16 +26,27 @@ class Promise
/** @var mixed|null */ /** @var mixed|null */
private $value; private $value;
/** /**
* @param int $shmKey * @return int
*/ */
public static function __setShmKey(int $shmKey) public static function generatePromiseKey()
{ {
/* /*
* Should be done only once: when the Asynchronous class * Get the current key.
* has created a key that will be used for IPC.
*/ */
self::$shmKey = $shmKey; $promiseKey = self::$generatedKey;
/*
* Reset the key to 0 if the upper bound of
* 9.999.999 is reached (Windows limit for
* shm keys).
*/
self::$generatedKey++;
if (self::$generatedKey > 99999999)
self::$generatedKey = 0;
return $promiseKey;
} }
@ -48,19 +58,9 @@ class Promise
{ {
$this->key = $key; $this->key = $key;
$this->value = null; $this->value = null;
$this->shm = shm_attach(self::$shmKey); $this->shm = shm_attach(Runtime::getSharedMemoryKey());
} }
/**
* @return $this
*/
private function attempt()
{
if (shm_has_var($this->shm, $this->key))
$this->value = shm_get_var($this->shm, $this->key);
return $this;
}
/** /**
* @return bool * @return bool
@ -137,4 +137,15 @@ class Promise
shm_detach($this->shm); shm_detach($this->shm);
} }
/**
* @return $this
*/
private function attempt()
{
if (shm_has_var($this->shm, $this->key))
$this->value = shm_get_var($this->shm, $this->key);
return $this;
}
} }

View File

@ -13,16 +13,54 @@ namespace JoopSchilder\Asynchronous;
*/ */
class Runtime class Runtime
{ {
/** @var int */
public const INITIAL_SHM_SIZE_MB = 16;
/** @var int */
private static $sharedMemKey = null;
/** @var int */
private static $sharedMemSize = self::INITIAL_SHM_SIZE_MB * (1024 ** 2);
/** @var bool */ /** @var bool */
private static $inParentRuntime = true; private static $inParentRuntime = true;
/** /**
* * Runtime constructor.
*/ */
public static function markChild() private function __construct()
{ {
self::$inParentRuntime = false; }
/*
* Public
*/
/**
* @return int
*/
public static function getSharedMemorySize()
{
return self::$sharedMemSize;
}
/**
* @return int
*/
public static function getSharedMemoryKey()
{
/*
* Use the filename as an identifier to create the
* System V IPC key.
*/
if (is_null(self::$sharedMemKey))
self::$sharedMemKey = ftok(__FILE__, 't');
return self::$sharedMemKey;
} }
@ -34,4 +72,36 @@ class Runtime
return !self::$inParentRuntime; return !self::$inParentRuntime;
} }
/*
* 'Semi' private.
* To be used by internal classes.
*/
/**
* @param int $size_mb
*/
public static function _setSharedMemorySizeMB(int $size_mb)
{
self::$sharedMemSize = abs($size_mb) * (1024 ** 2);
}
/**
* @param int $size_b
*/
public static function _setSharedMemorySizeB(int $size_b)
{
self::$sharedMemSize = $size_b;
}
/**
*
*/
public static function markAsChild()
{
self::$inParentRuntime = false;
}
} }