HBASE-28417 TestBlockingIPC.testBadPreambleHeader sometimes fails with broken pipe instead of bad auth (#5740)

Also change the IPC-related tests to cover different combinations of RPC server and client implementations, for example, NettyRpcClient with SimpleRpcServer.

Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
index 1618709..85f7c0a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
@@ -290,7 +290,7 @@
     });
   }
 
-  private void getConnectionRegistry(Channel ch, Call connectionRegistryCall) throws IOException {
+  private void getConnectionRegistry(Channel ch, Call connectionRegistryCall) {
     assert eventLoop.inEventLoop();
     PreambleCallHandler.setup(ch.pipeline(), rpcClient.readTO, this,
       RpcClient.REGISTRY_PREAMBLE_HEADER, connectionRegistryCall);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index e4427c1..0f0c22b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -56,6 +56,7 @@
 import io.opentelemetry.sdk.trace.data.SpanData;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.nio.channels.SocketChannel;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -73,6 +74,7 @@
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -80,8 +82,10 @@
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.util.StringUtils;
 import org.hamcrest.Matcher;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.runners.Parameterized.Parameter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -114,14 +118,12 @@
   private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
 
   protected static final Configuration CONF = HBaseConfiguration.create();
-  static {
-    // Set the default to be the old SimpleRpcServer. Subclasses test it and netty.
-    CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, SimpleRpcServer.class.getName());
-  }
 
-  protected abstract RpcServer createRpcServer(Server server, String name,
+  protected RpcServer createRpcServer(Server server, String name,
     List<BlockingServiceAndInterface> services, InetSocketAddress bindAddress, Configuration conf,
-    RpcScheduler scheduler) throws IOException;
+    RpcScheduler scheduler) throws IOException {
+    return RpcServerFactory.createRpcServer(server, name, services, bindAddress, conf, scheduler);
+  }
 
   private RpcServer createRpcServer(String name, List<BlockingServiceAndInterface> services,
     InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler) throws IOException {
@@ -133,6 +135,14 @@
   @Rule
   public OpenTelemetryRule traceRule = OpenTelemetryRule.create();
 
+  @Parameter(0)
+  public Class<? extends RpcServer> rpcServerImpl;
+
+  @Before
+  public void setUpBeforeTest() {
+    CONF.setClass(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl, RpcServer.class);
+  }
+
   /**
    * Ensure we do not HAVE TO HAVE a codec.
    */
@@ -348,9 +358,43 @@
     }
   }
 
-  protected abstract RpcServer createTestFailingRpcServer(final String name,
+  private static class FailingSimpleRpcServer extends SimpleRpcServer {
+
+    FailingSimpleRpcServer(Server server, String name,
+      List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
+      Configuration conf, RpcScheduler scheduler) throws IOException {
+      super(server, name, services, bindAddress, conf, scheduler, true);
+    }
+
+    final class FailingConnection extends SimpleServerRpcConnection {
+      private FailingConnection(FailingSimpleRpcServer rpcServer, SocketChannel channel,
+        long lastContact) {
+        super(rpcServer, channel, lastContact);
+      }
+
+      @Override
+      public void processRequest(ByteBuff buf) throws IOException, InterruptedException {
+        // this will throw an exception after the connection header is read and an RPC is sent
+        // from the client
+        throw new DoNotRetryIOException("Failing for test");
+      }
+    }
+
+    @Override
+    protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) {
+      return new FailingConnection(this, channel, time);
+    }
+  }
+
+  protected RpcServer createTestFailingRpcServer(final String name,
     final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress,
-    Configuration conf, RpcScheduler scheduler) throws IOException;
+    Configuration conf, RpcScheduler scheduler) throws IOException {
+    if (rpcServerImpl.equals(NettyRpcServer.class)) {
+      return new FailingNettyRpcServer(null, name, services, bindAddress, conf, scheduler);
+    } else {
+      return new FailingSimpleRpcServer(null, name, services, bindAddress, conf, scheduler);
+    }
+  }
 
   /** Tests that the connection closing is handled by the client with outstanding RPC calls */
   @Test
@@ -570,19 +614,37 @@
 
   protected abstract AbstractRpcClient<?> createBadAuthRpcClient(Configuration conf);
 
+  /**
+   * Sends one echo request that must fail with a ServiceException, and returns the IOException
+   * that ProtobufUtil.handleRemoteException derives from it.
+   */
+  private IOException doBadPreambleHeaderCall(BlockingInterface stub) {
+    ServiceException se = assertThrows(ServiceException.class,
+      () -> stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build()));
+    return ProtobufUtil.handleRemoteException(se);
+  }
+
   @Test
