Initial commit

This commit is contained in:
Joop Schilder 2020-12-28 23:32:37 +01:00
commit 66b18a024d
56 changed files with 3881 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
/vendor/
/.idea/

7
README.md Normal file
View File

@ -0,0 +1,7 @@
# `toalett/redis-timeseries`
Bonjour!
`composer require toalett/redis-timeseries`

48
bin/example.php Normal file
View File

@ -0,0 +1,48 @@
<?php
use Predis\Client;
use Toalett\Redis\Timeseries\CommandBus;
use Toalett\Redis\Timeseries\Model\Aggregation;
use Toalett\Redis\Timeseries\Model\Filter;
use Toalett\Redis\Timeseries\Model\Value;
use Toalett\Redis\Timeseries\Protocol\Command as C;
use Toalett\Redis\Timeseries\Protocol\Query as Q;
use Toalett\Redis\Timeseries\QueryService;
use Toalett\Redis\Timeseries\Timeseries;
require_once __DIR__ . '/../vendor/autoload.php';
const SERIES = 'temperature:2:32';
const LABELS = ['sensor_id' => 2, 'area_id' => 32];
$client = new Client();
$commandBus = new CommandBus($client);
$queryService = new QueryService($client);
$commandBus->dispatch(C::delete(SERIES));
$commandBus->dispatch(C::create(SERIES)->labels(LABELS));
$timeseries = new Timeseries($commandBus, SERIES);
$timeseries->add(
new Value(time() - 5, 3.2),
new Value(time() - 4, 7.8),
new Value(time() - 3, 2.3),
new Value(time() - 2, 3.4),
new Value(time() - 1, 5.4),
new Value(time() - 0, 6.4)
);
print("\n\n==> mrange (filter on area_id=32, sensor_id=(2,3,4,5), COUNT 2)\n");
$query = Q::mrange();
$query->filter(Filter::where('area_id')->is(32));
$query->filter(Filter::where('sensor_id')->isOneOf(2, 3, 4, 5));
dump($queryService->mrange($query));
print("\n\n==> mrevrange (filter on non-existing label)\n");
$query = Q::mrevrange();
$query->filter(Filter::where('some_label')->is('some_value'));
dump($queryService->mrange($query));
print("\n\n==> range (aggregating var_p, timebucket=3)\n");
$query = Q::range(SERIES)->aggregate(Aggregation::populationVariance(3));
dump($queryService->range($query));

26
composer.json Normal file
View File

@ -0,0 +1,26 @@
{
"name": "toalett/redis-timeseries",
"type": "library",
"description": "Convenient interaction with Redis Timeseries",
"keywords": ["redis", "timeseries", "time series", "database"],
"homepage": "https://github.com/toalett/redis-timeseries",
"license": "MIT",
"authors": [
{
"name": "Joop Schilder",
"email": "jnmschilder@protonmail.com"
}
],
"autoload": {
"psr-4": {
"Toalett\\Redis\\Timeseries\\": "src"
}
},
"require": {
"predis/predis": "^1.1"
},
"require-dev": {
"symfony/var-dumper": "^5.2",
"phpunit/phpunit": "^9.5"
}
}

2436
composer.lock generated Normal file

File diff suppressed because it is too large Load Diff

25
src/CommandBus.php Normal file
View File

@ -0,0 +1,25 @@
<?php
namespace Toalett\Redis\Timeseries;
use Predis\Client;
use Toalett\Redis\Timeseries\Exception\DatabaseException;
use Toalett\Redis\Timeseries\Protocol\Command\Command;
final class CommandBus
{
private Client $client;
public function __construct(Client $client)
{
$this->client = $client;
}
public function dispatch(Command $command): void
{
$response = $this->client->executeRaw($command->toRawCommand());
if (is_string($response) && 0 !== strpos($response, 'OK')) {
throw new DatabaseException($response);
}
}
}

View File

@ -0,0 +1,17 @@
<?php
namespace Toalett\Redis\Timeseries\Exception;
use Throwable;
class DatabaseException extends RuntimeException
{
public function __construct($message = "", $code = 0, Throwable $previous = null)
{
$parts = explode(':', $message);
if (count($parts) >= 2) {
$message = ucfirst(trim(end($parts)));
}
parent::__construct($message, $code, $previous);
}
}

