// blob: f663a6af0979876b43eaa15a7018316ba4916d43 [file] [log] [blame]
package com.alibaba.dubbo.rpc.benchmark;
/**
* nfs-rpc Apache License http://code.google.com/p/nfs-rpc (c) 2011
*/
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
 * Simple Processor RPC Benchmark Client Thread.
 * <p>
 * After passing a shared {@link CyclicBarrier}, the thread issues requests in a loop until
 * {@link System#nanoTime()} reaches {@code endTime}. For every request begun after
 * {@code startTime} it records the latency in a response-time spread and in per-bucket
 * success/error tallies, then counts down the shared {@link CountDownLatch}.
 * <p>
 * NOTE(review): {@code startTime}/{@code endTime} are compared directly against
 * {@code System.nanoTime()}, yet bucket indexes are computed as {@code (time difference) / 1000}
 * and the field comments speak of "per second" values. Confirm against the caller which time
 * unit is actually intended; the arithmetic here is kept exactly as it was.
 *
 * @author <a href="mailto:bluedavy@gmail.com">bluedavy</a>
 */
public class SimpleProcessorBenchmarkClientRunnable implements ClientRunnable {

    private static final Log LOGGER = LogFactory.getLog(SimpleProcessorBenchmarkClientRunnable.class);

    /** Divisor turning a {@code beginTime - startTime} difference into a bucket index. */
    private static final long RANGE_DIVISOR = 1000L;

    /** Divisor applied to a latency before it is sorted into the response-time spread. */
    private static final long SPREAD_DIVISOR = 1000L;

    /** Inclusive upper bounds of the first eight spread buckets; larger values fall into the ninth. */
    private static final long[] SPREAD_UPPER_BOUNDS = { 0, 1, 5, 10, 50, 100, 500, 1000 };

    // NOTE(review): never assigned anywhere in this class, so every RequestObject is built with size 0.
    private int requestSize;

    private final CyclicBarrier barrier;

    private final CountDownLatch latch;

    private final long endTime;

    private boolean running = true;

    private final ExchangeClientFactory clientFactory = new ExchangeClientFactory();

    private final String targetIP;

    private final int targetPort;

    private final int clientNums;

    private final int rpcTimeout;

    // response time spread
    private final long[] responseSpreads = new long[SPREAD_UPPER_BOUNDS.length + 1];

    // error request per second
    private final long[] errorTPS;

    // error response times per second
    private final long[] errorResponseTimes;

    // tps per second
    private final long[] tps;

    // response times per second
    private final long[] responseTimes;

    // benchmark startTime
    private final long startTime;

    // benchmark maxRange
    private final int maxRange;

    /**
     * Creates a benchmark worker and allocates its result arrays.
     *
     * @param targetIP   forwarded to {@code ExchangeClientFactory.get} on every request
     * @param targetPort forwarded to {@code ExchangeClientFactory.get} on every request
     * @param clientNums forwarded to {@code ExchangeClientFactory.get} on every request
     * @param rpcTimeout forwarded to {@code ExchangeClientFactory.get} on every request
     * @param barrier    awaited once at the beginning of {@link #run()}
     * @param latch      counted down once when {@link #run()} finishes
     * @param startTime  requests begun at or before this instant are not recorded
     * @param endTime    the request loop stops once {@code System.nanoTime()} reaches this instant
     * @throws IllegalArgumentException if {@code (endTime - startTime) / 1000 + 1} is negative or
     *                                  does not fit into an {@code int}
     */
    public SimpleProcessorBenchmarkClientRunnable(String targetIP, int targetPort, int clientNums, int rpcTimeout,
                                                  CyclicBarrier barrier, CountDownLatch latch, long startTime,
                                                  long endTime){
        this.targetIP = targetIP;
        this.targetPort = targetPort;
        this.clientNums = clientNums;
        this.rpcTimeout = rpcTimeout;
        this.barrier = barrier;
        this.latch = latch;
        this.startTime = startTime;
        this.endTime = endTime;

        // Plain long arithmetic instead of a String round trip, which failed with an opaque
        // NumberFormatException as soon as the difference itself exceeded the int range.
        long bucketCount = (endTime - startTime) / RANGE_DIVISOR + 1;
        if (bucketCount < 0 || bucketCount > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("benchmark window [startTime=" + startTime + ", endTime=" + endTime
                                               + "] yields an invalid bucket count: " + bucketCount);
        }
        maxRange = (int) bucketCount;

        // Freshly allocated long arrays are already zero-filled; no explicit init loop is needed.
        errorTPS = new long[maxRange];
        errorResponseTimes = new long[maxRange];
        tps = new long[maxRange];
        responseTimes = new long[maxRange];
    }

    /**
     * Waits on the barrier, runs the request loop and finally counts down the latch.
     * The latch is released even when the loop terminates abnormally, so that whoever waits
     * on it cannot be blocked forever by a failing worker.
     */
    public void run() {
        try {
            try {
                barrier.await();
            } catch (Exception e) {
                // Best effort, as before: a failed barrier does not stop this worker, but it is now logged.
                LOGGER.warn("barrier.await failed, starting benchmark without synchronization", e);
            }
            runJavaAndHessian();
        } finally {
            latch.countDown();
        }
    }

    /**
     * Issues requests back to back until {@code System.nanoTime()} reaches {@code endTime}.
     * Each request is classified as a success (a {@code ResponseObject} with non-null bytes) or
     * as an error (anything else, including an exception) and handed to
     * {@link #recordSample(long, long, boolean)} exactly once.
     */
    private void runJavaAndHessian() {
        while (running) {
            Object requestObject = new RequestObject(requestSize);
            long beginTime = System.nanoTime();
            if (beginTime >= endTime) {
                running = false;
                break;
            }
            boolean succeeded = false;
            long finishTime;
            try {
                Object response = clientFactory.get(targetIP, targetPort, rpcTimeout, clientNums)
                                               .request(requestObject).get();
                finishTime = System.nanoTime();
                // instanceof guards against a null or unexpected reply; previously such a reply threw
                // after the latency had been recorded, and the catch block then recorded it a second time.
                succeeded = response instanceof ResponseObject && ((ResponseObject) response).getBytes() != null;
                if (!succeeded) {
                    LOGGER.error("server return response is null");
                }
            } catch (Exception e) {
                // Take the timestamp before logging so the logging cost is not counted as latency.
                finishTime = System.nanoTime();
                LOGGER.error("client.invokeSync error", e);
            }
            recordSample(beginTime, finishTime, succeeded);
        }
    }

    /**
     * Records one finished request.
     *
     * @param beginTime  {@code System.nanoTime()} value taken right before the request
     * @param finishTime {@code System.nanoTime()} value taken right after the request returned or failed
     * @param succeeded  whether the request counts towards {@code tps} ({@code true}) or
     *                   {@code errorTPS} ({@code false})
     */
    private void recordSample(long beginTime, long finishTime, boolean succeeded) {
        if (beginTime <= startTime) {
            // Requests begun at or before startTime are not measured (presumably the warm-up phase).
            return;
        }
        long consumeTime = finishTime - beginTime;
        sumResponseTimeSpread(consumeTime);

        long range = (beginTime - startTime) / RANGE_DIVISOR;
        if (range >= maxRange) {
            System.err.println("benchmark range exceeds maxRange,range is: " + range + ",maxRange is: "
                               + maxRange);
            return;
        }
        int bucket = (int) range;
        if (succeeded) {
            tps[bucket]++;
            responseTimes[bucket] += consumeTime;
        } else {
            errorTPS[bucket]++;
            errorResponseTimes[bucket] += consumeTime;
        }
    }

    /**
     * Returns the collected counters.
     *
     * @return a new list holding, in this order, the live (not copied) arrays
     *         {@code responseSpreads}, {@code tps}, {@code responseTimes}, {@code errorTPS}
     *         and {@code errorResponseTimes}
     */
    public List<long[]> getResults() {
        List<long[]> results = new ArrayList<long[]>();
        results.add(responseSpreads);
        results.add(tps);
        results.add(responseTimes);
        results.add(errorTPS);
        results.add(errorResponseTimes);
        return results;
    }

    /**
     * Increments the spread bucket matching {@code responseTime / 1000}: bucket 0 for values
     * {@code <= 0}, then {@code <= 1}, {@code <= 5}, {@code <= 10}, {@code <= 50}, {@code <= 100},
     * {@code <= 500}, {@code <= 1000}, and bucket 8 for everything above 1000.
     *
     * @param responseTime latency of one request, in the unit produced by the request loop
     */
    private void sumResponseTimeSpread(long responseTime) {
        long scaled = responseTime / SPREAD_DIVISOR;
        int bucket = 0;
        // Walk the ascending bounds until the first one that still contains the value.
        while (bucket < SPREAD_UPPER_BOUNDS.length && scaled > SPREAD_UPPER_BOUNDS[bucket]) {
            bucket++;
        }
        responseSpreads[bucket]++;
    }
}