diff --git a/.gitignore b/.gitignore index 786143a..fbb5166 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,4 @@ /vendor/ /.idea/ - -/out/ -/documents/ - +/var/* +!/var/.gitignore diff --git a/command b/command new file mode 100755 index 0000000..e3f9876 --- /dev/null +++ b/command @@ -0,0 +1,13 @@ +#!/usr/bin/env php +connect('ipc:///tmp/storage_server_command.ipc'); +$socket->sendmulti($argv); diff --git a/composer.json b/composer.json deleted file mode 100644 index a3276cd..0000000 --- a/composer.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "name": "joopschilder/zmq-fileserver", - "authors": [ - { - "name": "Joop Schilder", - "email": "jnmschilder@protonmail.com" - } - ], - "require": { - "fzaninotto/faker": "^1.9" - } -} diff --git a/composer.lock b/composer.lock deleted file mode 100644 index 2a95c1a..0000000 --- a/composer.lock +++ /dev/null @@ -1,69 +0,0 @@ -{ - "_readme": [ - "This file locks the dependencies of your project to a known state", - "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", - "This file is @generated automatically" - ], - "content-hash": "541af5d4dea7e6b160f304dd7ba3073f", - "packages": [ - { - "name": "fzaninotto/faker", - "version": "v1.9.1", - "source": { - "type": "git", - "url": "https://github.com/fzaninotto/Faker.git", - "reference": "fc10d778e4b84d5bd315dad194661e091d307c6f" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/fzaninotto/Faker/zipball/fc10d778e4b84d5bd315dad194661e091d307c6f", - "reference": "fc10d778e4b84d5bd315dad194661e091d307c6f", - "shasum": "" - }, - "require": { - "php": "^5.3.3 || ^7.0" - }, - "require-dev": { - "ext-intl": "*", - "phpunit/phpunit": "^4.8.35 || ^5.7", - "squizlabs/php_codesniffer": "^2.9.2" - }, - "type": "library", - "extra": { - "branch-alias": { - "dev-master": "1.9-dev" - } - }, - "autoload": { - "psr-4": { - "Faker\\": "src/Faker/" - } - }, - "notification-url": "https://packagist.org/downloads/", - "license": [ - "MIT" - ], - "authors": [ - { - "name": "François Zaninotto" - } - ], - "description": "Faker is a PHP library that generates fake data for you.", - "keywords": [ - "data", - "faker", - "fixtures" - ], - "time": "2019-12-12T13:22:17+00:00" - } - ], - "packages-dev": [], - "aliases": [], - "minimum-stability": "stable", - "stability-flags": [], - "prefer-stable": false, - "prefer-lowest": false, - "platform": [], - "platform-dev": [], - "plugin-api-version": "1.1.0" -} diff --git a/file_generator b/file_generator deleted file mode 100755 index 2c82428..0000000 --- a/file_generator +++ /dev/null @@ -1,16 +0,0 @@ -#!/usr/bin/env php -' . PHP_EOL); -require_once __DIR__ . '/vendor/autoload.php'; -$push = (new ZMQContext)->getSocket(ZMQ::SOCKET_PUSH)->connect('ipc:///tmp/storage_server_sink.ipc'); -$faker = Faker\Factory::create(); - -$id = 1; -while (true) { - $document = file_get_contents("https://twitter.com/{$faker->firstName}"); - $push->send($argv[1], ZMQ::MODE_SNDMORE) - ->send($id++ . '.html', ZMQ::MODE_SNDMORE) - ->send($document); - - sleep(2); -} diff --git a/query b/query new file mode 100755 index 0000000..6f25592 --- /dev/null +++ b/query @@ -0,0 +1,15 @@ +#!/usr/bin/env php +connect('ipc:///tmp/storage_server_query.ipc'); +$socket->sendmulti($argv); +print(implode(PHP_EOL, $socket->recvMulti())); +print(PHP_EOL); diff --git a/request_file b/request_file deleted file mode 100755 index ca4de11..0000000 --- a/request_file +++ /dev/null @@ -1,12 +0,0 @@ -#!/usr/bin/env php - ' . PHP_EOL); - -$req = (new ZMQContext)->getSocket(ZMQ::SOCKET_REQ) - ->connect('ipc:///tmp/storage_server_query.ipc') - ->send(@$argv[1], ZMQ::MODE_SNDMORE) - ->send(@$argv[2]); - -print($req->recv()); -print(PHP_EOL); diff --git a/save_file b/save_file deleted file mode 100755 index 219cec8..0000000 --- a/save_file +++ /dev/null @@ -1,10 +0,0 @@ -#!/usr/bin/env php -getSocket(ZMQ::SOCKET_PUSH)->connect('ipc:///tmp/storage_server_sink.ipc') - ->send(@$argv[1], ZMQ::MODE_SNDMORE) - ->send(@$argv[2], ZMQ::MODE_SNDMORE) - ->send(@$argv[3]); diff --git a/server b/server new file mode 100755 index 0000000..a59d934 --- /dev/null +++ b/server @@ -0,0 +1,10 @@ +#!/usr/bin/env php +run(); + diff --git a/src/FileServer.php b/src/FileServer.php new file mode 100644 index 0000000..d1b53cf --- /dev/null +++ b/src/FileServer.php @@ -0,0 +1,157 @@ +initializeRootDirectory($rootDirectory); + $this->initializeCommandSocket($context, $commandDsns); + $this->initializeQuerySocket($context, $queryDsns); + $this->initializePoll(); + } + + private function initializeRootDirectory(string $rootDirectory): void + { + $this->rootDirectory = rtrim($rootDirectory, DIRECTORY_SEPARATOR); + if (!is_dir($this->rootDirectory)) { + mkdir($this->rootDirectory, 0777, true); + } + } + + private function initializeCommandSocket(ZMQContext $context, array $bindings): void + { + if (count($bindings) === 0) { + throw new InvalidArgumentException("At least one binding required"); + } + $this->commandSocket = new ZMQSocket($context, ZMQ::SOCKET_PULL); + $this->commandSocket->setSockOpt(ZMQ::SOCKOPT_HWM, 5); + foreach ($bindings as $dsn) { + $this->commandSocket->bind($dsn); + } + } + + private function initializeQuerySocket(ZMQContext $context, array $bindings): void + { + if (count($bindings) === 0) { + throw new InvalidArgumentException("At least one binding required"); + } + $this->querySocket = new ZMQSocket($context, ZMQ::SOCKET_REP); + $this->querySocket->setSockOpt(ZMQ::SOCKOPT_HWM, 5); + foreach ($bindings as $dsn) { + $this->querySocket->bind($dsn); + } + } + + private function initializePoll(): void + { + $this->poll = new ZMQPoll(); + $this->poll->add($this->commandSocket, ZMQ::POLL_IN); + $this->poll->add($this->querySocket, ZMQ::POLL_IN); + } + + public function run() + { + $readable = $writable = []; + while (true) { + $this->poll->poll($readable, $writable); + foreach ($readable as $socket) { + $socket === $this->querySocket and $this->onQuery(); + $socket === $this->commandSocket and $this->onCommand(); + } + } + } + + private function onQuery(): void + { + $arguments = $this->querySocket->recvMulti(); + $query = array_shift($arguments); + switch ($query) { + case 'LOAD': + if (count($arguments) !== 2) { + $this->querySocket->send(-1); + print("Malformed LOAD query\n"); + return; + } + $this->querySocket->send(@file_get_contents("{$this->rootDirectory}/{$arguments[0]}/{$arguments[1]}") ?: -1); + break; + case 'CONTAINS': + if (count($arguments) !== 2) { + $this->querySocket->send(-1); + print("Malformed CONTAINS query\n"); + return; + } + $this->querySocket->send(file_exists("{$this->rootDirectory}/{$arguments[0]}/{$arguments[1]}") ? 'Y' : 'N'); + break; + default: + $this->querySocket->send(-1); + $this->onUnknownInput($query, $arguments); + break; + } + } + + private function onCommand(): void + { + $arguments = $this->commandSocket->recvMulti(); + $command = array_shift($arguments); + switch ($command) { + case 'SAVE': + if (count($arguments) !== 3) { + print("Malformed SAVE command\n"); + return; + } + $this->saveFile($arguments[0], $arguments[1], $arguments[2]); + break; + case 'DELETE': + if (count($arguments) !== 2) { + print("Malformed DELETE command\n"); + return; + } + $this->deleteFile($arguments[0], $arguments[1]); + break; + case 'DELETE_ALL': + if (count($arguments) !== 1) { + print("Malformed DELETE_ALL command\n"); + } + $this->deleteAll($arguments[0]); + break; + default: + $this->onUnknownInput($command, $arguments); + break; + } + } + + private function onUnknownInput(string $input, array $arguments): void + { + $arguments = array_map(fn($v) => substr($v, 0, 12), $arguments); + printf("Unknown input '%s' with args (%s)\n", $input, implode(', ', $arguments)); + } + + private function saveFile(string &$namespace, string &$name, string &$content): void + { + is_dir($targetDir = "{$this->rootDirectory}/$namespace") or mkdir($targetDir, 0777, true); + file_put_contents("$targetDir/$name", $content); + } + + private function deleteFile(string &$namespace, string &$name): void + { + @unlink("{$this->rootDirectory}/$namespace/$name"); + } + + private function deleteAll(string &$namespace): void + { + $directory = "{$this->rootDirectory}/$namespace"; + if (!is_dir($directory)) { + return; + } + static $source = '/tmp/zmq.fileserver.rsync_empty_dir.d'; + is_dir($source) or @mkdir($source); + @system("nohup rsync --archive --recursive --delete {$source}/ {$directory}/ 2>&1 >/dev/null &"); + @rmdir($directory); + } +} diff --git a/storage_server b/storage_server deleted file mode 100755 index 68301e0..0000000 --- a/storage_server +++ /dev/null @@ -1,36 +0,0 @@ -#!/usr/bin/env php -getSocket(ZMQ::SOCKET_PULL)->bind('ipc:///tmp/storage_server_sink.ipc'); -$replySocket = $zmq->getSocket(ZMQ::SOCKET_REP)->bind('ipc:///tmp/storage_server_query.ipc'); -$poll = new ZMQPoll(); -$poll->add($pullSocket, ZMQ::POLL_IN); -$poll->add($replySocket, ZMQ::POLL_IN); -$readable = $writable = []; -while (true) { - $poll->poll($readable, $writable); - foreach ($readable as $socket) { - $socket === $pullSocket and handleIncomingFile(); - $socket === $replySocket and handleIncomingRequest(); - } -} - -function handleIncomingFile(): void -{ - global $pullSocket, $rootDir; - $namespace = $pullSocket->recv(ZMQ::MODE_SNDMORE); - $name = $pullSocket->recv(ZMQ::MODE_SNDMORE); - $contents = $pullSocket->recv(); - is_dir($targetDir = "$rootDir/$namespace") or @mkdir($targetDir, 0777, true); - file_put_contents("$targetDir/$name", $contents) and print('S'); -} - -function handleIncomingRequest(): void -{ - global $replySocket, $rootDir; - $namespace = $replySocket->recv(ZMQ::MODE_SNDMORE); - $name = $replySocket->recv(); - $replySocket->send(@file_get_contents("$rootDir/{$namespace}/{$name}") ?: 'NOT_FOUND') and print('R'); -} diff --git a/var/.gitignore b/var/.gitignore new file mode 100644 index 0000000..e69de29