blob: 5fe23021398ea590b037da138ee9efaf14d67257 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.ipc.TestProtoBufRpc.PBServerImpl;
import org.apache.hadoop.ipc.TestProtoBufRpc.TestRpcService;
import org.apache.hadoop.ipc.TestRPC.TestProtocol;
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto;
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto;
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.MultithreadedTestUtil;
import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.google.common.base.Joiner;
import com.google.protobuf.BlockingService;
/**
* Benchmark for protobuf RPC.
* Run with --help option for usage.
*/
public class RPCCallBenchmark implements Tool, Configurable {
private Configuration conf;
private AtomicLong callCount = new AtomicLong(0);
private static ThreadMXBean threadBean =
ManagementFactory.getThreadMXBean();
private static class MyOptions {
private boolean failed = false;
private int serverThreads = 0;
private int serverReaderThreads = 1;
private int clientThreads = 0;
private String host = "0.0.0.0";
private int port = 12345;
public int secondsToRun = 15;
private int msgSize = 1024;
public Class<? extends RpcEngine> rpcEngine =
WritableRpcEngine.class;
private MyOptions(String args[]) {
try {
Options opts = buildOptions();
CommandLineParser parser = new GnuParser();
CommandLine line = parser.parse(opts, args, true);
processOptions(line, opts);
validateOptions();
} catch (ParseException e) {
System.err.println(e.getMessage());
System.err.println("Try \"--help\" option for details.");
failed = true;
}
}
private void validateOptions() throws ParseException {
if (serverThreads <= 0 && clientThreads <= 0) {
throw new ParseException("Must specify at least -c or -s");
}
}
@SuppressWarnings("static-access")
private Options buildOptions() {
Options opts = new Options();
opts.addOption(
OptionBuilder.withLongOpt("serverThreads").hasArg(true)
.withArgName("numthreads")
.withDescription("number of server threads (handlers) to run (or 0 to not run server)")
.create("s"));
opts.addOption(
OptionBuilder.withLongOpt("serverReaderThreads").hasArg(true)
.withArgName("threads")
.withDescription("number of server reader threads to run")
.create("r"));
opts.addOption(
OptionBuilder.withLongOpt("clientThreads").hasArg(true)
.withArgName("numthreads")
.withDescription("number of client threads to run (or 0 to not run client)")
.create("c"));
opts.addOption(
OptionBuilder.withLongOpt("messageSize").hasArg(true)
.withArgName("bytes")
.withDescription("size of call parameter in bytes")
.create("m"));
opts.addOption(
OptionBuilder.withLongOpt("time").hasArg(true)
.withArgName("seconds")
.withDescription("number of seconds to run clients for")
.create("t"));
opts.addOption(
OptionBuilder.withLongOpt("port").hasArg(true)
.withArgName("port")
.withDescription("port to listen or connect on")
.create("p"));
opts.addOption(
OptionBuilder.withLongOpt("host").hasArg(true)
.withArgName("addr")
.withDescription("host to listen or connect on")
.create('h'));
opts.addOption(
OptionBuilder.withLongOpt("engine").hasArg(true)
.withArgName("writable|protobuf")
.withDescription("engine to use")
.create('e'));
opts.addOption(
OptionBuilder.withLongOpt("help").hasArg(false)
.withDescription("show this screen")
.create('?'));
return opts;
}
private void processOptions(CommandLine line, Options opts)
throws ParseException {
if (line.hasOption("help") || line.hasOption('?')) {
HelpFormatter formatter = new HelpFormatter();
System.out.println("Protobuf IPC benchmark.");
System.out.println();
formatter.printHelp(100,
"java ... PBRPCBenchmark [options]",
"\nSupported options:", opts, "");
return;
}
if (line.hasOption('s')) {
serverThreads = Integer.parseInt(line.getOptionValue('s'));
}
if (line.hasOption('r')) {
serverReaderThreads = Integer.parseInt(line.getOptionValue('r'));
}
if (line.hasOption('c')) {
clientThreads = Integer.parseInt(line.getOptionValue('c'));
}
if (line.hasOption('t')) {
secondsToRun = Integer.parseInt(line.getOptionValue('t'));
}
if (line.hasOption('m')) {
msgSize = Integer.parseInt(line.getOptionValue('m'));
}
if (line.hasOption('p')) {
port = Integer.parseInt(line.getOptionValue('p'));
}
if (line.hasOption('h')) {
host = line.getOptionValue('h');
}
if (line.hasOption('e')) {
String eng = line.getOptionValue('e');
if ("protobuf".equals(eng)) {
rpcEngine = ProtobufRpcEngine.class;
} else if ("writable".equals(eng)) {
rpcEngine = WritableRpcEngine.class;
} else {
throw new ParseException("invalid engine: " + eng);
}
}
String[] remainingArgs = line.getArgs();
if (remainingArgs.length != 0) {
throw new ParseException("Extra arguments: " +
Joiner.on(" ").join(remainingArgs));
}
}
@Override
public String toString() {
return "rpcEngine=" + rpcEngine + "\nserverThreads=" + serverThreads
+ "\nserverReaderThreads=" + serverReaderThreads + "\nclientThreads="
+ clientThreads + "\nhost=" + host + "\nport=" + port
+ "\nsecondsToRun=" + secondsToRun + "\nmsgSize=" + msgSize;
}
}
private Server startServer(MyOptions opts) throws IOException {
if (opts.serverThreads <= 0) {
return null;
}
conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
opts.serverReaderThreads);
RPC.Server server;
// Get RPC server for server side implementation
if (opts.rpcEngine == ProtobufRpcEngine.class) {
// Create server side implementation
PBServerImpl serverImpl = new PBServerImpl();
BlockingService service = TestProtobufRpcProto
.newReflectiveBlockingService(serverImpl);
server = RPC.getServer(TestRpcService.class, service,
opts.host, opts.port, opts.serverThreads, false, conf, null);
} else if (opts.rpcEngine == WritableRpcEngine.class) {
server = RPC.getServer(TestProtocol.class, new TestRPC.TestImpl(),
opts.host, opts.port, opts.serverThreads, false, conf, null);
} else {
throw new RuntimeException("Bad engine: " + opts.rpcEngine);
}
server.start();
return server;
}
private long getTotalCpuTime(Iterable<? extends Thread> threads) {
long total = 0;
for (Thread t : threads) {
long tid = t.getId();
total += threadBean.getThreadCpuTime(tid);
}
return total;
}
@Override
public int run(String[] args) throws Exception {
MyOptions opts = new MyOptions(args);
if (opts.failed) {
return -1;
}
// Set RPC engine to the configured RPC engine
RPC.setProtocolEngine(conf, TestRpcService.class, opts.rpcEngine);
Server server = startServer(opts);
try {
TestContext ctx = setupClientTestContext(opts);
if (ctx != null) {
long totalCalls = 0;
ctx.startThreads();
long veryStart = System.nanoTime();
// Loop printing results every second until the specified
// time has elapsed
for (int i = 0; i < opts.secondsToRun ; i++) {
long st = System.nanoTime();
ctx.waitFor(1000);
long et = System.nanoTime();
long ct = callCount.getAndSet(0);
totalCalls += ct;
double callsPerSec = (ct * 1000000000)/(et - st);
System.out.println("Calls per second: " + callsPerSec);
}
// Print results
if (totalCalls > 0) {
long veryEnd = System.nanoTime();
double callsPerSec =
(totalCalls * 1000000000)/(veryEnd - veryStart);
long cpuNanosClient = getTotalCpuTime(ctx.getTestThreads());
long cpuNanosServer = -1;
if (server != null) {
cpuNanosServer = getTotalCpuTime(server.getHandlers());;
}
System.out.println("====== Results ======");
System.out.println("Options:\n" + opts);
System.out.println("Total calls per second: " + callsPerSec);
System.out.println("CPU time per call on client: " +
(cpuNanosClient / totalCalls) + " ns");
if (server != null) {
System.out.println("CPU time per call on server: " +
(cpuNanosServer / totalCalls) + " ns");
}
} else {
System.out.println("No calls!");
}
ctx.stop();
} else {
while (true) {
Thread.sleep(10000);
}
}
} finally {
if (server != null) {
server.stop();
}
}
return 0;
}
private TestContext setupClientTestContext(final MyOptions opts)
throws IOException, InterruptedException {
if (opts.clientThreads <= 0) {
return null;
}
// Set up a separate proxy for each client thread,
// rather than making them share TCP pipes.
int numProxies = opts.clientThreads;
final RpcServiceWrapper proxies[] = new RpcServiceWrapper[numProxies];
for (int i = 0; i < numProxies; i++) {
proxies[i] =
UserGroupInformation.createUserForTesting("proxy-" + i,new String[]{})
.doAs(new PrivilegedExceptionAction<RpcServiceWrapper>() {
@Override
public RpcServiceWrapper run() throws Exception {
return createRpcClient(opts);
}
});
}
// Create an echo message of the desired length
final StringBuilder msgBuilder = new StringBuilder(opts.msgSize);
for (int c = 0; c < opts.msgSize; c++) {
msgBuilder.append('x');
}
final String echoMessage = msgBuilder.toString();
// Create the clients in a test context
TestContext ctx = new TestContext();
for (int i = 0; i < opts.clientThreads; i++) {
final RpcServiceWrapper proxy = proxies[i % numProxies];
ctx.addThread(new MultithreadedTestUtil.RepeatingTestThread(ctx) {
@Override
public void doAnAction() throws Exception {
proxy.doEcho(echoMessage);
callCount.incrementAndGet();
}
});
}
return ctx;
}
/**
* Simple interface that can be implemented either by the
* protobuf or writable implementations.
*/
private interface RpcServiceWrapper {
public String doEcho(String msg) throws Exception;
}
/**
* Create a client proxy for the specified engine.
*/
private RpcServiceWrapper createRpcClient(MyOptions opts) throws IOException {
InetSocketAddress addr = NetUtils.createSocketAddr(opts.host, opts.port);
if (opts.rpcEngine == ProtobufRpcEngine.class) {
final TestRpcService proxy = RPC.getProxy(TestRpcService.class, 0, addr, conf);
return new RpcServiceWrapper() {
@Override
public String doEcho(String msg) throws Exception {
EchoRequestProto req = EchoRequestProto.newBuilder()
.setMessage(msg)
.build();
EchoResponseProto responseProto = proxy.echo(null, req);
return responseProto.getMessage();
}
};
} else if (opts.rpcEngine == WritableRpcEngine.class) {
final TestProtocol proxy = (TestProtocol)RPC.getProxy(
TestProtocol.class, TestProtocol.versionID, addr, conf);
return new RpcServiceWrapper() {
@Override
public String doEcho(String msg) throws Exception {
return proxy.echo(msg);
}
};
} else {
throw new RuntimeException("unsupported engine: " + opts.rpcEngine);
}
}
public static void main(String []args) throws Exception {
int rc = ToolRunner.run(new RPCCallBenchmark(), args);
System.exit(rc);
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
@Override
public Configuration getConf() {
return conf;
}
}