View File

@ -0,0 +1,9 @@
<?php
namespace Toalett\Redis\Timeseries\Exception;
use InvalidArgumentException as PhpInvalidArgumentException;
class InvalidArgumentException extends PhpInvalidArgumentException
{
}

View File

@ -0,0 +1,11 @@
<?php
namespace Toalett\Redis\Timeseries\Exception;
class KeyNotFoundException extends RuntimeException
{
public function __construct(string $key)
{
parent::__construct(sprintf("Key '{$key}' not found on Redis server"));
}
}

View File

@ -0,0 +1,9 @@
<?php
namespace Toalett\Redis\Timeseries\Exception;
use RuntimeException as PhpRuntimeException;
class RuntimeException extends PhpRuntimeException
{
}

80
src/Model/Aggregation.php Normal file
View File

@ -0,0 +1,80 @@
<?php
namespace Toalett\Redis\Timeseries\Model;
use Toalett\Redis\Timeseries\Exception\InvalidArgumentException;
final class Aggregation
{
public string $type;
public int $timeBucket;
private function __construct(string $type, int $timeBucket)
{
if ($timeBucket < 0) {
throw new InvalidArgumentException("Argument 'bucket' must be >= 0");
}
$this->type = $type;
$this->timeBucket = $timeBucket;
}
public static function average(int $timeBucket): self
{
return new Aggregation('avg', $timeBucket);
}
public static function sum(int $timeBucket): self
{
return new Aggregation('sum', $timeBucket);
}
public static function min(int $timeBucket): self
{
return new Aggregation('min', $timeBucket);
}
public static function max(int $timeBucket): self
{
return new Aggregation('max', $timeBucket);
}
public static function range(int $timeBucket): self
{
return new Aggregation('range', $timeBucket);
}
public static function count(int $timeBucket): self
{
return new Aggregation('count', $timeBucket);
}
public static function first(int $timeBucket): self
{
return new Aggregation('first', $timeBucket);
}
public static function last(int $timeBucket): self
{
return new Aggregation('last', $timeBucket);
}
public static function populationStdDev(int $timeBucket): self
{
return new Aggregation('std.p', $timeBucket);
}
public static function sampleStdDev(int $timeBucket): self
{
return new Aggregation('std.s', $timeBucket);
}
public static function populationVariance(int $timeBucket): self
{
return new Aggregation('var.p', $timeBucket);
}
public static function sampleVariance(int $timeBucket): self
{
return new Aggregation('var.s', $timeBucket);
}
}

View File

@ -0,0 +1,48 @@
<?php
namespace Toalett\Redis\Timeseries\Model;
final class DuplicatePolicy
{
private string $value;
private function __construct(string $value)
{
$this->value = $value;
}
public function toString(): string
{
return $this->value;
}
public static function block(): DuplicatePolicy
{
return new DuplicatePolicy('BLOCK');
}
public static function first(): DuplicatePolicy
{
return new DuplicatePolicy('FIRST');
}
public static function last(): DuplicatePolicy
{
return new DuplicatePolicy('LAST');
}
public static function min(): DuplicatePolicy
{
return new DuplicatePolicy('MIN');
}
public static function max(): DuplicatePolicy
{
return new DuplicatePolicy('MAX');
}
public static function sum(): DuplicatePolicy
{
return new DuplicatePolicy('SUM');
}
}

23
src/Model/Filter.php Normal file
View File

@ -0,0 +1,23 @@
<?php
namespace Toalett\Redis\Timeseries\Model;
final class Filter
{
private string $filter;
public function __construct(string $label, string $comparison, ?string $value = '')
{
$this->filter = "{$label}{$comparison}{$value}";
}
public static function where(string $label): Where
{
return new Where($label);
}
public function toString(): string
{
return $this->filter;
}
}

35
src/Model/Range.php Normal file
View File

@ -0,0 +1,35 @@
<?php
namespace Toalett\Redis\Timeseries\Model;
final class Range
{
private string $key;
private array $labels;
private array $values;
public function __construct(string $key, array $labels, array $values)
{
$this->key = $key;
$this->labels = $labels;
$this->values = $values;
}
public function getKey(): string
{
return $this->key;
}
public function getLabels(): array
{
return $this->labels;
}
/**
* @return Value[]
*/
public function getValues(): array
{
return $this->values;
}
}