-  public void testBadPreambleHeader() throws IOException, ServiceException {
+  public void testBadPreambleHeader() throws Exception {
     Configuration clientConf = new Configuration(CONF);
     RpcServer rpcServer = createRpcServer("testRpcServer", Collections.emptyList(),
       new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
     try (AbstractRpcClient<?> client = createBadAuthRpcClient(clientConf)) {
       rpcServer.start();
       BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
-      ServiceException se = assertThrows(ServiceException.class,
-        () -> stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build()));
-      IOException ioe = ProtobufUtil.handleRemoteException(se);
-      assertThat(ioe, instanceOf(BadAuthException.class));
-      assertThat(ioe.getMessage(), containsString("authName=unknown"));
+      BadAuthException error = null;
+      // for SimpleRpcServer, it is possible that we get a broken pipe before getting the
+      // BadAuthException, so we add some retries here, see HBASE-28417
+      for (int i = 0; i < 10; i++) {
+        IOException ioe = doBadPreambleHeaderCall(stub);
+        if (ioe instanceof BadAuthException) {
+          error = (BadAuthException) ioe;
+          break;
+        }
+        Thread.sleep(100);
+      }
+      assertNotNull("Can not get expected BadAuthException", error);
+      assertThat(error.getMessage(), containsString("authName=unknown"));
     } finally {
       rpcServer.stop();
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java
index e60cc87..24177f2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java
@@ -18,20 +18,20 @@
 package org.apache.hadoop.hbase.ipc;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.channels.SocketChannel;
+import java.util.Arrays;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RPCTests;
 import org.junit.ClassRule;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
+@RunWith(Parameterized.class)
 @Category({ RPCTests.class, MediumTests.class })
 public class TestBlockingIPC extends AbstractTestIPC {
 
@@ -39,11 +39,10 @@
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestBlockingIPC.class);
 
-  @Override
-  protected RpcServer createRpcServer(Server server, String name,
-    List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
-    Configuration conf, RpcScheduler scheduler) throws IOException {
-    return RpcServerFactory.createRpcServer(server, name, services, bindAddress, conf, scheduler);
+  @Parameters(name = "{index}: rpcServerImpl={0}")
+  public static List<Object[]> data() {
+    return Arrays.asList(new Object[] { SimpleRpcServer.class },
+      new Object[] { NettyRpcServer.class });
   }
 
   @Override
@@ -73,41 +72,6 @@
     };
   }
 
-  private static class TestFailingRpcServer extends SimpleRpcServer {
-
-    TestFailingRpcServer(Server server, String name,
-      List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
-      Configuration conf, RpcScheduler scheduler) throws IOException {
-      super(server, name, services, bindAddress, conf, scheduler, true);
-    }
-
-    final class FailingConnection extends SimpleServerRpcConnection {
-      private FailingConnection(TestFailingRpcServer rpcServer, SocketChannel channel,
-        long lastContact) {
-        super(rpcServer, channel, lastContact);
-      }
-
-      @Override
-      public void processRequest(ByteBuff buf) throws IOException, InterruptedException {
-        // this will throw exception after the connection header is read, and an RPC is sent
-        // from client
-        throw new DoNotRetryIOException("Failing for test");
-      }
-    }
-
-    @Override
-    protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) {
-      return new FailingConnection(this, channel, time);
-    }
-  }
-
-  @Override
-  protected RpcServer createTestFailingRpcServer(String name,
-    List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
-    Configuration conf, RpcScheduler scheduler) throws IOException {
-    return new TestFailingRpcServer(null, name, services, bindAddress, conf, scheduler);
-  }
-
   @Override
   protected AbstractRpcClient<?> createBadAuthRpcClient(Configuration conf) {
     return new BlockingRpcClient(conf) {
@@ -124,7 +88,6 @@
           }
         };
       }
-
     };
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java
index a1b60e2..f2366a2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java
@@ -18,13 +18,10 @@
 package org.apache.hadoop.hbase.ipc;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RPCTests;
@@ -51,18 +48,27 @@
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestNettyIPC.class);
 
