blob: 476b1979b2a5483fc0fbc3fccf6527c7aab72383 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import java.net.InetSocketAddress;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import com.google.protobuf.BlockingService;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.protobuf.TestProtos;
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcHandoffProto;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestProtoBufRpcServerHandoff {
public static final Logger LOG =
LoggerFactory.getLogger(TestProtoBufRpcServerHandoff.class);
@Test(timeout = 20000)
public void test() throws Exception {
Configuration conf = new Configuration();
TestProtoBufRpcServerHandoffServer serverImpl =
new TestProtoBufRpcServerHandoffServer();
BlockingService blockingService =
TestProtobufRpcHandoffProto.newReflectiveBlockingService(serverImpl);
RPC.setProtocolEngine(conf, TestProtoBufRpcServerHandoffProtocol.class,
ProtobufRpcEngine.class);
RPC.Server server = new RPC.Builder(conf)
.setProtocol(TestProtoBufRpcServerHandoffProtocol.class)
.setInstance(blockingService)
.setVerbose(true)
.setNumHandlers(1) // Num Handlers explicitly set to 1 for test.
.build();
server.start();
InetSocketAddress address = server.getListenerAddress();
long serverStartTime = System.currentTimeMillis();
LOG.info("Server started at: " + address + " at time: " + serverStartTime);
final TestProtoBufRpcServerHandoffProtocol client = RPC.getProxy(
TestProtoBufRpcServerHandoffProtocol.class, 1, address, conf);
ExecutorService executorService = Executors.newFixedThreadPool(2);
CompletionService<ClientInvocationCallable> completionService =
new ExecutorCompletionService<ClientInvocationCallable>(
executorService);
completionService.submit(new ClientInvocationCallable(client, 5000l));
completionService.submit(new ClientInvocationCallable(client, 5000l));
long submitTime = System.currentTimeMillis();
Future<ClientInvocationCallable> future1 = completionService.take();
Future<ClientInvocationCallable> future2 = completionService.take();
ClientInvocationCallable callable1 = future1.get();
ClientInvocationCallable callable2 = future2.get();
LOG.info(callable1.toString());
LOG.info(callable2.toString());
// Ensure the 5 second sleep responses are within a reasonable time of each
// other.
Assert.assertTrue(Math.abs(callable1.endTime - callable2.endTime) < 2000l);
Assert.assertTrue(System.currentTimeMillis() - submitTime < 7000l);
}
private static class ClientInvocationCallable
implements Callable<ClientInvocationCallable> {
final TestProtoBufRpcServerHandoffProtocol client;
final long sleepTime;
TestProtos.SleepResponseProto2 result;
long startTime;
long endTime;
private ClientInvocationCallable(
TestProtoBufRpcServerHandoffProtocol client, long sleepTime) {
this.client = client;
this.sleepTime = sleepTime;
}
@Override
public ClientInvocationCallable call() throws Exception {
startTime = System.currentTimeMillis();
result = client.sleep(null,
TestProtos.SleepRequestProto2.newBuilder().setSleepTime(sleepTime)
.build());
endTime = System.currentTimeMillis();
return this;
}
@Override
public String toString() {
return "startTime=" + startTime + ", endTime=" + endTime +
(result != null ?
", result.receiveTime=" + result.getReceiveTime() +
", result.responseTime=" +
result.getResponseTime() : "");
}
}
@ProtocolInfo(
protocolName = "org.apache.hadoop.ipc.TestProtoBufRpcServerHandoff$TestProtoBufRpcServerHandoffProtocol",
protocolVersion = 1)
public interface TestProtoBufRpcServerHandoffProtocol
extends TestProtobufRpcHandoffProto.BlockingInterface {
}
public static class TestProtoBufRpcServerHandoffServer
implements TestProtoBufRpcServerHandoffProtocol {
@Override
public TestProtos.SleepResponseProto2 sleep
(RpcController controller,
TestProtos.SleepRequestProto2 request) throws
ServiceException {
final long startTime = System.currentTimeMillis();
final ProtobufRpcEngineCallback callback =
ProtobufRpcEngine.Server.registerForDeferredResponse();
final long sleepTime = request.getSleepTime();
new Thread() {
@Override
public void run() {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
callback.setResponse(
TestProtos.SleepResponseProto2.newBuilder()
.setReceiveTime(startTime)
.setResponseTime(System.currentTimeMillis()).build());
}
}.start();
return null;
}
}
}