/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import static org.apache.hadoop.hbase.ipc.RpcClient.SPECIFIC_WRITE_THREAD;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.common.collect.Lists;
import com.google.protobuf.BlockingService;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import com.google.protobuf.Descriptors.MethodDescriptor;
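/**
 * Integration test for the HBase RPC client. It spins up a small cluster of in-process
 * {@link TestRpcServer} instances, drives them with concurrent echo calls from shared RPC
 * clients, and uses a mini chaos monkey to start and stop servers while calls are in flight.
 */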
@Category(IntegrationTests.class)
public class IntegrationTestRpcClient {
private static final Log LOG = LogFactory.getLog(IntegrationTestRpcClient.class);
private final Configuration conf;
private int numIterations = 10;
public IntegrationTestRpcClient() {
conf = HBaseConfiguration.create();
}
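/**
 * A minimal {@link RpcServer} that binds to an ephemeral port on localhost and serves the
 * test protobuf echo service ({@code SERVICE}).
 */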
static class TestRpcServer extends RpcServer {
TestRpcServer(Configuration conf) throws IOException {
this(new FifoRpcScheduler(conf, 1), conf);
}
TestRpcServer(RpcScheduler scheduler, Configuration conf) throws IOException {
super(null, "testRpcServer", Lists
.newArrayList(new BlockingServiceAndInterface(SERVICE, null)), new InetSocketAddress(
"localhost", 0), conf, scheduler);
}
@Override
public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
throws IOException {
return super.call(service, md, param, cellScanner, receiveTime, status);
}
}
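/**
 * Reflective blocking implementation of the test protobuf service. Only {@code echo} does
 * real work, returning the request message unchanged; {@code ping} and {@code error} simply
 * return null.
 */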
static final BlockingService SERVICE =
TestRpcServiceProtos.TestProtobufRpcProto
.newReflectiveBlockingService(new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
@Override
public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
throws ServiceException {
return null;
}
@Override
public EmptyResponseProto error(RpcController controller, EmptyRequestProto request)
throws ServiceException {
return null;
}
@Override
public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
throws ServiceException {
return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
}
});
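/**
 * Creates the client under test: a blocking {@link RpcClientImpl} when {@code isSyncClient}
 * is true, otherwise an {@link AsyncRpcClient} with its codec overridden to null.
 */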
protected AbstractRpcClient createRpcClient(Configuration conf, boolean isSyncClient) {
return isSyncClient ?
new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT) :
new AsyncRpcClient(conf) {
@Override
Codec getCodec() {
return null;
}
};
}
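/** A payload of roughly one million characters, used to exercise large RPC messages. */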
static String BIG_PAYLOAD;
static {
StringBuilder builder = new StringBuilder();
while (builder.length() < 1024 * 1024) { // ~1M chars (about 2 MB as UTF-16 in memory)
builder.append("big.payload.");
}
BIG_PAYLOAD = builder.toString();
}
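/**
 * A set of in-process {@link TestRpcServer} instances keyed by listener address. Servers are
 * started and stopped concurrently, never dropping below {@code minServers} or exceeding
 * {@code maxServers}; the read/write lock keeps the map and list consistent while the chaos
 * monkey and the clients race against each other.
 */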
class Cluster {
Random random = new Random();
ReadWriteLock lock = new ReentrantReadWriteLock();
HashMap<InetSocketAddress, TestRpcServer> rpcServers = new HashMap<>();
List<TestRpcServer> serverList = new ArrayList<>();
int maxServers;
int minServers;
Cluster(int minServers, int maxServers) {
this.minServers = minServers;
this.maxServers = maxServers;
}
TestRpcServer startServer() throws IOException {
lock.writeLock().lock();
try {
if (rpcServers.size() >= maxServers) {
return null;
}
TestRpcServer rpcServer = new TestRpcServer(conf);
rpcServer.start();
InetSocketAddress address = rpcServer.getListenerAddress();
if (address == null) {
throw new IOException("Listener channel is closed");
}
rpcServers.put(address, rpcServer);
serverList.add(rpcServer);
LOG.info("Started server: " + address);
return rpcServer;
} finally {
lock.writeLock().unlock();
}
}
void stopRandomServer() throws Exception {
lock.writeLock().lock();
TestRpcServer rpcServer = null;
try {
if (rpcServers.size() <= minServers) {
return;
}
int size = rpcServers.size();
int rand = random.nextInt(size);
rpcServer = serverList.remove(rand);
InetSocketAddress address = rpcServer.getListenerAddress();
if (address == null) {
// Throw exception here. We can't remove this instance from the server map because
// we no longer have access to its map key
throw new IOException("Listener channel is closed");
}
rpcServers.remove(address);
stopServer(rpcServer);
} finally {
lock.writeLock().unlock();
}
}
void stopServer(TestRpcServer rpcServer) throws InterruptedException {
InetSocketAddress address = rpcServer.getListenerAddress();
LOG.info("Stopping server: " + address);
rpcServer.stop();
rpcServer.join();
LOG.info("Stopped server: " + address);
}
void stopRunning() throws InterruptedException {
lock.writeLock().lock();
try {
for (TestRpcServer rpcServer : serverList) {
stopServer(rpcServer);
}
} finally {
lock.writeLock().unlock();
}
}
TestRpcServer getRandomServer() {
lock.readLock().lock();
try {
int size = rpcServers.size();
int rand = random.nextInt(size);
return serverList.get(rand);
} finally {
lock.readLock().unlock();
}
}
}
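/**
 * Background thread that randomly starts or stops a server every 100 ms until told to stop
 * running. The first exception it hits is recorded so the test can rethrow it.
 */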
static class MiniChaosMonkey extends Thread {
AtomicBoolean running = new AtomicBoolean(true);
Random random = new Random();
AtomicReference<Exception> exception = new AtomicReference<>(null);
Cluster cluster;
public MiniChaosMonkey(Cluster cluster) {
this.cluster = cluster;
}
@Override
public void run() {
while (running.get()) {
switch (random.nextInt(2)) {
case 0: // start a server
try {
cluster.startServer();
} catch (Exception e) {
LOG.warn(e);
exception.compareAndSet(null, e);
}
break;
case 1: // stop a server
try {
cluster.stopRandomServer();
} catch (Exception e) {
LOG.warn(e);
exception.compareAndSet(null, e);
}
break;
default:
}
Threads.sleep(100);
}
}
void stopRunning() {
running.set(false);
}
void rethrowException() throws Exception {
if (exception.get() != null) {
throw exception.get();
}
}
}
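/**
 * Client thread that repeatedly issues blocking echo calls against a random live server,
 * alternating randomly between a small per-call message and {@code BIG_PAYLOAD}. Call
 * failures are expected while servers are shutting down and are only logged; a response that
 * does not match the request is recorded and rethrown by the test.
 */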
static class SimpleClient extends Thread {
AbstractRpcClient rpcClient;
AtomicBoolean running = new AtomicBoolean(true);
AtomicBoolean sending = new AtomicBoolean(false);
AtomicReference<Throwable> exception = new AtomicReference<>(null);
Cluster cluster;
String id;
long numCalls = 0;
Random random = new Random();
public SimpleClient(Cluster cluster, AbstractRpcClient rpcClient, String id) {
this.cluster = cluster;
this.rpcClient = rpcClient;
this.id = id;
}
@Override
public void run() {
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
while (running.get()) {
boolean isBigPayload = random.nextBoolean();
String message = isBigPayload ? BIG_PAYLOAD : id + numCalls;
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
EchoResponseProto ret = EchoResponseProto.newBuilder().setMessage("foo").build();
TestRpcServer server = cluster.getRandomServer();
try {
User user = User.getCurrent();
InetSocketAddress address = server.getListenerAddress();
if (address == null) {
throw new IOException("Listener channel is closed");
}
sending.set(true);
ret = (EchoResponseProto)
rpcClient.callBlockingMethod(md, null, param, ret, user, address);
} catch (Exception e) {
LOG.warn(e);
continue; // expected in case connection is closing or closed
}
try {
assertNotNull(ret);
assertEquals(message, ret.getMessage());
} catch (Throwable t) {
exception.compareAndSet(null, t);
}
numCalls++;
}
}
void stopRunning() {
running.set(false);
}
boolean isSending() {
return sending.get();
}
void rethrowException() throws Throwable {
if (exception.get() != null) {
throw exception.get();
}
}
}
/*
 * Test that connections that have not yet been started are successfully removed from the
 * connection pool when the RPC client is closing.
 */
@Test (timeout = 30000)
public void testRpcWithWriteThread() throws IOException, InterruptedException {
LOG.info("Starting test");
Cluster cluster = new Cluster(1, 1);
cluster.startServer();
conf.setBoolean(SPECIFIC_WRITE_THREAD, true);
for (int i = 0; i < 1000; i++) {
AbstractRpcClient rpcClient = createRpcClient(conf, true);
SimpleClient client = new SimpleClient(cluster, rpcClient, "Client1");
client.start();
while (!client.isSending()) {
Thread.sleep(1);
}
client.stopRunning();
rpcClient.close();
}
}
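/**
 * Runs the chaos-monkey test {@code numIterations} times with the synchronous client; each
 * iteration is guarded by a {@link TimeoutThread} that exits the JVM after 90 seconds.
 */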
@Test (timeout = 900000)
public void testRpcWithChaosMonkeyWithSyncClient() throws Throwable {
for (int i = 0; i < numIterations; i++) {
TimeoutThread.runWithTimeout(new Callable<Void>() {
@Override
public Void call() throws Exception {
try {
testRpcWithChaosMonkey(true);
} catch (Throwable e) {
if (e instanceof Exception) {
throw (Exception)e;
} else {
throw new Exception(e);
}
}
return null;
}
}, 90000);
}
}
@Test (timeout = 900000)
@Ignore // TODO: test fails with async client
public void testRpcWithChaosMonkeyWithAsyncClient() throws Throwable {
for (int i = 0; i < numIterations; i++) {
TimeoutThread.runWithTimeout(new Callable<Void>() {
@Override
public Void call() throws Exception {
try {
testRpcWithChaosMonkey(false);
} catch (Throwable e) {
if (e instanceof Exception) {
throw (Exception)e;
} else {
throw new Exception(e);
}
}
return null;
}
}, 90000);
}
}
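/**
 * Watchdog thread that dumps all thread stacks and exits the JVM if the guarded call does
 * not finish within the given timeout. Interrupting the thread cancels the watchdog.
 */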
static class TimeoutThread extends Thread {
long timeout;
public TimeoutThread(long timeout) {
this.timeout = timeout;
}
@Override
public void run() {
try {
Thread.sleep(timeout);
Threads.printThreadInfo(System.err, "TEST TIMEOUT STACK DUMP");
System.exit(1); // a timeout happened
} catch (InterruptedException e) {
// this is what we want
}
}
// Runs the callable in the calling thread while a watchdog thread stands by to exit the JVM
// if the timeout elapses first.
static void runWithTimeout(Callable<?> callable, long timeout) throws Exception {
TimeoutThread thread = new TimeoutThread(timeout);
thread.start();
callable.call();
thread.interrupt();
}
}
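/**
 * Core chaos test: starts 10 servers (the cluster may grow up to 100), shares one RPC client
 * across 30 client threads, lets the chaos monkey start and stop servers for 30 seconds, then
 * verifies that neither the monkey nor any client hit an unexpected failure and that every
 * client completed more than 10 calls.
 */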
public void testRpcWithChaosMonkey(boolean isSyncClient) throws Throwable {
LOG.info("Starting test");
Cluster cluster = new Cluster(10, 100);
for (int i = 0; i < 10; i++) {
cluster.startServer();
}
ArrayList<SimpleClient> clients = new ArrayList<>();
// all threads should share the same rpc client
AbstractRpcClient rpcClient = createRpcClient(conf, isSyncClient);
for (int i = 0; i < 30; i++) {
String clientId = "client_" + i + "_";
LOG.info("Starting client: " + clientId);
SimpleClient client = new SimpleClient(cluster, rpcClient, clientId);
client.start();
clients.add(client);
}
LOG.info("Starting MiniChaosMonkey");
MiniChaosMonkey cm = new MiniChaosMonkey(cluster);
cm.start();
Threads.sleep(30000);
LOG.info("Stopping MiniChaosMonkey");
cm.stopRunning();
cm.join();
cm.rethrowException();
LOG.info("Stopping clients");
for (SimpleClient client : clients) {
LOG.info("Stopping client: " + client.id);
LOG.info(client.id + " numCalls:" + client.numCalls);
client.stopRunning();
client.join();
client.rethrowException();
assertTrue(client.numCalls > 10);
}
LOG.info("Stopping RpcClient");
rpcClient.close();
LOG.info("Stopping Cluster");
cluster.stopRunning();
}
}