blob: 87561bac745b1b8da44a468293be6fe16ed351bb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newStub;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.anyObject;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.internal.verification.VerificationModeFactory.times;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.StringUtils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.PauseRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/**
* Some basic ipc tests.
*/
public abstract class AbstractTestIPC {
private static final Logger LOG = LoggerFactory.getLogger(AbstractTestIPC.class);
private static final byte[] CELL_BYTES = Bytes.toBytes("xyz");
private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
protected static final Configuration CONF = HBaseConfiguration.create();
static {
// Set the default to be the old SimpleRpcServer. Subclasses test it and netty.
CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, SimpleRpcServer.class.getName());
}
protected abstract RpcServer createRpcServer(final Server server, final String name,
final List<BlockingServiceAndInterface> services,
final InetSocketAddress bindAddress, Configuration conf,
RpcScheduler scheduler) throws IOException;
protected abstract AbstractRpcClient<?> createRpcClientNoCodec(Configuration conf);
/**
* Ensure we do not HAVE TO HAVE a codec.
*/
@Test
public void testNoCodec() throws IOException, ServiceException {
Configuration conf = HBaseConfiguration.create();
RpcServer rpcServer = createRpcServer(null, "testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
new FifoRpcScheduler(CONF, 1));
try (AbstractRpcClient<?> client = createRpcClientNoCodec(conf)) {
rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
HBaseRpcController pcrc = new HBaseRpcControllerImpl();
String message = "hello";
assertEquals(message,
stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage());
assertNull(pcrc.cellScanner());
} finally {
rpcServer.stop();
}
}
protected abstract AbstractRpcClient<?> createRpcClient(Configuration conf);
/**
* It is hard to verify the compression is actually happening under the wraps. Hope that if
* unsupported, we'll get an exception out of some time (meantime, have to trace it manually to
* confirm that compression is happening down in the client and server).
*/
@Test
public void testCompressCellBlock() throws IOException, ServiceException {
Configuration conf = new Configuration(HBaseConfiguration.create());
conf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName());
List<Cell> cells = new ArrayList<>();
int count = 3;
for (int i = 0; i < count; i++) {
cells.add(CELL);
}
RpcServer rpcServer = createRpcServer(null, "testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
new FifoRpcScheduler(CONF, 1));
try (AbstractRpcClient<?> client = createRpcClient(conf)) {
rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
HBaseRpcController pcrc = new HBaseRpcControllerImpl(CellUtil.createCellScanner(cells));
String message = "hello";
assertEquals(message,
stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage());
int index = 0;
CellScanner cellScanner = pcrc.cellScanner();
assertNotNull(cellScanner);
while (cellScanner.advance()) {
assertEquals(CELL, cellScanner.current());
index++;
}
assertEquals(count, index);
} finally {
rpcServer.stop();
}
}
protected abstract AbstractRpcClient<?> createRpcClientRTEDuringConnectionSetup(
Configuration conf) throws IOException;
@Test
public void testRTEDuringConnectionSetup() throws Exception {
Configuration conf = HBaseConfiguration.create();
RpcServer rpcServer = createRpcServer(null, "testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
new FifoRpcScheduler(CONF, 1));
try (AbstractRpcClient<?> client = createRpcClientRTEDuringConnectionSetup(conf)) {
rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
stub.ping(null, EmptyRequestProto.getDefaultInstance());
fail("Expected an exception to have been thrown!");
} catch (Exception e) {
LOG.info("Caught expected exception: " + e.toString());
assertTrue(e.toString(), StringUtils.stringifyException(e).contains("Injected fault"));
} finally {
rpcServer.stop();
}
}
/**
* Tests that the rpc scheduler is called when requests arrive.
*/
@Test
public void testRpcScheduler() throws IOException, ServiceException, InterruptedException {
RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1));
RpcServer rpcServer = createRpcServer(null, "testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, scheduler);
verify(scheduler).init((RpcScheduler.Context) anyObject());
try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
rpcServer.start();
verify(scheduler).start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
for (int i = 0; i < 10; i++) {
stub.echo(null, param);
}
verify(scheduler, times(10)).dispatch((CallRunner) anyObject());
} finally {
rpcServer.stop();
verify(scheduler).stop();
}
}
/** Tests that the rpc scheduler is called when requests arrive. */
@Test
public void testRpcMaxRequestSize() throws IOException, ServiceException {
Configuration conf = new Configuration(CONF);
conf.setInt(RpcServer.MAX_REQUEST_SIZE, 1000);
RpcServer rpcServer = createRpcServer(null, "testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
SERVICE, null)), new InetSocketAddress("localhost", 0), conf,
new FifoRpcScheduler(conf, 1));
try (AbstractRpcClient<?> client = createRpcClient(conf)) {
rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
StringBuilder message = new StringBuilder(1200);
for (int i = 0; i < 200; i++) {
message.append("hello.");
}
// set total RPC size bigger than 100 bytes
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message.toString()).build();
stub.echo(
new HBaseRpcControllerImpl(CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))),
param);
fail("RPC should have failed because it exceeds max request size");
} catch (ServiceException e) {
LOG.info("Caught expected exception: " + e);
assertTrue(e.toString(),
StringUtils.stringifyException(e).contains("RequestTooBigException"));
} finally {
rpcServer.stop();
}
}
/**
* Tests that the RpcServer creates & dispatches CallRunner object to scheduler with non-null
* remoteAddress set to its Call Object
*/
@Test
public void testRpcServerForNotNullRemoteAddressInCallObject()
throws IOException, ServiceException {
RpcServer rpcServer = createRpcServer(null, "testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
new FifoRpcScheduler(CONF, 1));
InetSocketAddress localAddr = new InetSocketAddress("localhost", 0);
try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
assertEquals(localAddr.getAddress().getHostAddress(),
stub.addr(null, EmptyRequestProto.getDefaultInstance()).getAddr());
} finally {
rpcServer.stop();
}
}
@Test
public void testRemoteError() throws IOException, ServiceException {
RpcServer rpcServer = createRpcServer(null, "testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
new FifoRpcScheduler(CONF, 1));
try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
stub.error(null, EmptyRequestProto.getDefaultInstance());
} catch (ServiceException e) {
LOG.info("Caught expected exception: " + e);
IOException ioe = ProtobufUtil.handleRemoteException(e);
assertTrue(ioe instanceof DoNotRetryIOException);
assertTrue(ioe.getMessage().contains("server error!"));
} finally {
rpcServer.stop();
}
}
@Test
public void testTimeout() throws IOException {
RpcServer rpcServer = createRpcServer(null, "testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
new FifoRpcScheduler(CONF, 1));
try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
HBaseRpcController pcrc = new HBaseRpcControllerImpl();
int ms = 1000;
int timeout = 100;
for (int i = 0; i < 10; i++) {
pcrc.reset();
pcrc.setCallTimeout(timeout);
long startTime = System.nanoTime();
try {
stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(ms).build());
} catch (ServiceException e) {
long waitTime = (System.nanoTime() - startTime) / 1000000;
// expected
LOG.info("Caught expected exception: " + e);
IOException ioe = ProtobufUtil.handleRemoteException(e);
assertTrue(ioe.getCause() instanceof CallTimeoutException);
// confirm that we got exception before the actual pause.
assertTrue(waitTime < ms);
}
}
} finally {
rpcServer.stop();
}
}
protected abstract RpcServer createTestFailingRpcServer(final Server server, final String name,
final List<BlockingServiceAndInterface> services,
final InetSocketAddress bindAddress, Configuration conf,
RpcScheduler scheduler) throws IOException;
/** Tests that the connection closing is handled by the client with outstanding RPC calls */
@Test
public void testConnectionCloseWithOutstandingRPCs() throws InterruptedException, IOException {
Configuration conf = new Configuration(CONF);
RpcServer rpcServer = createTestFailingRpcServer(null, "testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
new FifoRpcScheduler(CONF, 1));
try (AbstractRpcClient<?> client = createRpcClient(conf)) {
rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
stub.echo(null, param);
fail("RPC should have failed because connection closed");
} catch (ServiceException e) {
LOG.info("Caught expected exception: " + e.toString());
} finally {
rpcServer.stop();
}
}
@Test
public void testAsyncEcho() throws IOException {
Configuration conf = HBaseConfiguration.create();
RpcServer rpcServer = createRpcServer(null, "testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
new FifoRpcScheduler(CONF, 1));
try (AbstractRpcClient<?> client = createRpcClient(conf)) {
rpcServer.start();
Interface stub = newStub(client, rpcServer.getListenerAddress());
int num = 10;
List<HBaseRpcController> pcrcList = new ArrayList<>();
List<BlockingRpcCallback<EchoResponseProto>> callbackList = new ArrayList<>();
for (int i = 0; i < num; i++) {
HBaseRpcController pcrc = new HBaseRpcControllerImpl();
BlockingRpcCallback<EchoResponseProto> done = new BlockingRpcCallback<>();
stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage("hello-" + i).build(), done);
pcrcList.add(pcrc);
callbackList.add(done);
}
for (int i = 0; i < num; i++) {
HBaseRpcController pcrc = pcrcList.get(i);
assertFalse(pcrc.failed());
assertNull(pcrc.cellScanner());
assertEquals("hello-" + i, callbackList.get(i).get().getMessage());
}
} finally {
rpcServer.stop();
}
}
@Test
public void testAsyncRemoteError() throws IOException {
AbstractRpcClient<?> client = createRpcClient(CONF);
RpcServer rpcServer = createRpcServer(null, "testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
new FifoRpcScheduler(CONF, 1));
try {
rpcServer.start();
Interface stub = newStub(client, rpcServer.getListenerAddress());
BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>();
HBaseRpcController pcrc = new HBaseRpcControllerImpl();
stub.error(pcrc, EmptyRequestProto.getDefaultInstance(), callback);
assertNull(callback.get());
assertTrue(pcrc.failed());
LOG.info("Caught expected exception: " + pcrc.getFailed());
IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed());
assertTrue(ioe instanceof DoNotRetryIOException);
assertTrue(ioe.getMessage().contains("server error!"));
} finally {
client.close();
rpcServer.stop();
}
}
@Test
public void testAsyncTimeout() throws IOException {
RpcServer rpcServer = createRpcServer(null, "testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
new FifoRpcScheduler(CONF, 1));
try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
rpcServer.start();
Interface stub = newStub(client, rpcServer.getListenerAddress());
List<HBaseRpcController> pcrcList = new ArrayList<>();
List<BlockingRpcCallback<EmptyResponseProto>> callbackList = new ArrayList<>();
int ms = 1000;
int timeout = 100;
long startTime = System.nanoTime();
for (int i = 0; i < 10; i++) {
HBaseRpcController pcrc = new HBaseRpcControllerImpl();
pcrc.setCallTimeout(timeout);
BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>();
stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(ms).build(), callback);
pcrcList.add(pcrc);
callbackList.add(callback);
}
for (BlockingRpcCallback<?> callback : callbackList) {
assertNull(callback.get());
}
long waitTime = (System.nanoTime() - startTime) / 1000000;
for (HBaseRpcController pcrc : pcrcList) {
assertTrue(pcrc.failed());
LOG.info("Caught expected exception: " + pcrc.getFailed());
IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed());
assertTrue(ioe.getCause() instanceof CallTimeoutException);
}
// confirm that we got exception before the actual pause.
assertTrue(waitTime < ms);
} finally {
rpcServer.stop();
}
}
}