48
src/Model/Retention.php Normal file
View File

@ -0,0 +1,48 @@
<?php
namespace Toalett\Redis\Timeseries\Model;
final class Retention
{
private int $value;
private function __construct(int $value)
{
$this->value = $value;
}
public function getValue(): int
{
return $this->value;
}
public static function indefinite(): Retention
{
return new self(0);
}
public static function milliseconds(int $milliseconds): Retention
{
return new self($milliseconds);
}
public static function seconds(int $seconds): Retention
{
return new self(1000 * $seconds);
}
public static function minutes(int $minutes): Retention
{
return new self(60 * 1000 * $minutes);
}
public static function hours(int $hours): Retention
{
return new self(60 * 60 * 1000 * $hours);
}
public static function days(int $days): Retention
{
return new self(24 * 60 * 60 * 1000 * $days);
}
}

40
src/Model/Value.php Normal file
View File

@ -0,0 +1,40 @@
<?php
namespace Toalett\Redis\Timeseries\Model;
final class Value
{
private ?string $timestamp;
private float $value;
public function __construct(?string $timestamp, float $value)
{
$this->timestamp = $timestamp;
$this->value = $value;
}
public static function recordNow(float $value): Value
{
return new self(time(), $value);
}
public static function auto(float $value): Value
{
return new self(null, $value);
}
public function getTimestamp(): string
{
return $this->timestamp ?? '*';
}
public function getValue(): float
{
return $this->value;
}
public function __toString(): string
{
return sprintf('(' . $this->timestamp . ':' . $this->value . ')');
}
}

43
src/Model/Where.php Normal file
View File

@ -0,0 +1,43 @@
<?php
namespace Toalett\Redis\Timeseries\Model;
final class Where
{
private ?string $label;
public function __construct(string $label)
{
$this->label = $label;
}
public function is(string $value): Filter
{
return new Filter($this->label, '=', $value);
}
public function isNot(string $value): Filter
{
return new Filter($this->label, '!=', $value);
}
public function isNotSet(): Filter
{
return new Filter($this->label, '=');
}
public function isSet(): Filter
{
return new Filter($this->label, '!=');
}
public function isOneOf(string ...$possibleValues): Filter
{
return new Filter($this->label, '=(', implode(',', $possibleValues) . ')');
}
public function isNoneOf(string ...$possibleValues): Filter
{
return new Filter($this->label, '!=(', implode(',', $possibleValues) . ')');
}
}

49
src/Protocol/Command.php Normal file
View File

@ -0,0 +1,49 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol;
use Toalett\Redis\Timeseries\Model\Aggregation;
use Toalett\Redis\Timeseries\Model\Value;
use Toalett\Redis\Timeseries\Protocol\Command\AddCommand;
use Toalett\Redis\Timeseries\Protocol\Command\AlterCommand;
use Toalett\Redis\Timeseries\Protocol\Command\CreateCommand;
use Toalett\Redis\Timeseries\Protocol\Command\CreateRuleCommand;
use Toalett\Redis\Timeseries\Protocol\Command\DeleteCommand;
use Toalett\Redis\Timeseries\Protocol\Command\DeleteRuleCommand;
final class Command
{
private function __construct()
{
}
public static function add(string $key, Value $value): AddCommand
{
return new AddCommand($key, $value);
}
public static function alter(string $key): AlterCommand
{
return new AlterCommand($key);
}
public static function create(string $key): CreateCommand
{
return new CreateCommand($key);
}
public static function createRule(string $sourceKey, string $destinationKey, Aggregation $aggregation): CreateRuleCommand
{
return new CreateRuleCommand($sourceKey, $destinationKey, $aggregation);
}
public static function delete(string $key): DeleteCommand
{
return new DeleteCommand($key);
}
public static function deleteRule(string $sourceKey, string $destinationKey): DeleteRuleCommand
{
return new DeleteRuleCommand($sourceKey, $destinationKey);
}
}

View File

