blob: 725ae9b9fc4472177b9c88e88b48d0eb2db97b3b [file] [log] [blame]
package com.alibaba.dubbo.rpc.benchmark;
/**
* nfs-rpc Apache License http://code.google.com/p/nfs-rpc (c) 2011
*/
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
 * Simple Processor RPC Benchmark Client Thread.
 * <p>
 * One instance represents one benchmark worker. {@link #run()} waits on the shared
 * {@link CyclicBarrier}, then calls {@link #invoke(ServiceFactory)} in a loop until the
 * microsecond clock ({@code System.nanoTime() / 1000}) reaches {@code endTime}, and finally counts
 * down the shared {@link CountDownLatch}.
 * <p>
 * Calls that begin at or before {@code startTime} are executed but not recorded. Every later call
 * is added to a response-time histogram and to per-second counters indexed by
 * {@code (beginTime - startTime) / 1000000}. The counters are plain arrays updated without
 * synchronization by the thread executing {@link #run()}.
 *
 * @author <a href="mailto:bluedavy@gmail.com">bluedavy</a>
 */
public abstract class AbstractClientRunnable implements ClientRunnable {

    private static final Log LOGGER = LogFactory.getLog(AbstractClientRunnable.class);

    /** Divisor turning {@link System#nanoTime()} into the microsecond clock used by this class. */
    private static final long NANOS_PER_MICRO = 1000L;

    /** Divisor turning a microsecond duration into milliseconds for the histogram. */
    private static final long MICROS_PER_MILLI = 1000L;

    /** Width, in microseconds, of one slot of the per-second arrays. */
    private static final long MICROS_PER_SECOND = 1000000L;

    /**
     * Inclusive upper bounds, in milliseconds, of histogram buckets 0..7. Anything above the last
     * bound falls into the final bucket (index 8).
     */
    private static final long[] SPREAD_UPPER_BOUNDS_MS = { 0L, 1L, 5L, 10L, 50L, 100L, 500L, 1000L };

    private final CyclicBarrier barrier;

    private final CountDownLatch latch;

    // benchmark startTime, compared against System.nanoTime() / 1000
    private final long startTime;

    // benchmark endTime, compared against System.nanoTime() / 1000
    private final long endTime;

    // number of one-second slots in the per-second arrays
    private final int maxRange;

    // response time spread: 0ms, (0,1], (1,5], (5,10], (10,50], (50,100], (100,500], (500,1000], >1000
    private final long[] responseSpreads = new long[SPREAD_UPPER_BOUNDS_MS.length + 1];

    // error request per second
    private final long[] errorTPS;

    // error response times per second (summed microseconds)
    private final long[] errorResponseTimes;

    // tps per second
    private final long[] tps;

    // response times per second (summed microseconds)
    private final long[] responseTimes;

    // NOTE(review): kept as a raw type because the type argument expected by ServiceFactory is not
    // visible from this file.
    private final ServiceFactory serviceFactory = new ServiceFactory();

    /**
     * Configures the service factory and sizes the per-second result arrays.
     *
     * @param targetIP    passed to {@code ServiceFactory.setTargetIP}
     * @param targetPort  passed to {@code ServiceFactory.setTargetPort}
     * @param clientNums  passed to {@code ServiceFactory.setClientNums}
     * @param rpcTimeout  passed to {@code ServiceFactory.setConnectTimeout}
     * @param barrier     barrier awaited at the beginning of {@link #run()}
     * @param latch       latch counted down when {@link #run()} finishes
     * @param startTime   calls that begin at or before this value are not recorded; compared
     *                    against {@code System.nanoTime() / 1000}
     * @param endTime     the loop stops once {@code System.nanoTime() / 1000} reaches this value
     * @throws IllegalArgumentException if the window spans more one-second slots than an
     *                                  {@code int} can index
     */
    public AbstractClientRunnable(String targetIP, int targetPort, int clientNums, int rpcTimeout,
                                  CyclicBarrier barrier, CountDownLatch latch, long startTime, long endTime) {
        this.barrier = barrier;
        this.latch = latch;
        this.startTime = startTime;
        this.endTime = endTime;
        serviceFactory.setTargetIP(targetIP);
        serviceFactory.setClientNums(clientNums);
        serviceFactory.setTargetPort(targetPort);
        serviceFactory.setConnectTimeout(rpcTimeout);

        // Divide while still a long: parsing the raw microsecond difference as an int fails for
        // any window longer than roughly 35 minutes.
        long windowSeconds = (endTime - startTime) / MICROS_PER_SECOND;
        if (windowSeconds >= Integer.MAX_VALUE || windowSeconds < Integer.MIN_VALUE) {
            throw new IllegalArgumentException("benchmark window out of range, startTime is: " + startTime
                                               + ",endTime is: " + endTime);
        }
        maxRange = (int) windowSeconds + 1;

        // Freshly allocated long arrays are already zero-filled, so no explicit init loop is needed.
        errorTPS = new long[maxRange];
        errorResponseTimes = new long[maxRange];
        tps = new long[maxRange];
        responseTimes = new long[maxRange];
    }

    /**
     * Waits on the barrier, runs the benchmark loop and counts the latch down.
     * <p>
     * The latch is counted down in a {@code finally} block so that whoever waits on it is released
     * even if the loop terminates with an unchecked exception. If this thread is interrupted while
     * waiting on the barrier, the interrupt flag is restored and the loop is skipped; any other
     * barrier failure is logged and the loop still runs.
     */
    public void run() {
        try {
            if (awaitStart()) {
                runJavaAndHessian();
            }
        } finally {
            latch.countDown();
        }
    }

    /**
     * Waits for the other parties on the barrier.
     *
     * @return {@code false} if the thread was interrupted while waiting, {@code true} otherwise
     */
    private boolean awaitStart() {
        try {
            barrier.await();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            LOGGER.warn("interrupted while waiting on the start barrier, benchmark loop skipped", e);
            return false;
        } catch (Exception e) {
            // Best effort: a barrier failure does not prevent this worker from running.
            LOGGER.warn("start barrier failed, benchmark loop starts without it", e);
        }
        return true;
    }

    /** Invokes the service repeatedly until the microsecond clock reaches {@code endTime}. */
    private void runJavaAndHessian() {
        while (true) {
            long beginTime = nowMicros();
            if (beginTime >= endTime) {
                return;
            }

            Object result = null;
            Exception failure = null;
            try {
                result = invoke(serviceFactory);
            } catch (Exception e) {
                failure = e;
            }
            // Take the finish timestamp before logging so logging cost is not counted as latency.
            long finishTime = nowMicros();
            if (failure != null) {
                LOGGER.error("client.invokeSync error", failure);
            }

            recordSample(beginTime, finishTime, result, failure);
        }
    }

    /**
     * Adds one finished call to the histogram and to the per-second counters.
     *
     * @param beginTime  microsecond timestamp taken before the call
     * @param finishTime microsecond timestamp taken after the call
     * @param result     value returned by the call; {@code null} counts as an error
     * @param failure    exception thrown by the call, or {@code null} if it returned normally
     */
    private void recordSample(long beginTime, long finishTime, Object result, Exception failure) {
        if (beginTime <= startTime) {
            // Calls started at or before startTime are not recorded.
            return;
        }

        long consumeTime = finishTime - beginTime;
        sumResponseTimeSpread(consumeTime);

        long range = (beginTime - startTime) / MICROS_PER_SECOND;
        if (range >= maxRange) {
            System.err.println("benchmark range exceeds maxRange,range is: " + range + ",maxRange is: "
                               + maxRange);
            return;
        }
        int second = (int) range;

        if (failure == null && result != null) {
            tps[second] = tps[second] + 1;
            responseTimes[second] = responseTimes[second] + consumeTime;
            return;
        }

        if (failure == null) {
            LOGGER.error("server return result is null");
        }
        errorTPS[second] = errorTPS[second] + 1;
        errorResponseTimes[second] = errorResponseTimes[second] + consumeTime;
    }

    /** Returns {@code System.nanoTime()} scaled down to microseconds. */
    private static long nowMicros() {
        return System.nanoTime() / NANOS_PER_MICRO;
    }

    /**
     * Performs one benchmark call.
     * <p>
     * A {@code null} return value is counted as an error; a thrown {@link Exception} is logged and
     * counted as an error.
     *
     * @param serviceFactory the factory configured by the constructor
     * @return the call result, or {@code null} to have the call counted as an error
     */
    public abstract Object invoke(ServiceFactory<?> serviceFactory);

    /**
     * Returns the collected counters.
     * <p>
     * The list holds, in order: the response-time histogram, successful calls per second, summed
     * successful response times per second, failed calls per second, and summed failed response
     * times per second. The arrays are the live internal arrays, not copies.
     *
     * @return a new list containing the five result arrays
     */
    public List<long[]> getResults() {
        List<long[]> results = new ArrayList<long[]>();
        results.add(responseSpreads);
        results.add(tps);
        results.add(responseTimes);
        results.add(errorTPS);
        results.add(errorResponseTimes);
        return results;
    }

    /**
     * Increments the histogram bucket matching the given response time.
     *
     * @param responseTime response time in microseconds; converted to whole milliseconds before
     *                     the bucket is chosen
     */
    private void sumResponseTimeSpread(long responseTime) {
        long millis = responseTime / MICROS_PER_MILLI;
        // Default to the overflow bucket; the first bound that is >= millis wins.
        int bucket = SPREAD_UPPER_BOUNDS_MS.length;
        for (int i = 0; i < SPREAD_UPPER_BOUNDS_MS.length; i++) {
            if (millis <= SPREAD_UPPER_BOUNDS_MS[i]) {
                bucket = i;
                break;
            }
        }
        responseSpreads[bucket] = responseSpreads[bucket] + 1;
    }
}