Skip to content

Commit

Permalink
Refactor Client close()
Browse files Browse the repository at this point in the history
  • Loading branch information
twose committed May 11, 2022
1 parent 54eab65 commit 0672526
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 69 deletions.
11 changes: 7 additions & 4 deletions examples/etcd/put_double.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
$request = new Etcdserverpb\PutRequest();
$request->setPrevKv(true);

Coroutine::create(function () use ($kvClient, $request) {
$barrier = Coroutine\Barrier::make();

Coroutine::create(function () use ($kvClient, $request, $barrier) {
$request->setKey('Hello~');
$request->setValue('I am Swoole!');
[$reply, $status] = $kvClient->Put($request);
Expand All @@ -21,7 +23,7 @@
}
});

Coroutine::create(function () use ($kvClient, $request) {
Coroutine::create(function () use ($kvClient, $request, $barrier) {
$request->setKey('Hey~');
$request->setValue('How are u Etcd?');
[$reply, $status] = $kvClient->Put($request);
Expand All @@ -33,6 +35,7 @@
}
});

// wait all of the responses back
$kvClient->closeWait();
// wait all the responses back
$barrier::wait($barrier);
$kvClient->close();
});
8 changes: 5 additions & 3 deletions examples/etcd/stress.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
$request->setPrevKv(true);
$request->setValue('Swoole');

$barrier = Coroutine\Barrier::make();
$start = microtime(true);
for ($i = 10000; $i--;) {
Coroutine::create(function () use ($kvClient, $request, $i) {
Coroutine::create(function () use ($kvClient, $request, $i, $barrier) {
$request->setKey("Hello{$i}");
[$reply, $status] = $kvClient->Put($request);
assert($reply->getPrevKv()->getKey() === "Hello{$i}");
Expand All @@ -25,8 +26,9 @@
});
}

// wait all of the responses back
$kvClient->closeWait();
// wait all the responses back
$barrier::wait($barrier);
$kvClient->close();
echo 'use time: ' . (microtime(true) - $start) . "s\n";
var_dump($kvClient->stats());
var_dump(memory_get_usage(true));
Expand Down
79 changes: 25 additions & 54 deletions src/Grpc/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,10 @@ class Client
*/
protected $recvCid = 0;
/**
* The sign of if this Client is closing
* @var int
*/
protected $closing = 0;
/**
* @var Channel
* The sign of if Client->close() was called
* @var bool
*/
protected $closeWaiter;
protected $closed = false;

protected $host;
protected $port;
Expand Down Expand Up @@ -169,36 +165,32 @@ protected function start()
} // else: receiver not found, discard it

// push finished, check if close wait and no coroutine is waiting, if Y, stop recv loop
if (!$this->closing || !empty($this->recvChannelMap)) {
if (!$this->closed || !empty($this->recvChannelMap)) {
continue;
}
}

// if you want to close it or retry connect failed, stop recv loop
if ($this->closing) {
if ($this->closed) {
$need_break = true;
} else {
$need_break = !$this->client->connect();
}

// ↑↓ We must `retry-connect` before we push `false` response
// ↑↓ Then the pop channel coroutine can knows that if this client is available

// clear all, we will auto reconnect, but it need user retry again by himself
if (!empty($this->recvChannelMap)) {
foreach ($this->recvChannelMap as $the_channel) {
$the_channel->close();
}
$this->recvChannelMap = [];
}

if ($need_break) {
break;
}
}

$this->recvCid = 0;
$this->closed();
$this->client->close();
$this->closeWriteSide();
while (!empty($this->recvChannelMap)) {
foreach ($this->recvChannelMap as $index => $the_channel) {
unset($this->recvChannelMap[$index]);
$the_channel->close();
}
}
});

// send wait
Expand All @@ -220,7 +212,7 @@ protected function start()
$this->sendRetChannel->close();

$this->sendCid = 0;
$this->closed();
$this->closeReadSide();
});
}

Expand Down Expand Up @@ -340,46 +332,25 @@ public function recv(int $streamId, float $timeout = null)
return false;
}

public function close(): void
protected function closeReadSide(): void
{
if ($this->closing) {
return;
if ($this->recvCid > 0) {
$this->client->close();
}
$this->closing = 2;
// close write side first
$this->sendChannel->close();
$this->client->close();
}

protected function closed(): void
protected function closeWriteSide(): void
{
if ($this->closing > 0) {
$this->closing--;
}
// close success and notify the close waiter
if ($this->closeWaiter) {
$closeWaiter = $this->closeWaiter;
$this->closeWaiter = null;
$closeWaiter->push(true);
if ($this->sendCid > 0) {
$this->sendChannel->close();
}
}

public function closeWait(): void
public function close(): void
{
if ($this->closing) {
return;
}
$this->closing = 2;
$this->closeWaiter = $closeWaiter = new Channel;
$n = 0;
if ($this->recvCid > 0) {
$n++;
}
if ($this->sendCid > 0) {
$n++;
}
while ($n--) {
$closeWaiter->pop();
}
$this->closed = true;
// close write side first
$this->closeWriteSide();
$this->closeReadSide();
}
}
9 changes: 1 addition & 8 deletions src/Grpc/VirtualClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,4 @@ public function close(): void
// close non-static method body hook
$this->client->close();
}

public function closeWait(): void
{
// closeWait non-static method body hook
$this->client->closeWait();
}

}
}

0 comments on commit 0672526

Please sign in to comment.