@ -0,0 +1,26 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol\Command;
use Toalett\Redis\Timeseries\Model\Value;
use Toalett\Redis\Timeseries\Protocol\Command\Components\CommandAssembler;
use Toalett\Redis\Timeseries\Protocol\Command\Components\ContainsKey;
use Toalett\Redis\Timeseries\Protocol\Command\Components\ContainsValue;
class AddCommand implements Command
{
use ContainsKey;
use ContainsValue;
use CommandAssembler;
public function __construct(string $key, Value $value)
{
$this->key = $key;
$this->value = $value;
}
public function toRawCommand(): array
{
return $this->assemble('TS.ADD');
}
}

View File

@ -0,0 +1,21 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol\Command;
use Toalett\Redis\Timeseries\Protocol\Command\Components\CommandAssembler;
use Toalett\Redis\Timeseries\Protocol\Command\Components\ContainsKey;
use Toalett\Redis\Timeseries\Protocol\Command\Components\ContainsLabels;
use Toalett\Redis\Timeseries\Protocol\Command\Components\ContainsRetention;
class AlterCommand implements Command
{
use ContainsKey;
use ContainsRetention;
use ContainsLabels;
use CommandAssembler;
public function toRawCommand(): array
{
return $this->assemble('TS.ALTER');
}
}

View File

@ -0,0 +1,9 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol\Command;
use Toalett\Redis\Timeseries\Protocol\ProtocolCommand;
interface Command extends ProtocolCommand
{
}

View File

@ -0,0 +1,26 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol\Command\Components;
use Toalett\Redis\Timeseries\Protocol\ProtocolAssembler;
trait CommandAssembler
{
use ProtocolAssembler;
final protected function assemble(string $commandName): array
{
return $this->appendAll(
[$commandName],
'appendKey',
'appendSourceAndDestinationKeys',
'appendValue',
'appendRetention',
'appendUncompressed',
'appendChunkSize',
'appendDuplicatePolicy',
'appendAggregation',
'appendLabels'
);
}
}

View File

@ -0,0 +1,27 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol\Command\Components;
use Toalett\Redis\Timeseries\Model\Aggregation;
trait ContainsAggregation
{
protected ?Aggregation $aggregation = null;
public function aggregate(Aggregation $aggregation): self
{
$this->aggregation = $aggregation;
return $this;
}
protected function appendAggregation(array $command): array
{
if (!is_null($this->aggregation)) {
$command[] = 'AGGREGATION';
$command[] = $this->aggregation->type;
$command[] = $this->aggregation->timeBucket;
}
return $command;
}
}

View File

@ -0,0 +1,23 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol\Command\Components;
trait ContainsChunkSize
{
protected ?int $chunkSize = null;
public function chunkSize(int $chunkSize): self
{
$this->chunkSize = $chunkSize;
return $this;
}
protected function appendChunkSize(array $command): array
{
if (!is_null($this->chunkSize)) {
$command[] = 'CHUNK_SIZE';
$command[] = $this->chunkSize;
}
return $command;
}
}

View File

@ -0,0 +1,25 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol\Command\Components;
use Toalett\Redis\Timeseries\Model\DuplicatePolicy;
trait ContainsDuplicatePolicy
{
protected ?DuplicatePolicy $duplicatePolicy = null;
public function onDuplicate(DuplicatePolicy $duplicatePolicy): self
{
$this->duplicatePolicy = $duplicatePolicy;
return $this;
}
protected function appendDuplicatePolicy(array $command): array
{
if (!is_null($this->duplicatePolicy)) {
$command[] = 'DUPLICATE_POLICY';
$command[] = $this->duplicatePolicy->toString();
}
return $command;
}
}

View File

@ -0,0 +1,19 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol\Command\Components;
trait ContainsKey
{
protected string $key;
public function __construct(string $key)
{
$this->key = $key;
}
protected function appendKey(array $command): array
{
$command[] = $this->key;
return $command;
}
}

View File

@ -0,0 +1,32 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol\Command\Components;
trait ContainsLabels
{
private array $labels = [];
public function label(string $key, string $value): self
{
$this->labels[$key] = $value;
return $this;
}
public function labels(array $labels): self
{
$this->labels = $labels;
return $this;
}
protected function appendLabels(array $command): array
{
if (!empty($this->labels)) {
$command[] = 'LABELS';
foreach ($this->labels as $key => $value) {
$command[] = $key;
$command[] = $value;
}
}
return $command;
}
}

View File