-  @Parameters(name = "{index}: EventLoop={0}")
-  public static Collection<Object[]> parameters() {
-    List<Object[]> params = new ArrayList<>();
-    params.add(new Object[] { "nio" });
-    params.add(new Object[] { "perClientNio" });
+  private static List<String> getEventLoopTypes() {
+    List<String> types = new ArrayList<>();
+    types.add("nio");
+    types.add("perClientNio");
     if (JVM.isLinux() && JVM.isAmd64()) {
-      params.add(new Object[] { "epoll" });
+      types.add("epoll");
+    }
+    return types;
+  }
+
+  @Parameters(name = "{index}: rpcServerImpl={0}, EventLoop={1}")
+  public static List<Object[]> parameters() {
+    List<Object[]> params = new ArrayList<>();
+    for (String eventLoopType : getEventLoopTypes()) {
+      params.add(new Object[] { SimpleRpcServer.class, eventLoopType });
+      params.add(new Object[] { NettyRpcServer.class, eventLoopType });
     }
     return params;
   }
 
-  @Parameter
+  @Parameter(1)
   public String eventLoopType;
 
   private static NioEventLoopGroup NIO;
@@ -104,13 +110,6 @@
   }
 
   @Override
-  protected RpcServer createRpcServer(Server server, String name,
-    List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
-    Configuration conf, RpcScheduler scheduler) throws IOException {
-    return new NettyRpcServer(server, name, services, bindAddress, conf, scheduler, true);
-  }
-
-  @Override
   protected NettyRpcClient createRpcClientNoCodec(Configuration conf) {
     setConf(conf);
     return new NettyRpcClient(conf) {
@@ -142,13 +141,6 @@
   }
 
   @Override
-  protected RpcServer createTestFailingRpcServer(String name,
-    List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
-    Configuration conf, RpcScheduler scheduler) throws IOException {
-    return new FailingNettyRpcServer(null, name, services, bindAddress, conf, scheduler);
-  }
-
-  @Override
   protected AbstractRpcClient<?> createBadAuthRpcClient(Configuration conf) {
     return new NettyRpcClient(conf) {
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyTlsIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyTlsIPC.java
index 1cbf6be..00a6b23 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyTlsIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyTlsIPC.java
@@ -67,37 +67,41 @@
 
   private static NettyEventLoopGroupConfig EVENT_LOOP_GROUP_CONFIG;
 
-  @Parameterized.Parameter(0)
+  @Parameterized.Parameter(1)
   public X509KeyType caKeyType;
 
-  @Parameterized.Parameter(1)
+  @Parameterized.Parameter(2)
   public X509KeyType certKeyType;
 
-  @Parameterized.Parameter(2)
+  @Parameterized.Parameter(3)
   public char[] keyPassword;
 
-  @Parameterized.Parameter(3)
+  @Parameterized.Parameter(4)
   public boolean acceptPlainText;
 
-  @Parameterized.Parameter(4)
+  @Parameterized.Parameter(5)
   public boolean clientTlsEnabled;
 
   private X509TestContext x509TestContext;
 
+  // only netty rpc server supports TLS, so here we will only test NettyRpcServer
   @Parameterized.Parameters(
-      name = "{index}: caKeyType={0}, certKeyType={1}, keyPassword={2}, acceptPlainText={3},"
-        + " clientTlsEnabled={4}")
+      name = "{index}: rpcServerImpl={0}, caKeyType={1}, certKeyType={2}, keyPassword={3},"
+        + " acceptPlainText={4}, clientTlsEnabled={5}")
   public static List<Object[]> data() {
     List<Object[]> params = new ArrayList<>();
     for (X509KeyType caKeyType : X509KeyType.values()) {
       for (X509KeyType certKeyType : X509KeyType.values()) {
         for (char[] keyPassword : new char[][] { "".toCharArray(), "pa$$w0rd".toCharArray() }) {
           // do not accept plain text
-          params.add(new Object[] { caKeyType, certKeyType, keyPassword, false, true });
+          params.add(new Object[] { NettyRpcServer.class, caKeyType, certKeyType, keyPassword,
+            false, true });
           // support plain text and client enables tls
-          params.add(new Object[] { caKeyType, certKeyType, keyPassword, true, true });
+          params.add(
+            new Object[] { NettyRpcServer.class, caKeyType, certKeyType, keyPassword, true, true });
           // support plain text and client disables tls
-          params.add(new Object[] { caKeyType, certKeyType, keyPassword, true, false });
+          params.add(new Object[] { NettyRpcServer.class, caKeyType, certKeyType, keyPassword, true,
+            false });
         }
       }
     }