Add command, query, stable server with argument parsing

This commit is contained in:
Joop Schilder 2020-06-20 18:07:43 +02:00
parent 3b8438f9e7
commit ae7f92d51d
12 changed files with 197 additions and 159 deletions

6
.gitignore vendored
View File

@ -1,6 +1,4 @@
/vendor/ /vendor/
/.idea/ /.idea/
/var/*
/out/ !/var/.gitignore
/documents/

13
command Executable file
View File

@ -0,0 +1,13 @@
#!/usr/bin/env php
<?php
array_shift($argv);
if (count($argv) === 0) {
print('Command needs at least one argument' . PHP_EOL);
die(1);
}
$context = new ZMQContext();
$socket = new ZMQSocket($context, ZMQ::SOCKET_PUSH);
$socket->connect('ipc:///tmp/storage_server_command.ipc');
$socket->sendmulti($argv);

View File

@ -1,12 +0,0 @@
{
"name": "joopschilder/zmq-fileserver",
"authors": [
{
"name": "Joop Schilder",
"email": "jnmschilder@protonmail.com"
}
],
"require": {
"fzaninotto/faker": "^1.9"
}
}

69
composer.lock generated
View File

@ -1,69 +0,0 @@
{
"_readme": [
"This file locks the dependencies of your project to a known state",
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
"This file is @generated automatically"
],
"content-hash": "541af5d4dea7e6b160f304dd7ba3073f",
"packages": [
{
"name": "fzaninotto/faker",
"version": "v1.9.1",
"source": {
"type": "git",
"url": "https://github.com/fzaninotto/Faker.git",
"reference": "fc10d778e4b84d5bd315dad194661e091d307c6f"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/fzaninotto/Faker/zipball/fc10d778e4b84d5bd315dad194661e091d307c6f",
"reference": "fc10d778e4b84d5bd315dad194661e091d307c6f",
"shasum": ""
},
"require": {
"php": "^5.3.3 || ^7.0"
},
"require-dev": {
"ext-intl": "*",
"phpunit/phpunit": "^4.8.35 || ^5.7",
"squizlabs/php_codesniffer": "^2.9.2"
},
"type": "library",
"extra": {
"branch-alias": {
"dev-master": "1.9-dev"
}
},
"autoload": {
"psr-4": {
"Faker\\": "src/Faker/"
}
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"authors": [
{
"name": "François Zaninotto"
}
],
"description": "Faker is a PHP library that generates fake data for you.",
"keywords": [
"data",
"faker",
"fixtures"
],
"time": "2019-12-12T13:22:17+00:00"
}
],
"packages-dev": [],
"aliases": [],
"minimum-stability": "stable",
"stability-flags": [],
"prefer-stable": false,
"prefer-lowest": false,
"platform": [],
"platform-dev": [],
"plugin-api-version": "1.1.0"
}

View File

@ -1,16 +0,0 @@
#!/usr/bin/env php
<?php
$argc < 2 and die('Usage: file_generator <namespace>' . PHP_EOL);
require_once __DIR__ . '/vendor/autoload.php';
$push = (new ZMQContext)->getSocket(ZMQ::SOCKET_PUSH)->connect('ipc:///tmp/storage_server_sink.ipc');
$faker = Faker\Factory::create();
$id = 1;
while (true) {
$document = file_get_contents("https://twitter.com/{$faker->firstName}");
$push->send($argv[1], ZMQ::MODE_SNDMORE)
->send($id++ . '.html', ZMQ::MODE_SNDMORE)
->send($document);
sleep(2);
}

15
query Executable file
View File

@ -0,0 +1,15 @@
#!/usr/bin/env php
<?php
array_shift($argv);
if (count($argv) === 0) {
print('Query needs at least one argument' . PHP_EOL);
die(1);
}
$context = new ZMQContext();
$socket = new ZMQSocket($context, ZMQ::SOCKET_REQ);
$socket->connect('ipc:///tmp/storage_server_query.ipc');
$socket->sendmulti($argv);
print(implode(PHP_EOL, $socket->recvMulti()));
print(PHP_EOL);

View File

@ -1,12 +0,0 @@
#!/usr/bin/env php
<?php
$argc < 3 and die('Usage: request_file <namespace> <name>' . PHP_EOL);
$req = (new ZMQContext)->getSocket(ZMQ::SOCKET_REQ)
->connect('ipc:///tmp/storage_server_query.ipc')
->send(@$argv[1], ZMQ::MODE_SNDMORE)
->send(@$argv[2]);
print($req->recv());
print(PHP_EOL);

View File

@ -1,10 +0,0 @@
#!/usr/bin/env php
<?php
$argc < 4 and die('Usage: save_file NAMESPACE NAME CONTENTS' . PHP_EOL);
(new ZMQContext)
->getSocket(ZMQ::SOCKET_PUSH)->connect('ipc:///tmp/storage_server_sink.ipc')
->send(@$argv[1], ZMQ::MODE_SNDMORE)
->send(@$argv[2], ZMQ::MODE_SNDMORE)
->send(@$argv[3]);

10
server Executable file
View File

@ -0,0 +1,10 @@
#!/usr/bin/env php
<?php
require_once __DIR__ . '/src/FileServer.php';
$dir = ($argv[1] ?? __DIR__ . '/var/documents');
$commandEndpoints = ['ipc:///tmp/storage_server_command.ipc'];
$queryEndpoints = ['ipc:///tmp/storage_server_query.ipc'];
(new FileServer($dir, new ZMQContext(), $commandEndpoints, $queryEndpoints))->run();

157
src/FileServer.php Normal file
View File

@ -0,0 +1,157 @@
<?php
class FileServer
{
private ZMQSocket $commandSocket;
private ZMQSocket $querySocket;
private ZMQPoll $poll;
private string $rootDirectory;
public function __construct(string $rootDirectory, ZMQContext $context, array $commandDsns, array $queryDsns)
{
$this->initializeRootDirectory($rootDirectory);
$this->initializeCommandSocket($context, $commandDsns);
$this->initializeQuerySocket($context, $queryDsns);
$this->initializePoll();
}
private function initializeRootDirectory(string $rootDirectory): void
{
$this->rootDirectory = rtrim($rootDirectory, DIRECTORY_SEPARATOR);
if (!is_dir($this->rootDirectory)) {
mkdir($this->rootDirectory, 0777, true);
}
}
private function initializeCommandSocket(ZMQContext $context, array $bindings): void
{
if (count($bindings) === 0) {
throw new InvalidArgumentException("At least one binding required");
}
$this->commandSocket = new ZMQSocket($context, ZMQ::SOCKET_PULL);
$this->commandSocket->setSockOpt(ZMQ::SOCKOPT_HWM, 5);
foreach ($bindings as $dsn) {
$this->commandSocket->bind($dsn);
}
}
private function initializeQuerySocket(ZMQContext $context, array $bindings): void
{
if (count($bindings) === 0) {
throw new InvalidArgumentException("At least one binding required");
}
$this->querySocket = new ZMQSocket($context, ZMQ::SOCKET_REP);
$this->querySocket->setSockOpt(ZMQ::SOCKOPT_HWM, 5);
foreach ($bindings as $dsn) {
$this->querySocket->bind($dsn);
}
}
private function initializePoll(): void
{
$this->poll = new ZMQPoll();
$this->poll->add($this->commandSocket, ZMQ::POLL_IN);
$this->poll->add($this->querySocket, ZMQ::POLL_IN);
}
public function run()
{
$readable = $writable = [];
while (true) {
$this->poll->poll($readable, $writable);
foreach ($readable as $socket) {
$socket === $this->querySocket and $this->onQuery();
$socket === $this->commandSocket and $this->onCommand();
}
}
}
private function onQuery(): void
{
$arguments = $this->querySocket->recvMulti();
$query = array_shift($arguments);
switch ($query) {
case 'LOAD':
if (count($arguments) !== 2) {
$this->querySocket->send(-1);
print("Malformed LOAD query\n");
return;
}
$this->querySocket->send(@file_get_contents("{$this->rootDirectory}/{$arguments[0]}/{$arguments[1]}") ?: -1);
break;
case 'CONTAINS':
if (count($arguments) !== 2) {
$this->querySocket->send(-1);
print("Malformed CONTAINS query\n");
return;
}
$this->querySocket->send(file_exists("{$this->rootDirectory}/{$arguments[0]}/{$arguments[1]}") ? 'Y' : 'N');
break;
default:
$this->querySocket->send(-1);
$this->onUnknownInput($query, $arguments);
break;
}
}
private function onCommand(): void
{
$arguments = $this->commandSocket->recvMulti();
$command = array_shift($arguments);
switch ($command) {
case 'SAVE':
if (count($arguments) !== 3) {
print("Malformed SAVE command\n");
return;
}
$this->saveFile($arguments[0], $arguments[1], $arguments[2]);
break;
case 'DELETE':
if (count($arguments) !== 2) {
print("Malformed DELETE command\n");
return;
}
$this->deleteFile($arguments[0], $arguments[1]);
break;
case 'DELETE_ALL':
if (count($arguments) !== 1) {
print("Malformed DELETE_ALL command\n");
}
$this->deleteAll($arguments[0]);
break;
default:
$this->onUnknownInput($command, $arguments);
break;
}
}
private function onUnknownInput(string $input, array $arguments): void
{
$arguments = array_map(fn($v) => substr($v, 0, 12), $arguments);
printf("Unknown input '%s' with args (%s)\n", $input, implode(', ', $arguments));
}
private function saveFile(string &$namespace, string &$name, string &$content): void
{
is_dir($targetDir = "{$this->rootDirectory}/$namespace") or mkdir($targetDir, 0777, true);
file_put_contents("$targetDir/$name", $content);
}
private function deleteFile(string &$namespace, string &$name): void
{
@unlink("{$this->rootDirectory}/$namespace/$name");
}
private function deleteAll(string &$namespace): void
{
$directory = "{$this->rootDirectory}/$namespace";
if (!is_dir($directory)) {
return;
}
static $source = '/tmp/zmq.fileserver.rsync_empty_dir.d';
is_dir($source) or @mkdir($source);
@system("nohup rsync --archive --recursive --delete {$source}/ {$directory}/ 2>&1 >/dev/null &");
@rmdir($directory);
}
}

View File

@ -1,36 +0,0 @@
#!/usr/bin/env php
<?php
is_dir($rootDir = rtrim($argv[1] ?? __DIR__ . '/documents', '/')) or mkdir($rootDir, 0777, true);
$zmq = new ZMQContext();
$pullSocket = $zmq->getSocket(ZMQ::SOCKET_PULL)->bind('ipc:///tmp/storage_server_sink.ipc');
$replySocket = $zmq->getSocket(ZMQ::SOCKET_REP)->bind('ipc:///tmp/storage_server_query.ipc');
$poll = new ZMQPoll();
$poll->add($pullSocket, ZMQ::POLL_IN);
$poll->add($replySocket, ZMQ::POLL_IN);
$readable = $writable = [];
while (true) {
$poll->poll($readable, $writable);
foreach ($readable as $socket) {
$socket === $pullSocket and handleIncomingFile();
$socket === $replySocket and handleIncomingRequest();
}
}
function handleIncomingFile(): void
{
global $pullSocket, $rootDir;
$namespace = $pullSocket->recv(ZMQ::MODE_SNDMORE);
$name = $pullSocket->recv(ZMQ::MODE_SNDMORE);
$contents = $pullSocket->recv();
is_dir($targetDir = "$rootDir/$namespace") or @mkdir($targetDir, 0777, true);
file_put_contents("$targetDir/$name", $contents) and print('S');
}
function handleIncomingRequest(): void
{
global $replySocket, $rootDir;
$namespace = $replySocket->recv(ZMQ::MODE_SNDMORE);
$name = $replySocket->recv();
$replySocket->send(@file_get_contents("$rootDir/{$namespace}/{$name}") ?: 'NOT_FOUND') and print('R');
}

0
var/.gitignore vendored Normal file
View File