Skip to content

Commit

Permalink
example/grpc: Limit amount of outstanding operations
Browse files Browse the repository at this point in the history
  • Loading branch information
fruhland committed Jun 29, 2022
1 parent e351ffa commit 60726dc
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ public class BenchmarkDemo implements Runnable {
required = true)
private int answerSize;

@CommandLine.Option(
names = {"-t", "--threshold"},
description = "The maximum amount of RPCs to perform before flushing.")
private int aggregationThreshold = 64;

@CommandLine.Option(
names = {"-b", "--blocking"},
description = "Whether to perform a blocking (latency optimised) or non-blocking (throughput optimised) benchmark.")
Expand Down Expand Up @@ -80,7 +85,7 @@ public void run() {
bindAddress = isServer ? bindAddress : new InetSocketAddress(bindAddress.getAddress(), 0);
}

final Runnable runnable = isServer ? new Server(bindAddress, answerSize) : new Client(remoteAddress, requestCount, requestSize, answerSize, connections, blocking);
final Runnable runnable = isServer ? new Server(bindAddress, answerSize) : new Client(remoteAddress, requestCount, requestSize, answerSize, connections, aggregationThreshold, blocking);
runnable.run();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,19 @@ public class Client implements Runnable {
private final int requestSize;
private final int answerSize;
private final int connections;
private final int aggregationThreshold;
private final boolean blocking;
private final CyclicBarrier benchmarkBarrier;

public Client(final InetSocketAddress remoteAddress, final int requestCount, final int requestSize, final int answerSize, final int connections, boolean blocking) {
public Client(final InetSocketAddress remoteAddress, final int requestCount, final int requestSize, final int answerSize, final int connections, final int aggregationThreshold, final boolean blocking) {
this.remoteAddress = remoteAddress;
this.requestCount = requestCount;
this.requestSize = requestSize;
this.answerSize = answerSize;
this.connections = connections;
benchmarkBarrier = new CyclicBarrier(connections);
this.aggregationThreshold = aggregationThreshold;
this.blocking = blocking;
benchmarkBarrier = new CyclicBarrier(connections);
}

@Override
Expand All @@ -50,7 +52,7 @@ public void run() {
.channelType(NioSocketChannel.class)
.usePlaintext()
.build();
runnables[i] = blocking ? new BlockingRunnable(channel, benchmarkBarrier, (LatencyCombiner) combiner, requestCount, requestSize, answerSize) : new NonBlockingRunnable(channel, benchmarkBarrier, (ThroughputCombiner) combiner, requestCount, requestSize, answerSize);
runnables[i] = blocking ? new BlockingRunnable(channel, benchmarkBarrier, (LatencyCombiner) combiner, requestCount, requestSize, answerSize) : new NonBlockingRunnable(channel, benchmarkBarrier, (ThroughputCombiner) combiner, requestCount, requestSize, answerSize, aggregationThreshold);
threads[i] = new Thread(runnables[i]);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@

import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import de.hhu.bsinfo.hadronio.util.LatencyCombiner;
import de.hhu.bsinfo.hadronio.util.LatencyResult;
import de.hhu.bsinfo.hadronio.util.ThroughputCombiner;
import de.hhu.bsinfo.hadronio.util.ThroughputResult;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
Expand All @@ -25,13 +25,17 @@ public class NonBlockingRunnable implements Runnable {
private final ThroughputCombiner combiner;
private final BenchmarkMessage message;
private final int requestCount;
private final int aggregationTreshold;
private final Queue<ListenableFuture<BenchmarkMessage>> futureQueue;

public NonBlockingRunnable(final Channel channel, final CyclicBarrier syncBarrier, final ThroughputCombiner combiner, final int requestCount, final int requestSize, final int answerSize) {
public NonBlockingRunnable(final Channel channel, final CyclicBarrier syncBarrier, final ThroughputCombiner combiner, final int requestCount, final int requestSize, final int answerSize, int aggregationTreshold) {
futureStub = BenchmarkGrpc.newFutureStub(channel);
result = new ThroughputResult(requestCount, requestSize + answerSize);
this.benchmarkBarrier = syncBarrier;
this.combiner = combiner;
this.requestCount = requestCount;
this.aggregationTreshold = aggregationTreshold;
futureQueue = new ArrayBlockingQueue<>(aggregationTreshold);

final byte[] requestBytes = new byte[requestSize];
for (int i = 0; i < requestSize; i++) {
Expand All @@ -45,10 +49,7 @@ public void run() {
// Warmup
final int warmupCount = requestCount / 10 > 0 ? requestCount / 10 : 1;
LOGGER.info("Starting warmup with [{}] requests", warmupCount);

for (int i = 0; i < warmupCount - 1; i++) {
futureStub.benchmark(message);
}
performCalls(warmupCount);

ListenableFuture<BenchmarkMessage> finalFuture = futureStub.benchmark(message);
while (!finalFuture.isDone()) {
Expand All @@ -63,19 +64,9 @@ public void run() {
// Benchmark
benchmarkBarrier.await();
LOGGER.info("Starting benchmark with [{}] requests", requestCount);
final long startTime = System.nanoTime();

for (int i = 0; i < requestCount - 1; i++) {
futureStub.benchmark(message);
}

finalFuture = futureStub.benchmark(message);
while (!finalFuture.isDone()) {
if (finalFuture.isCancelled()) {
throw new IllegalStateException("Benchmark failed!");
}
}

final long startTime = System.nanoTime();
performCalls(requestCount);
result.setMeasuredTime(System.nanoTime() - startTime);

final ManagedChannel channel = (ManagedChannel) futureStub.getChannel();
Expand All @@ -88,4 +79,31 @@ public void run() {
combiner.addResult(result);
LOGGER.info("{}", result);
}

private void performCalls(final int operationCount) {
for (int i = 0; i < aggregationTreshold; i++) {
futureQueue.add(futureStub.benchmark(message));
}

int performedOperations = aggregationTreshold;
while (performedOperations < operationCount) {
while (futureQueue.size() > 0 && futureQueue.peek().isDone()) {
futureQueue.poll();
}

while (futureQueue.size() < aggregationTreshold && performedOperations < operationCount) {
futureQueue.add(futureStub.benchmark(message));
performedOperations++;
}
}

while (futureQueue.size() > 0) {
ListenableFuture<BenchmarkMessage> future = futureQueue.poll();
while (!future.isDone()) {
if (future.isCancelled()) {
throw new IllegalStateException("Benchmark failed!");
}
}
}
}
}

0 comments on commit 60726dc

Please sign in to comment.