@ -0,0 +1,25 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol\Command\Components;
use Toalett\Redis\Timeseries\Model\Retention;
trait ContainsRetention
{
protected ?Retention $retention = null;
public function retain(Retention $retention): self
{
$this->retention = $retention;
return $this;
}
protected function appendRetention(array $command): array
{
if (!is_null($this->retention)) {
$command[] = 'RETENTION';
$command[] = $this->retention->getValue();
}
return $command;
}
}

View File

@ -0,0 +1,25 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol\Command\Components;
trait ContainsSourceAndDestinationKeys
{
protected ?string $sourceKey = null;
protected ?string $destinationKey = null;
public function setEndpoints(string $sourceKey, string $destinationKey): self
{
$this->sourceKey = $sourceKey;
$this->destinationKey = $destinationKey;
return $this;
}
protected function appendSourceAndDestinationKeys(array $command): array
{
if (!is_null($this->sourceKey) && !is_null($this->destinationKey)) {
$command[] = $this->sourceKey;
$command[] = $this->destinationKey;
}
return $command;
}
}

View File

@ -0,0 +1,23 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol\Command\Components;
trait ContainsUncompressed
{
private bool $uncompressed = false;
public function uncompressed(): self
{
$this->uncompressed = true;
return $this;
}
protected function appendUncompressed(array $command): array
{
if (true === $this->uncompressed) {
$command[] = 'UNCOMPRESSED';
}
return $command;
}
}

View File

@ -0,0 +1,25 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol\Command\Components;
use Toalett\Redis\Timeseries\Model\Value;
trait ContainsValue
{
private ?Value $value = null;
public function value(Value $value): self
{
$this->value = $value;
return $this;
}
protected function appendValue(array $command): array
{
if (!is_null($this->value)) {
$command[] = $this->value->getTimestamp();
$command[] = $this->value->getValue();
}
return $command;
}
}

View File

@ -0,0 +1,27 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol\Command;
use Toalett\Redis\Timeseries\Protocol\Command\Components\CommandAssembler;
use Toalett\Redis\Timeseries\Protocol\Command\Components\ContainsChunkSize;
use Toalett\Redis\Timeseries\Protocol\Command\Components\ContainsDuplicatePolicy;
use Toalett\Redis\Timeseries\Protocol\Command\Components\ContainsKey;
use Toalett\Redis\Timeseries\Protocol\Command\Components\ContainsLabels;
use Toalett\Redis\Timeseries\Protocol\Command\Components\ContainsRetention;
use Toalett\Redis\Timeseries\Protocol\Command\Components\ContainsUncompressed;
class CreateCommand implements Command
{
use ContainsKey;
use ContainsRetention;
use ContainsUncompressed;
use ContainsChunkSize;
use ContainsDuplicatePolicy;
use ContainsLabels;
use CommandAssembler;
public function toRawCommand(): array
{
return $this->assemble('TS.CREATE');
}
}

View File

@ -0,0 +1,26 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol\Command;
use Toalett\Redis\Timeseries\Model\Aggregation;
use Toalett\Redis\Timeseries\Protocol\Command\Components\CommandAssembler;
use Toalett\Redis\Timeseries\Protocol\Command\Components\ContainsAggregation;
use Toalett\Redis\Timeseries\Protocol\Command\Components\ContainsSourceAndDestinationKeys;
class CreateRuleCommand implements Command
{
use ContainsSourceAndDestinationKeys;
use ContainsAggregation;
use CommandAssembler;
public function __construct(string $sourceKey, string $destinationKey, Aggregation $aggregation)
{
$this->setEndpoints($sourceKey, $destinationKey);
$this->aggregate($aggregation);
}
public function toRawCommand(): array
{
return $this->assemble('TS.CREATERULE');
}
}

View File

@ -0,0 +1,17 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol\Command;
use Toalett\Redis\Timeseries\Protocol\Command\Components\CommandAssembler;
use Toalett\Redis\Timeseries\Protocol\Command\Components\ContainsKey;
class DeleteCommand implements Command
{
use ContainsKey;
use CommandAssembler;
public function toRawCommand(): array
{
return $this->assemble('DEL');
}
}

View File

