toalett-redis-timeseries/src/QueryService.php

102 lines
2.7 KiB
PHP

<?php
namespace Toalett\Redis\Timeseries;
use Predis\Client;
use Toalett\Redis\Timeseries\Exception\DatabaseException;
use Toalett\Redis\Timeseries\Model\Range;
use Toalett\Redis\Timeseries\Model\Value;
use Toalett\Redis\Timeseries\Protocol\Query\AbstractMRangeQuery;
use Toalett\Redis\Timeseries\Protocol\Query\AbstractRangeQuery;
use Toalett\Redis\Timeseries\Protocol\Query\GetQuery;
use Toalett\Redis\Timeseries\Protocol\Query\MGetQuery;
use Toalett\Redis\Timeseries\Protocol\Query\Query;
final class QueryService
{
private Client $client;
public function __construct(Client $client)
{
$this->client = $client;
}
public function exists(string $key): bool
{
return 1 === $this->client->exists($key);
}
/**
* @param AbstractRangeQuery $rangeQuery
* @return Value[]
*/
public function range(AbstractRangeQuery $rangeQuery): array
{
$response = $this->send($rangeQuery);
return array_map([$this, 'extractValue'], $response);
}
/**
* @param AbstractMRangeQuery $mRangeQuery
* @return Range[]
*/
public function mrange(AbstractMRangeQuery $mRangeQuery): array
{
$response = $this->send($mRangeQuery);
return array_map([$this, 'extractRange'], $response);
}
public function get(GetQuery $query): ?Value
{
$response = $this->send($query);
return empty($response) ? null : $this->extractValue($response);
}
/**
* @param MGetQuery $query
* @return Range[]
*/
public function mget(MGetQuery $query): array
{
$response = $this->send($query);
return array_map([$this, 'extractRange'], $response);
}
private function extractValue(array $row): Value
{
return new Value((int)$row[0], (float)$row[1]->getPayload());
}
private function extractRange(array $row): Range
{
$labels = [];
foreach ($row[1] as $labelRow) {
$labels[$labelRow[0]] = $labelRow[1];
}
// Difference between mrange and range
if (empty($row[2])) {
$values = [];
} else if (is_object(@$row[2][1])) {
$values = [$this->extractValue($row[2])];
} else {
$values = array_map([$this, 'extractValue'], $row[2]);
}
return new Range($row[0], $labels, $values);
}
/**
* @param Query $command
* @return array|int
*/
protected function send(Query $command)
{
$response = $this->client->executeRaw($command->toRawCommand());
if (is_string($response) && 0 !== strpos($response, 'OK')) {
throw new DatabaseException($response);
}
return $response;
}
}