Skip to content

Commit

Permalink
Adding the ability to use tags servers (#15)
Browse files Browse the repository at this point in the history
* adding ability to use proxy servers

* added tests for changes

* code style fixes

* trying to fix building

* another try

* fix dependencies and tests

* change readme

* fix readme

* grammar fix

* remove proxy servers and add tags for servers

* code style fixes

* code style fix

* add missing tests

* fix test

* add missing tests

* fix method name in readme

* some refactoring

* refactor variables name with random server hostname
  • Loading branch information
romanpravda authored Sep 22, 2020
1 parent c762dbe commit 46ccafe
Show file tree
Hide file tree
Showing 22 changed files with 396 additions and 33 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ before_script:
- make && make install
- cd ../../

script: phpunit --coverage-clover ./tests/logs/clover.xml
script: vendor/bin/phpunit --coverage-clover ./tests/logs/clover.xml

after_script:
- php vendor/bin/php-coveralls -v
Expand Down
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,29 @@ By default client will use random server in given list of servers or in specifie
$client->using('server-2')->select('select * from table');
```

## Server tags

```php
$firstServerOptionsWithTag = (new \Tinderbox\Clickhouse\Common\ServerOptions())->setTag('tag');
$secondServerOptionsWithAnotherTag = (new \Tinderbox\Clickhouse\Common\ServerOptions())->setTag('another-tag');

$server = new Tinderbox\Clickhouse\Server('127.0.0.1', '8123', 'default', 'user', 'pass', $firstServerOptionsWithTag);

$cluster = new Tinderbox\Clickhouse\Cluster('cluster', [
new Tinderbox\Clickhouse\Server('127.0.0.2', '8123', 'default', 'user', 'pass', $secondServerOptionsWithAnotherTag)
]);

$serverProvider = (new Tinderbox\Clickhouse\ServerProvider())->addServer($server)->addCluster($cluster);

$client = (new Tinderbox\Clickhouse\Client($serverProvider));
```

To use server with tag, you should call ```usingServerWithTag``` function before execute any query.

```php
$client->usingServerWithTag('tag');
```

## Select queries

Any SELECT query will return instance of `Result`. This class implements interfaces `\ArrayAccess`, `\Countable` и `\Iterator`,
Expand Down
7 changes: 4 additions & 3 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@
},
"require": {
"php": "~7.1",
"guzzlehttp/guzzle": "~6.0"
"guzzlehttp/guzzle": "~6.0",
"satooshi/php-coveralls": "^2.2"
},
"bin": [
"bin/ccat_linux",
"bin/ccat_darwin"
],
"require-dev": {
"mockery/mockery": "^0.9.9",
"phpunit/php-code-coverage": "^5.2",
"phpunit/phpunit": "~4.0||~5.0||~6.0"
"phpunit/php-code-coverage": "^6.0",
"phpunit/phpunit": "~7.0"
},
"scripts": {
"test": "phpunit --coverage-text --colors=never"
Expand Down
20 changes: 20 additions & 0 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,26 @@ public function usingRandomServer()
return $this;
}

/**
* Client will use server with tag as server for queries.
*
* @var string
*
* @return $this
*/
public function usingServerWithTag(string $tag)
{
$this->serverHostname = function () use ($tag) {
if ($this->isOnCluster()) {
return $this->serverProvider->getRandomServerFromClusterByTag($this->getClusterName(), $tag);
} else {
return $this->serverProvider->getRandomServerWithTag($tag);
}
};

return $this;
}

/**
* Returns true if cluster selected.
*
Expand Down
33 changes: 32 additions & 1 deletion src/Cluster.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ class Cluster
*/
protected $servers = [];

/**
* Servers in cluster by tags.
*
* @var \Tinderbox\Clickhouse\Server[][]
*/
protected $serversByTags = [];

/**
* Cluster constructor.
*
Expand Down Expand Up @@ -86,6 +93,12 @@ public function addServer(string $hostname, Server $server)
}

$this->servers[$hostname] = $server;

$serverTags = $server->getOptions()->getTags();

foreach ($serverTags as $serverTag) {
$this->serversByTags[$serverTag][$hostname] = true;
}
}

/**
Expand All @@ -98,6 +111,24 @@ public function getServers(): array
return $this->servers;
}

/**
* Returns servers in cluster by tag.
*
* @param string $tag
*
* @throws ClusterException
*
* @return \Tinderbox\Clickhouse\Server[]
*/
public function getServersByTag(string $tag): array
{
if (!isset($this->serversByTags[$tag])) {
throw ClusterException::tagNotFound($tag);
}

return $this->serversByTags[$tag];
}

/**
* Returns server by specified hostname.
*
Expand All @@ -121,7 +152,7 @@ public function getServerByHostname(string $hostname): Server
*
* @return string
*/
public function getName() : string
public function getName(): string
{
return $this->name;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Common/File.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

class File extends AbstractFile implements FileInterface
{
public function open(bool $gzip = true) : StreamInterface
public function open(bool $gzip = true): StreamInterface
{
$handle = fopen($this->getSource(), 'r');

Expand Down
2 changes: 1 addition & 1 deletion src/Common/FileFromString.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

class FileFromString extends AbstractFile implements FileInterface
{
public function open(bool $gzip = true) : StreamInterface
public function open(bool $gzip = true): StreamInterface
{
$handle = fopen('php://memory', 'r+');
fwrite($handle, $this->source);
Expand Down
2 changes: 1 addition & 1 deletion src/Common/MergedFiles.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ protected function getCcatPath(): string
return __DIR__.DIRECTORY_SEPARATOR.'..'.DIRECTORY_SEPARATOR.'..'.DIRECTORY_SEPARATOR.'bin'.DIRECTORY_SEPARATOR.'ccat_'.strtolower(PHP_OS);
}

public function open(bool $gzip = true) : StreamInterface
public function open(bool $gzip = true): StreamInterface
{
$descriptorspec = [
0 => ['pipe', 'r'],
Expand Down
49 changes: 49 additions & 0 deletions src/Common/ServerOptions.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ class ServerOptions
*/
protected $protocol = 'http';

/**
* Tags.
*
* @var array
*/
protected $tags = [];

/**
* Sets protocol.
*
Expand All @@ -37,4 +44,46 @@ public function getProtocol(): string
{
return $this->protocol;
}

/**
* Set tags.
*
* @param array $tags
*
* @return ServerOptions
*/
public function setTags(array $tags): self
{
$this->tags = [];

foreach ($tags as $tag) {
$this->addTag($tag);
}

return $this;
}

/**
* Adds tag.
*
* @param string $tag
*
* @return ServerOptions
*/
public function addTag(string $tag): self
{
$this->tags[$tag] = true;

return $this;
}

/**
* Returns tags.
*
* @return array
*/
public function getTags(): array
{
return array_keys($this->tags);
}
}
2 changes: 1 addition & 1 deletion src/Common/TempTable.php
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public function getFormat(): string
return $this->format;
}

public function open(bool $gzip = true) : StreamInterface
public function open(bool $gzip = true): StreamInterface
{
return $this->source->open($gzip);
}
Expand Down
10 changes: 10 additions & 0 deletions src/Exceptions/ClusterException.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,14 @@ public static function serverNotFound($hostname)
{
return new static('Server with hostname ['.$hostname.'] is not found in cluster');
}

public static function tagNotFound($tag)
{
return new static('There are no servers with tag ['.$tag.'] in cluster');
}

public static function serverNotFoundByTag($tag, $hostname)
{
return new static('Server with hostname ['.$hostname.'] and tag ['.$tag.'] is not found in cluster');
}
}
10 changes: 10 additions & 0 deletions src/Exceptions/ServerProviderException.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,14 @@ public static function serverHostnameNotFound($hostname)
{
return new static('Can not find server with hostname ['.$hostname.']');
}

public static function serverTagNotFound($tag)
{
return new static('Can not find servers with tag ['.$tag.']');
}

public static function serverHostnameNotFoundForTag($tag, $hostname)
{
return new static('Can not find servers with hostname ['.$hostname.'] and tag ['.$tag.']');
}
}
2 changes: 1 addition & 1 deletion src/Interfaces/FileInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@

interface FileInterface
{
public function open(bool $gzip = true) : StreamInterface;
public function open(bool $gzip = true): StreamInterface;
}
2 changes: 1 addition & 1 deletion src/Query/QueryStatistic.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* @property int $rows
* @property int $bytes
* @property float $time
* @property int $rowsBeforeLimitAtLeast
* @property int $rowsBeforeLimitAtLeast
*/
class QueryStatistic
{
Expand Down
6 changes: 3 additions & 3 deletions src/Server.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ class Server
public function __construct(
string $host,
string $port = '8123',
string $database = 'default',
string $username = null,
string $password = null,
?string $database = 'default',
?string $username = null,
?string $password = null,
ServerOptions $options = null
) {
$this->setHost($host);
Expand Down
37 changes: 34 additions & 3 deletions src/ServerProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ class ServerProvider
*/
protected $servers = [];

/**
* Servers by tags.
*
* @var Server[][]
*/
protected $serversByTags = [];

/**
* Current server to perform queries.
*
Expand Down Expand Up @@ -60,6 +67,12 @@ public function addServer(Server $server)

$this->servers[$serverHostname] = $server;

$serverTags = $server->getOptions()->getTags();

foreach ($serverTags as $serverTag) {
$this->serversByTags[$serverTag][$serverHostname] = true;
}

return $this;
}

Expand All @@ -68,12 +81,30 @@ public function getRandomServer(): Server
return $this->getServer(array_rand($this->servers, 1));
}

public function getRandomServerWithTag(string $tag): Server
{
if (!isset($this->serversByTags[$tag])) {
throw ServerProviderException::serverTagNotFound($tag);
}

return $this->getServer(array_rand($this->serversByTags[$tag], 1));
}

public function getRandomServerFromCluster(string $cluster): Server
{
$cluster = $this->getCluster($cluster);
$randomServerIndex = array_rand($cluster->getServers(), 1);
$randomServerHostname = array_rand($cluster->getServers(), 1);

return $cluster->getServerByHostname($randomServerHostname);
}

public function getRandomServerFromClusterByTag(string $cluster, string $tag): Server
{
$cluster = $this->getCluster($cluster);

$randomServerHostname = array_rand($cluster->getServersByTag($tag), 1);

return $cluster->getServerByHostname($randomServerIndex);
return $cluster->getServerByHostname($randomServerHostname);
}

public function getServerFromCluster(string $cluster, string $serverHostname)
Expand All @@ -92,7 +123,7 @@ public function getServer(string $serverHostname): Server
return $this->servers[$serverHostname];
}

public function getCluster(string $cluster) : Cluster
public function getCluster(string $cluster): Cluster
{
if (!isset($this->clusters[$cluster])) {
throw ServerProviderException::clusterNotFound($cluster);
Expand Down
Loading

0 comments on commit 46ccafe

Please sign in to comment.