@ -0,0 +1,22 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol\Command;
use Toalett\Redis\Timeseries\Protocol\Command\Components\CommandAssembler;
use Toalett\Redis\Timeseries\Protocol\Command\Components\ContainsSourceAndDestinationKeys;
class DeleteRuleCommand implements Command
{
use ContainsSourceAndDestinationKeys;
use CommandAssembler;
public function __construct(string $sourceKey, string $destinationKey)
{
$this->setEndpoints($sourceKey, $destinationKey);
}
public function toRawCommand(): array
{
return $this->assemble('TS.DELETERULE');
}
}

View File

@ -0,0 +1,18 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol;
trait ProtocolAssembler
{
protected function appendAll(array $command, string ...$methods): array
{
foreach ($methods as $method) {
if (!method_exists($this, $method)) {
continue;
}
$command = $this->{$method}($command);
}
return $command;
}
}

View File

@ -0,0 +1,8 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol;
interface ProtocolCommand
{
public function toRawCommand(): array;
}

48
src/Protocol/Query.php Normal file
View File

@ -0,0 +1,48 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol;
use Toalett\Redis\Timeseries\Model\Filter;
use Toalett\Redis\Timeseries\Protocol\Query\GetQuery;
use Toalett\Redis\Timeseries\Protocol\Query\MGetQuery;
use Toalett\Redis\Timeseries\Protocol\Query\MRangeQuery;
use Toalett\Redis\Timeseries\Protocol\Query\MRevRangeQuery;
use Toalett\Redis\Timeseries\Protocol\Query\RangeQuery;
use Toalett\Redis\Timeseries\Protocol\Query\RevRangeQuery;
final class Query
{
private function __construct()
{
}
public static function get(string $key): GetQuery
{
return new GetQuery($key);
}
public static function mget(Filter ...$filters): MGetQuery
{
return new MGetQuery(...$filters);
}
public static function mrange(Filter ...$filters): MRangeQuery
{
return new MRangeQuery(...$filters);
}
public static function mrevrange(Filter ...$filters): MRevRangeQuery
{
return new MRevRangeQuery(...$filters);
}
public static function range(string $key): RangeQuery
{
return new RangeQuery($key);
}
public static function revrange(string $key): RevRangeQuery
{
return new RevRangeQuery($key);
}
}

View File

@ -0,0 +1,20 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol\Query;
use Toalett\Redis\Timeseries\Protocol\Query\Components\ContainsAggregation;
use Toalett\Redis\Timeseries\Protocol\Query\Components\ContainsCount;
use Toalett\Redis\Timeseries\Protocol\Query\Components\ContainsFilters;
use Toalett\Redis\Timeseries\Protocol\Query\Components\ContainsRange;
use Toalett\Redis\Timeseries\Protocol\Query\Components\ContainsWithLabels;
use Toalett\Redis\Timeseries\Protocol\Query\Components\QueryAssembler;
abstract class AbstractMRangeQuery implements Query
{
use ContainsRange;
use ContainsCount;
use ContainsAggregation;
use ContainsWithLabels;
use ContainsFilters;
use QueryAssembler;
}

View File

@ -0,0 +1,18 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol\Query;
use Toalett\Redis\Timeseries\Protocol\Query\Components\ContainsAggregation;
use Toalett\Redis\Timeseries\Protocol\Query\Components\ContainsCount;
use Toalett\Redis\Timeseries\Protocol\Query\Components\ContainsKey;
use Toalett\Redis\Timeseries\Protocol\Query\Components\ContainsRange;
use Toalett\Redis\Timeseries\Protocol\Query\Components\QueryAssembler;
abstract class AbstractRangeQuery implements Query
{
use ContainsKey;
use ContainsRange;
use ContainsCount;
use ContainsAggregation;
use QueryAssembler;
}

View File

@ -0,0 +1,27 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol\Query\Components;
use Toalett\Redis\Timeseries\Model\Aggregation;
trait ContainsAggregation
{
protected ?Aggregation $aggregation = null;
public function aggregate(Aggregation $aggregation): self
{
$this->aggregation = $aggregation;
return $this;
}
protected function appendAggregation(array $command): array
{
if (!is_null($this->aggregation)) {
$command[] = 'AGGREGATION';
$command[] = $this->aggregation->type;
$command[] = $this->aggregation->timeBucket;
}
return $command;
}
}

