HBASE-28417 TestBlockingIPC.testBadPreambleHeader sometimes fails with broken pipe instead of bad auth (#5740)
Also change the IPC related tests to test different combinations of rpc server&client, for example, NettyRpcClient and SimpleRpcServer
Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
index 1618709..85f7c0a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
@@ -290,7 +290,7 @@
});
}
- private void getConnectionRegistry(Channel ch, Call connectionRegistryCall) throws IOException {
+ private void getConnectionRegistry(Channel ch, Call connectionRegistryCall) {
assert eventLoop.inEventLoop();
PreambleCallHandler.setup(ch.pipeline(), rpcClient.readTO, this,
RpcClient.REGISTRY_PREAMBLE_HEADER, connectionRegistryCall);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index e4427c1..0f0c22b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -56,6 +56,7 @@
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.nio.channels.SocketChannel;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
@@ -73,6 +74,7 @@
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
+import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -80,8 +82,10 @@
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
import org.hamcrest.Matcher;
+import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.runners.Parameterized.Parameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -114,14 +118,12 @@
private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
protected static final Configuration CONF = HBaseConfiguration.create();
- static {
- // Set the default to be the old SimpleRpcServer. Subclasses test it and netty.
- CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, SimpleRpcServer.class.getName());
- }
- protected abstract RpcServer createRpcServer(Server server, String name,
+ protected RpcServer createRpcServer(Server server, String name,
List<BlockingServiceAndInterface> services, InetSocketAddress bindAddress, Configuration conf,
- RpcScheduler scheduler) throws IOException;
+ RpcScheduler scheduler) throws IOException {
+ return RpcServerFactory.createRpcServer(server, name, services, bindAddress, conf, scheduler);
+ }
private RpcServer createRpcServer(String name, List<BlockingServiceAndInterface> services,
InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler) throws IOException {
@@ -133,6 +135,14 @@
@Rule
public OpenTelemetryRule traceRule = OpenTelemetryRule.create();
+ @Parameter(0)
+ public Class<? extends RpcServer> rpcServerImpl;
+
+ @Before
+ public void setUpBeforeTest() {
+ CONF.setClass(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl, RpcServer.class);
+ }
+
/**
* Ensure we do not HAVE TO HAVE a codec.
*/
@@ -348,9 +358,43 @@
}
}
- protected abstract RpcServer createTestFailingRpcServer(final String name,
+ private static class FailingSimpleRpcServer extends SimpleRpcServer {
+
+ FailingSimpleRpcServer(Server server, String name,
+ List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
+ Configuration conf, RpcScheduler scheduler) throws IOException {
+ super(server, name, services, bindAddress, conf, scheduler, true);
+ }
+
+ final class FailingConnection extends SimpleServerRpcConnection {
+ private FailingConnection(FailingSimpleRpcServer rpcServer, SocketChannel channel,
+ long lastContact) {
+ super(rpcServer, channel, lastContact);
+ }
+
+ @Override
+ public void processRequest(ByteBuff buf) throws IOException, InterruptedException {
+ // this will throw exception after the connection header is read, and an RPC is sent
+ // from client
+ throw new DoNotRetryIOException("Failing for test");
+ }
+ }
+
+ @Override
+ protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) {
+ return new FailingConnection(this, channel, time);
+ }
+ }
+
+ protected RpcServer createTestFailingRpcServer(final String name,
final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress,
- Configuration conf, RpcScheduler scheduler) throws IOException;
+ Configuration conf, RpcScheduler scheduler) throws IOException {
+ if (rpcServerImpl.equals(NettyRpcServer.class)) {
+ return new FailingNettyRpcServer(null, name, services, bindAddress, conf, scheduler);
+ } else {
+ return new FailingSimpleRpcServer(null, name, services, bindAddress, conf, scheduler);
+ }
+ }
/** Tests that the connection closing is handled by the client with outstanding RPC calls */
@Test
@@ -570,19 +614,33 @@
protected abstract AbstractRpcClient<?> createBadAuthRpcClient(Configuration conf);
+ private IOException doBadPreableHeaderCall(BlockingInterface stub) {
+ ServiceException se = assertThrows(ServiceException.class,
+ () -> stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build()));
+ return ProtobufUtil.handleRemoteException(se);
+ }
+
@Test
- public void testBadPreambleHeader() throws IOException, ServiceException {
+ public void testBadPreambleHeader() throws Exception {
Configuration clientConf = new Configuration(CONF);
RpcServer rpcServer = createRpcServer("testRpcServer", Collections.emptyList(),
new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
try (AbstractRpcClient<?> client = createBadAuthRpcClient(clientConf)) {
rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
- ServiceException se = assertThrows(ServiceException.class,
- () -> stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build()));
- IOException ioe = ProtobufUtil.handleRemoteException(se);
- assertThat(ioe, instanceOf(BadAuthException.class));
- assertThat(ioe.getMessage(), containsString("authName=unknown"));
+ BadAuthException error = null;
+ // for SimpleRpcServer, it is possible that we get a broken pipe before getting the
+ // BadAuthException, so we add some retries here, see HBASE-28417
+ for (int i = 0; i < 10; i++) {
+ IOException ioe = doBadPreableHeaderCall(stub);
+ if (ioe instanceof BadAuthException) {
+ error = (BadAuthException) ioe;
+ break;
+ }
+ Thread.sleep(100);
+ }
+ assertNotNull("Can not get expected BadAuthException", error);
+ assertThat(error.getMessage(), containsString("authName=unknown"));
} finally {
rpcServer.stop();
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java
index e60cc87..24177f2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java
@@ -18,20 +18,20 @@
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.channels.SocketChannel;
+import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+@RunWith(Parameterized.class)
@Category({ RPCTests.class, MediumTests.class })
public class TestBlockingIPC extends AbstractTestIPC {
@@ -39,11 +39,10 @@
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestBlockingIPC.class);
- @Override
- protected RpcServer createRpcServer(Server server, String name,
- List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
- Configuration conf, RpcScheduler scheduler) throws IOException {
- return RpcServerFactory.createRpcServer(server, name, services, bindAddress, conf, scheduler);
+ @Parameters(name = "{index}: rpcServerImpl={0}")
+ public static List<Object[]> data() {
+ return Arrays.asList(new Object[] { SimpleRpcServer.class },
+ new Object[] { NettyRpcServer.class });
}
@Override
@@ -73,41 +72,6 @@
};
}
- private static class TestFailingRpcServer extends SimpleRpcServer {
-
- TestFailingRpcServer(Server server, String name,
- List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
- Configuration conf, RpcScheduler scheduler) throws IOException {
- super(server, name, services, bindAddress, conf, scheduler, true);
- }
-
- final class FailingConnection extends SimpleServerRpcConnection {
- private FailingConnection(TestFailingRpcServer rpcServer, SocketChannel channel,
- long lastContact) {
- super(rpcServer, channel, lastContact);
- }
-
- @Override
- public void processRequest(ByteBuff buf) throws IOException, InterruptedException {
- // this will throw exception after the connection header is read, and an RPC is sent
- // from client
- throw new DoNotRetryIOException("Failing for test");
- }
- }
-
- @Override
- protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) {
- return new FailingConnection(this, channel, time);
- }
- }
-
- @Override
- protected RpcServer createTestFailingRpcServer(String name,
- List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
- Configuration conf, RpcScheduler scheduler) throws IOException {
- return new TestFailingRpcServer(null, name, services, bindAddress, conf, scheduler);
- }
-
@Override
protected AbstractRpcClient<?> createBadAuthRpcClient(Configuration conf) {
return new BlockingRpcClient(conf) {
@@ -124,7 +88,6 @@
}
};
}
-
};
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java
index a1b60e2..f2366a2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java
@@ -18,13 +18,10 @@
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RPCTests;
@@ -51,18 +48,27 @@
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestNettyIPC.class);
- @Parameters(name = "{index}: EventLoop={0}")
- public static Collection<Object[]> parameters() {
- List<Object[]> params = new ArrayList<>();
- params.add(new Object[] { "nio" });
- params.add(new Object[] { "perClientNio" });
+ private static List<String> getEventLoopTypes() {
+ List<String> types = new ArrayList<>();
+ types.add("nio");
+ types.add("perClientNio");
if (JVM.isLinux() && JVM.isAmd64()) {
- params.add(new Object[] { "epoll" });
+ types.add("epoll");
+ }
+ return types;
+ }
+
+ @Parameters(name = "{index}: rpcServerImpl={0}, EventLoop={1}")
+ public static List<Object[]> parameters() {
+ List<Object[]> params = new ArrayList<>();
+ for (String eventLoopType : getEventLoopTypes()) {
+ params.add(new Object[] { SimpleRpcServer.class, eventLoopType });
+ params.add(new Object[] { NettyRpcServer.class, eventLoopType });
}
return params;
}
- @Parameter
+ @Parameter(1)
public String eventLoopType;
private static NioEventLoopGroup NIO;
@@ -104,13 +110,6 @@
}
@Override
- protected RpcServer createRpcServer(Server server, String name,
- List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
- Configuration conf, RpcScheduler scheduler) throws IOException {
- return new NettyRpcServer(server, name, services, bindAddress, conf, scheduler, true);
- }
-
- @Override
protected NettyRpcClient createRpcClientNoCodec(Configuration conf) {
setConf(conf);
return new NettyRpcClient(conf) {
@@ -142,13 +141,6 @@
}
@Override
- protected RpcServer createTestFailingRpcServer(String name,
- List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
- Configuration conf, RpcScheduler scheduler) throws IOException {
- return new FailingNettyRpcServer(null, name, services, bindAddress, conf, scheduler);
- }
-
- @Override
protected AbstractRpcClient<?> createBadAuthRpcClient(Configuration conf) {
return new NettyRpcClient(conf) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyTlsIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyTlsIPC.java
index 1cbf6be..00a6b23 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyTlsIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyTlsIPC.java
@@ -67,37 +67,41 @@
private static NettyEventLoopGroupConfig EVENT_LOOP_GROUP_CONFIG;
- @Parameterized.Parameter(0)
+ @Parameterized.Parameter(1)
public X509KeyType caKeyType;
- @Parameterized.Parameter(1)
+ @Parameterized.Parameter(2)
public X509KeyType certKeyType;
- @Parameterized.Parameter(2)
+ @Parameterized.Parameter(3)
public char[] keyPassword;
- @Parameterized.Parameter(3)
+ @Parameterized.Parameter(4)
public boolean acceptPlainText;
- @Parameterized.Parameter(4)
+ @Parameterized.Parameter(5)
public boolean clientTlsEnabled;
private X509TestContext x509TestContext;
+ // only netty rpc server supports TLS, so here we will only test NettyRpcServer
@Parameterized.Parameters(
- name = "{index}: caKeyType={0}, certKeyType={1}, keyPassword={2}, acceptPlainText={3},"
- + " clientTlsEnabled={4}")
+ name = "{index}: rpcServerImpl={0}, caKeyType={1}, certKeyType={2}, keyPassword={3},"
+ + " acceptPlainText={4}, clientTlsEnabled={5}")
public static List<Object[]> data() {
List<Object[]> params = new ArrayList<>();
for (X509KeyType caKeyType : X509KeyType.values()) {
for (X509KeyType certKeyType : X509KeyType.values()) {
for (char[] keyPassword : new char[][] { "".toCharArray(), "pa$$w0rd".toCharArray() }) {
// do not accept plain text
- params.add(new Object[] { caKeyType, certKeyType, keyPassword, false, true });
+ params.add(new Object[] { NettyRpcServer.class, caKeyType, certKeyType, keyPassword,
+ false, true });
// support plain text and client enables tls
- params.add(new Object[] { caKeyType, certKeyType, keyPassword, true, true });
+ params.add(
+ new Object[] { NettyRpcServer.class, caKeyType, certKeyType, keyPassword, true, true });
// support plain text and client disables tls
- params.add(new Object[] { caKeyType, certKeyType, keyPassword, true, false });
+ params.add(new Object[] { NettyRpcServer.class, caKeyType, certKeyType, keyPassword, true,
+ false });
}
}
}