View File

@ -0,0 +1,24 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol\Query\Components;
trait ContainsCount
{
protected ?int $count = null;
public function limitCount(int $count): self
{
$this->count = $count;
return $this;
}
protected function appendCount(array $command): array
{
if (!is_null($this->count)) {
$command[] = 'COUNT';
$command[] = $this->count;
}
return $command;
}
}

View File

@ -0,0 +1,41 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol\Query\Components;
use Toalett\Redis\Timeseries\Exception\InvalidArgumentException;
use Toalett\Redis\Timeseries\Model\Filter;
trait ContainsFilters
{
/** @var Filter[] */
protected array $filters = [];
public function __construct(Filter...$filters)
{
$this->filters = $filters;
}
/**
* @param Filter ...$filters
* @return $this
*/
public function filter(Filter $filter): self
{
$this->filters[] = $filter;
return $this;
}
protected function appendFilters(array $command): array
{
if (empty($this->filters)) {
throw new InvalidArgumentException('At least one filter is required');
}
$command[] = 'FILTER';
foreach ($this->filters as $filter) {
$command[] = $filter->toString();
}
return $command;
}
}

View File

@ -0,0 +1,19 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol\Query\Components;
trait ContainsKey
{
protected string $key;
public function __construct(string $key)
{
$this->key = $key;
}
protected function appendKey(array $command): array
{
$command[] = $this->key;
return $command;
}
}

View File

@ -0,0 +1,38 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol\Query\Components;
use Toalett\Redis\Timeseries\Exception\InvalidArgumentException;
trait ContainsRange
{
protected string $from = '-';
protected string $to = '+';
/**
* @param int|string $fromTimestamp
* @param int|string $toTimestamp
* @return $this
*/
protected function range($fromTimestamp, $toTimestamp): self
{
if ($fromTimestamp < 0 && $fromTimestamp !== '-') {
throw new InvalidArgumentException("fromTimestamp must be '-' or positive integer");
}
if ($toTimestamp < 0 && $toTimestamp !== '+') {
throw new InvalidArgumentException("toTimestamp must be '+' or positive integer");
}
$this->from = $fromTimestamp;
$this->to = $toTimestamp;
return $this;
}
protected function appendRange(array $command): array
{
$command[] = $this->from;
$command[] = $this->to;
return $command;
}
}

View File

@ -0,0 +1,28 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol\Query\Components;
trait ContainsWithLabels
{
protected bool $isWithLabels = true;
public function withLabels(): self
{
$this->isWithLabels = true;
return $this;
}
public function withoutLabels(): self
{
$this->isWithLabels = false;
return $this;
}
protected function appendWithLabels(array $command): array
{
if (true === $this->isWithLabels) {
$command[] = 'WITHLABELS';
}
return $command;
}
}

View File

@ -0,0 +1,23 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol\Query\Components;
use Toalett\Redis\Timeseries\Protocol\ProtocolAssembler;
trait QueryAssembler
{
use ProtocolAssembler;
final protected function assemble(string $commandName): array
{
return $this->appendAll(
[$commandName],
'appendKey',
'appendRange',
'appendCount',
'appendAggregation',
'appendWithLabels',
'appendFilters'
);
}
}

View File

@ -0,0 +1,17 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol\Query;
use Toalett\Redis\Timeseries\Protocol\Query\Components\ContainsKey;
use Toalett\Redis\Timeseries\Protocol\Query\Components\QueryAssembler;
final class GetQuery implements Query
{
use ContainsKey;
use QueryAssembler;
public function toRawCommand(): array
{
return $this->assemble('TS.GET');
}
}

View File

@ -0,0 +1,19 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol\Query;
use Toalett\Redis\Timeseries\Protocol\Query\Components\ContainsFilters;
use Toalett\Redis\Timeseries\Protocol\Query\Components\ContainsWithLabels;
use Toalett\Redis\Timeseries\Protocol\Query\Components\QueryAssembler;
final class MGetQuery implements Query
{
use ContainsWithLabels;
use ContainsFilters;
use QueryAssembler;
public function toRawCommand(): array
{
return $this->assemble('TS.MGET');
}
}

View File

@ -0,0 +1,11 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol\Query;
final class MRangeQuery extends AbstractMRangeQuery
{
public function toRawCommand(): array
{
return $this->assemble('TS.MRANGE');
}
}

View File

@ -0,0 +1,11 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol\Query;
final class MRevRangeQuery extends AbstractMRangeQuery
{
public function toRawCommand(): array
{
return $this->assemble('TS.MREVRANGE');
}
}

View File

@ -0,0 +1,9 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol\Query;
use Toalett\Redis\Timeseries\Protocol\ProtocolCommand;
interface Query extends ProtocolCommand
{
}

View File

@ -0,0 +1,11 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol\Query;
final class RangeQuery extends AbstractRangeQuery
{
public function toRawCommand(): array
{
return $this->assemble('TS.RANGE');
}
}

View File

@ -0,0 +1,11 @@
<?php
namespace Toalett\Redis\Timeseries\Protocol\Query;
final class RevRangeQuery extends AbstractRangeQuery
{
public function toRawCommand(): array
{
return $this->assemble('TS.REVRANGE');
}
}

101
src/QueryService.php Normal file
View File

@ -0,0 +1,101 @@
<?php
namespace Toalett\Redis\Timeseries;
use Predis\Client;
use Toalett\Redis\Timeseries\Exception\DatabaseException;
use Toalett\Redis\Timeseries\Model\Range;
use Toalett\Redis\Timeseries\Model\Value;
use Toalett\Redis\Timeseries\Protocol\Query\AbstractMRangeQuery;
use Toalett\Redis\Timeseries\Protocol\Query\AbstractRangeQuery;
use Toalett\Redis\Timeseries\Protocol\Query\GetQuery;
use Toalett\Redis\Timeseries\Protocol\Query\MGetQuery;
use Toalett\Redis\Timeseries\Protocol\Query\Query;
final class QueryService
{
private Client $client;
public function __construct(Client $client)
{
$this->client = $client;
}
public function exists(string $key): bool
{
return 1 === $this->client->exists($key);
}
/**
* @param AbstractRangeQuery $rangeQuery
* @return Value[]
*/
public function range(AbstractRangeQuery $rangeQuery): array
{
$response = $this->send($rangeQuery);
return array_map([$this, 'extractValue'], $response);
}
/**
* @param AbstractMRangeQuery $mRangeQuery
* @return Range[]
*/
public function mrange(AbstractMRangeQuery $mRangeQuery): array
{
$response = $this->send($mRangeQuery);
return array_map([$this, 'extractRange'], $response);
}
public function get(GetQuery $query): ?Value
{
$response = $this->send($query);
return empty($response) ? null : $this->extractValue($response);
}
/**
* @param MGetQuery $query
* @return Range[]
*/
public function mget(MGetQuery $query): array
{
$response = $this->send($query);
return array_map([$this, 'extractRange'], $response);
}
private function extractValue(array $row): Value
{
return new Value((int)$row[0], (float)$row[1]->getPayload());
}
private function extractRange(array $row): Range
{
$labels = [];
foreach ($row[1] as $labelRow) {
$labels[$labelRow[0]] = $labelRow[1];
}
// Difference between mrange and range
if (empty($row[2])) {
$values = [];
} else if (is_object(@$row[2][1])) {
$values = [$this->extractValue($row[2])];
} else {
$values = array_map([$this, 'extractValue'], $row[2]);
}
return new Range($row[0], $labels, $values);
}
/**
* @param Query $command
* @return array|int
*/
protected function send(Query $command)
{
$response = $this->client->executeRaw($command->toRawCommand());
if (is_string($response) && 0 !== strpos($response, 'OK')) {
throw new DatabaseException($response);
}
return $response;
}
}

25
src/Timeseries.php Normal file
View File

@ -0,0 +1,25 @@
<?php
namespace Toalett\Redis\Timeseries;
use Toalett\Redis\Timeseries\Model\Value;
use Toalett\Redis\Timeseries\Protocol\Command as C;
final class Timeseries
{
private CommandBus $commandBus;
private string $key;
public function __construct(CommandBus $commandBus, string $key)
{
$this->commandBus = $commandBus;
$this->key = $key;
}
public function add(Value ...$values): void
{
foreach ($values as $value) {
$this->commandBus->dispatch(C::add($this->key, $value));
}
}
}