DRILL-8113: Support building with a JDK 8 target using newer JDKs (#2565)

Also upgrades and mostly enables the tests in TestUserBitKerberos*.java.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/SpnegoConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/SpnegoConfig.java
index 4714689..1570ed1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/SpnegoConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/auth/SpnegoConfig.java
@@ -26,12 +26,12 @@
 
 public class SpnegoConfig {
 
+  // Standard Object Identifier for the SPNEGO GSS-API mechanism.
+  public static final String GSS_SPNEGO_MECH_OID = "1.3.6.1.5.5.2";
+
   private UserGroupInformation loggedInUgi;
-
   private final String principal;
-
   private final String keytab;
-
   // Optional parameter
   private final String clientNameMapping;
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java
index dcd1cf9..7571efb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java
@@ -117,7 +117,10 @@
     // initialization which causes the tests to fail. So the following two changes are required.
 
     // (1) Refresh Kerberos config.
-    sun.security.krb5.Config.refresh();
+    // This disabled call to an unsupported internal API does not appear to be
+    // required and it prevents compiling with a target of JDK 8 on newer JDKs.
+    // sun.security.krb5.Config.refresh();
+
     // (2) Reset the default realm.
     final Field defaultRealm = KerberosName.class.getDeclaredField("defaultRealm");
     defaultRealm.setAccessible(true);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberos.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberos.java
index a403d6f..755ae06 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberos.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberos.java
@@ -18,19 +18,19 @@
 package org.apache.drill.exec.rpc.user.security;
 
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import com.typesafe.config.ConfigValueFactory;
 import org.apache.drill.categories.SecurityTest;
-import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.config.DrillProperties;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.rpc.RpcMetrics;
 import org.apache.drill.exec.rpc.control.ControlRpcMetrics;
 import org.apache.drill.exec.rpc.data.DataRpcMetrics;
 import org.apache.drill.exec.rpc.security.KerberosHelper;
 import org.apache.drill.exec.rpc.user.UserRpcMetrics;
 import org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl;
-import org.apache.drill.test.BaseTestQuery;
-import org.apache.hadoop.security.authentication.util.KerberosName;
-import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.apache.drill.test.ClientFixture;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
 import org.apache.kerby.kerberos.kerb.client.JaasKrbUtil;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -39,175 +39,171 @@
 import org.junit.experimental.categories.Category;
 
 import javax.security.auth.Subject;
-import java.lang.reflect.Field;
 import java.security.PrivilegedExceptionAction;
-import java.util.Properties;
 
-import static junit.framework.TestCase.assertTrue;
+import static junit.framework.TestCase.assertEquals;
 
-@Ignore("See DRILL-5387")
 @Category(SecurityTest.class)
-public class TestUserBitKerberos extends BaseTestQuery {
-  //private static final org.slf4j.Logger logger =org.slf4j.LoggerFactory.getLogger(TestUserBitKerberos.class);
+public class TestUserBitKerberos extends ClusterTest {
 
   private static KerberosHelper krbHelper;
 
   @BeforeClass
   public static void setupTest() throws Exception {
-
     krbHelper = new KerberosHelper(TestUserBitKerberos.class.getSimpleName(), null);
     krbHelper.setupKdc(dirTestWatcher.getTmpDir());
+    cluster = defaultClusterConfig().build();
+  }
 
-    // Create a new DrillConfig which has user authentication enabled and authenticator set to
-    // UserAuthenticatorTestImpl.
-    final DrillConfig newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
-      .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
-        ConfigValueFactory.fromAnyRef(true))
-      .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
-        ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
-      .withValue(ExecConstants.SERVICE_PRINCIPAL,
-        ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
-      .withValue(ExecConstants.SERVICE_KEYTAB_LOCATION,
-        ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
-      .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
-        ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos"))));
-
-    final Properties connectionProps = new Properties();
-    connectionProps.setProperty(DrillProperties.USER, "anonymous");
-    connectionProps.setProperty(DrillProperties.PASSWORD, "anything works!");
-
-    // Ignore the compile time warning caused by the code below.
-
-    // Config is statically initialized at this point. But the above configuration results in a different
-    // initialization which causes the tests to fail. So the following two changes are required.
-
-    // (1) Refresh Kerberos config.
-    sun.security.krb5.Config.refresh();
-    // (2) Reset the default realm.
-    final Field defaultRealm = KerberosName.class.getDeclaredField("defaultRealm");
-    defaultRealm.setAccessible(true);
-    defaultRealm.set(null, KerberosUtil.getDefaultRealm());
-
-    updateTestCluster(1, newConfig, connectionProps);
+  private static ClusterFixtureBuilder defaultClusterConfig() {
+    return ClusterFixture.bareBuilder(dirTestWatcher)
+      .clusterSize(1)
+      .configProperty(ExecConstants.USER_AUTHENTICATION_ENABLED, true)
+      .configProperty(ExecConstants.USER_AUTHENTICATOR_IMPL, UserAuthenticatorTestImpl.TYPE)
+      .configProperty(ExecConstants.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL)
+      .configProperty(ExecConstants.SERVICE_KEYTAB_LOCATION, krbHelper.serverKeytab.toString())
+      .configNonStringProperty(ExecConstants.AUTHENTICATION_MECHANISMS, Lists.newArrayList("plain", "kerberos"));
   }
 
   @Test
   public void successKeytab() throws Exception {
-    final Properties connectionProps = new Properties();
-    connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
-    connectionProps.setProperty(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL);
-    connectionProps.setProperty(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath());
-    updateClient(connectionProps);
+    try (
+      ClientFixture client = cluster.clientBuilder()
+      .property(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL)
+      .property(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL)
+      .property(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath())
+      .build()
+    ) {
 
-    // Run few queries using the new client
-    testBuilder()
+      // Run few queries using the new client
+      client.testBuilder()
         .sqlQuery("SELECT session_user FROM (SELECT * FROM sys.drillbits LIMIT 1)")
         .unOrdered()
         .baselineColumns("session_user")
         .baselineValues(krbHelper.CLIENT_SHORT_NAME)
         .go();
-    test("SHOW SCHEMAS");
-    test("USE INFORMATION_SCHEMA");
-    test("SHOW TABLES");
-    test("SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_NAME LIKE 'COLUMNS'");
-    test("SELECT * FROM cp.`region.json` LIMIT 5");
+
+      client.runSqlSilently("SHOW SCHEMAS");
+      client.runSqlSilently("USE INFORMATION_SCHEMA");
+      client.runSqlSilently("SHOW TABLES");
+      client.runSqlSilently("SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_NAME LIKE 'COLUMNS'");
+      client.runSqlSilently("SELECT * FROM cp.`region.json` LIMIT 5");
+    }
   }
 
   @Test
   public void successTicket() throws Exception {
-    final Properties connectionProps = new Properties();
-    connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
-    connectionProps.setProperty(DrillProperties.KERBEROS_FROM_SUBJECT, "true");
-    final Subject clientSubject = JaasKrbUtil.loginUsingKeytab(krbHelper.CLIENT_PRINCIPAL,
-      krbHelper.clientKeytab.getAbsoluteFile());
+    Subject clientSubject = JaasKrbUtil.loginUsingKeytab(
+      krbHelper.CLIENT_PRINCIPAL,
+      krbHelper.clientKeytab.getAbsoluteFile()
+    );
 
-    Subject.doAs(clientSubject, new PrivilegedExceptionAction<Void>() {
-      @Override
-      public Void run() throws Exception {
-        updateClient(connectionProps);
-        return null;
-      }
-    });
+    try (
+      ClientFixture client = Subject.doAs(
+        clientSubject,
+        (PrivilegedExceptionAction<ClientFixture>) () -> cluster.clientBuilder()
+          .property(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL)
+          .property(DrillProperties.KERBEROS_FROM_SUBJECT, "true")
+          .build()
+      )
+    ) {
 
     // Run few queries using the new client
-    testBuilder()
-        .sqlQuery("SELECT session_user FROM (SELECT * FROM sys.drillbits LIMIT 1)")
-        .unOrdered()
-        .baselineColumns("session_user")
-        .baselineValues(krbHelper.CLIENT_SHORT_NAME)
-        .go();
-    test("SHOW SCHEMAS");
-    test("USE INFORMATION_SCHEMA");
-    test("SHOW TABLES");
-    test("SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_NAME LIKE 'COLUMNS'");
-    test("SELECT * FROM cp.`region.json` LIMIT 5");
+    client.testBuilder()
+      .sqlQuery("SELECT session_user FROM (SELECT * FROM sys.drillbits LIMIT 1)")
+      .unOrdered()
+      .baselineColumns("session_user")
+      .baselineValues(krbHelper.CLIENT_SHORT_NAME)
+      .go();
+
+    client.runSqlSilently("SHOW SCHEMAS");
+    client.runSqlSilently("USE INFORMATION_SCHEMA");
+    client.runSqlSilently("SHOW TABLES");
+    client.runSqlSilently("SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_NAME LIKE 'COLUMNS'");
+    client.runSqlSilently("SELECT * FROM cp.`region.json` LIMIT 5");
+    }
+   }
+
+  @Test
+  @Ignore("See DRILL-5387. This test works in isolation but not when sharing counters with other tests")
+  public void testUnencryptedConnectionCounter() throws Exception {
+    Subject clientSubject = JaasKrbUtil.loginUsingKeytab(
+      krbHelper.CLIENT_PRINCIPAL,
+      krbHelper.clientKeytab.getAbsoluteFile()
+    );
+
+    try (
+      // Use a dedicated cluster fixture so that the tested RPC counters have a clean start.
+      ClusterFixture cluster = defaultClusterConfig().build();
+      ClientFixture client = Subject.doAs(
+        clientSubject,
+        (PrivilegedExceptionAction<ClientFixture>) () -> cluster.clientBuilder()
+          .property(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL)
+          .property(DrillProperties.KERBEROS_FROM_SUBJECT, "true")
+          .build()
+      )
+    ) {
+      client.testBuilder()
+          .sqlQuery("SELECT session_user FROM (SELECT * FROM sys.drillbits LIMIT 1)")
+          .unOrdered()
+          .baselineColumns("session_user")
+          .baselineValues(krbHelper.CLIENT_SHORT_NAME)
+          .go();
+
+      RpcMetrics userMetrics = UserRpcMetrics.getInstance(),
+        ctrlMetrics = ControlRpcMetrics.getInstance(),
+        dataMetrics = DataRpcMetrics.getInstance();
+
+      // Check encrypted counters value
+      assertEquals(0, userMetrics.getEncryptedConnectionCount());
+      assertEquals(0, ctrlMetrics.getEncryptedConnectionCount());
+      assertEquals(0, dataMetrics.getEncryptedConnectionCount());
+
+      // Check unencrypted counters value
+      assertEquals(1, userMetrics.getUnEncryptedConnectionCount());
+      assertEquals(0, ctrlMetrics.getUnEncryptedConnectionCount());
+      assertEquals(0, dataMetrics.getUnEncryptedConnectionCount());
+    }
   }
 
   @Test
-  public void testUnecryptedConnectionCounter() throws Exception {
-    final Properties connectionProps = new Properties();
-    connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
-    connectionProps.setProperty(DrillProperties.KERBEROS_FROM_SUBJECT, "true");
-    final Subject clientSubject = JaasKrbUtil.loginUsingKeytab(krbHelper.CLIENT_PRINCIPAL,
-        krbHelper.clientKeytab.getAbsoluteFile());
+  @Ignore("See DRILL-5387. This test works in isolation but not when sharing counters with other tests")
+  public void testUnencryptedConnectionCounter_LocalControlMessage() throws Exception {
+    Subject clientSubject = JaasKrbUtil.loginUsingKeytab(
+      krbHelper.CLIENT_PRINCIPAL,
+      krbHelper.clientKeytab.getAbsoluteFile()
+    );
 
-    Subject.doAs(clientSubject, new PrivilegedExceptionAction<Void>() {
-      @Override
-      public Void run() throws Exception {
-        updateClient(connectionProps);
-        return null;
-      }
-    });
+    try (
+      // Use a dedicated cluster fixture so that the tested RPC counters have a clean start.
+      ClusterFixture cluster = defaultClusterConfig().build();
+      ClientFixture client = Subject.doAs(
+        clientSubject,
+        (PrivilegedExceptionAction<ClientFixture>) () -> cluster.clientBuilder()
+          .property(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL)
+          .property(DrillProperties.KERBEROS_FROM_SUBJECT, "true")
+          .build()
+      )
+     ) {
+      // Run query on memory system table this sends remote fragments to all Drillbit and Drillbits then send data
+      // using data channel. In this test we have only 1 Drillbit so there should not be any control connection but a
+      // local data connections
+      client.runSqlSilently("SELECT * FROM sys.memory");
 
-    // Run few queries using the new client
-    testBuilder()
-        .sqlQuery("SELECT session_user FROM (SELECT * FROM sys.drillbits LIMIT 1)")
-        .unOrdered()
-        .baselineColumns("session_user")
-        .baselineValues(krbHelper.CLIENT_SHORT_NAME)
-        .go();
+      RpcMetrics userMetrics = UserRpcMetrics.getInstance(),
+        ctrlMetrics = ControlRpcMetrics.getInstance(),
+        dataMetrics = DataRpcMetrics.getInstance();
 
-    // Check encrypted counters value
-    assertTrue(0 == UserRpcMetrics.getInstance().getEncryptedConnectionCount());
-    assertTrue(0 == ControlRpcMetrics.getInstance().getEncryptedConnectionCount());
-    assertTrue(0 == DataRpcMetrics.getInstance().getEncryptedConnectionCount());
+      // Check encrypted counters value
+      assertEquals(0, userMetrics.getEncryptedConnectionCount());
+      assertEquals(0, ctrlMetrics.getEncryptedConnectionCount());
+      assertEquals(0, dataMetrics.getEncryptedConnectionCount());
 
-    // Check unencrypted counters value
-    assertTrue(1 == UserRpcMetrics.getInstance().getUnEncryptedConnectionCount());
-    assertTrue(0 == ControlRpcMetrics.getInstance().getUnEncryptedConnectionCount());
-    assertTrue(0 == DataRpcMetrics.getInstance().getUnEncryptedConnectionCount());
-  }
-
-  @Test
-  public void testUnecryptedConnectionCounter_LocalControlMessage() throws Exception {
-    final Properties connectionProps = new Properties();
-    connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
-    connectionProps.setProperty(DrillProperties.KERBEROS_FROM_SUBJECT, "true");
-    final Subject clientSubject = JaasKrbUtil.loginUsingKeytab(krbHelper.CLIENT_PRINCIPAL,
-      krbHelper.clientKeytab.getAbsoluteFile());
-
-    Subject.doAs(clientSubject, new PrivilegedExceptionAction<Void>() {
-      @Override
-      public Void run() throws Exception {
-        updateClient(connectionProps);
-        return null;
-      }
-    });
-
-    // Run query on memory system table this sends remote fragments to all Drillbit and Drillbits then send data
-    // using data channel. In this test we have only 1 Drillbit so there should not be any control connection but a
-    // local data connections
-    testSql("SELECT * FROM sys.memory");
-
-    // Check encrypted counters value
-    assertTrue(0 == UserRpcMetrics.getInstance().getEncryptedConnectionCount());
-    assertTrue(0 == ControlRpcMetrics.getInstance().getEncryptedConnectionCount());
-    assertTrue(0 == DataRpcMetrics.getInstance().getEncryptedConnectionCount());
-
-    // Check unencrypted counters value
-    assertTrue(1 == UserRpcMetrics.getInstance().getUnEncryptedConnectionCount());
-    assertTrue(0 == ControlRpcMetrics.getInstance().getUnEncryptedConnectionCount());
-    assertTrue(2 == DataRpcMetrics.getInstance().getUnEncryptedConnectionCount());
+      // Check unencrypted counters value
+      assertEquals(1, userMetrics.getUnEncryptedConnectionCount());
+      assertEquals(0, ctrlMetrics.getUnEncryptedConnectionCount());
+      assertEquals(2, dataMetrics.getUnEncryptedConnectionCount());
+    }
   }
 
   @AfterClass
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java
index e609f59..b0449d5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java
@@ -18,21 +18,21 @@
 package org.apache.drill.exec.rpc.user.security;
 
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import com.typesafe.config.ConfigValueFactory;
-import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.test.ClientFixture;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
 import org.apache.drill.categories.SecurityTest;
-import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.config.DrillProperties;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.rpc.NonTransientRpcException;
 import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcMetrics;
 import org.apache.drill.exec.rpc.control.ControlRpcMetrics;
 import org.apache.drill.exec.rpc.data.DataRpcMetrics;
 import org.apache.drill.exec.rpc.security.KerberosHelper;
 import org.apache.drill.exec.rpc.user.UserRpcMetrics;
 import org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl;
-import org.apache.hadoop.security.authentication.util.KerberosName;
-import org.apache.hadoop.security.authentication.util.KerberosUtil;
 import org.apache.kerby.kerberos.kerb.client.JaasKrbUtil;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -41,62 +41,34 @@
 import org.junit.experimental.categories.Category;
 
 import javax.security.auth.Subject;
-import java.lang.reflect.Field;
 import java.security.PrivilegedExceptionAction;
-import java.util.Properties;
 
-import static junit.framework.TestCase.assertTrue;
+import static junit.framework.TestCase.assertEquals;
 import static junit.framework.TestCase.fail;
 
-@Ignore("See DRILL-5387")
 @Category(SecurityTest.class)
-public class TestUserBitKerberosEncryption extends BaseTestQuery {
+public class TestUserBitKerberosEncryption extends ClusterTest {
   private static final org.slf4j.Logger logger =
       org.slf4j.LoggerFactory.getLogger(TestUserBitKerberosEncryption.class);
 
   private static KerberosHelper krbHelper;
-  private static DrillConfig newConfig;
 
   @BeforeClass
   public static void setupTest() throws Exception {
     krbHelper = new KerberosHelper(TestUserBitKerberosEncryption.class.getSimpleName(), null);
     krbHelper.setupKdc(dirTestWatcher.getTmpDir());
+    cluster = defaultClusterConfig().build();
+  }
 
-    // Create a new DrillConfig which has user authentication enabled and authenticator set to
-    // UserAuthenticatorTestImpl.
-    newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
-        .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
-            ConfigValueFactory.fromAnyRef(true))
-        .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
-            ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
-        .withValue(ExecConstants.SERVICE_PRINCIPAL,
-            ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
-        .withValue(ExecConstants.SERVICE_KEYTAB_LOCATION,
-            ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
-        .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
-            ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos")))
-        .withValue(ExecConstants.USER_ENCRYPTION_SASL_ENABLED,
-            ConfigValueFactory.fromAnyRef(true)));
-
-    final Properties connectionProps = new Properties();
-    connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
-    connectionProps.setProperty(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL);
-    connectionProps.setProperty(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath());
-
-    // Ignore the compile time warning caused by the code below.
-
-    // Config is statically initialized at this point. But the above configuration results in a different
-    // initialization which causes the tests to fail. So the following two changes are required.
-
-    // (1) Refresh Kerberos config.
-    sun.security.krb5.Config.refresh();
-    // (2) Reset the default realm.
-    final Field defaultRealm = KerberosName.class.getDeclaredField("defaultRealm");
-    defaultRealm.setAccessible(true);
-    defaultRealm.set(null, KerberosUtil.getDefaultRealm());
-
-    // Start a secure cluster with client using Kerberos related parameters.
-    updateTestCluster(1, newConfig, connectionProps);
+  private static ClusterFixtureBuilder defaultClusterConfig() {
+    return ClusterFixture.bareBuilder(dirTestWatcher)
+      .clusterSize(1)
+      .configProperty(ExecConstants.USER_AUTHENTICATION_ENABLED, true)
+      .configProperty(ExecConstants.USER_AUTHENTICATOR_IMPL, UserAuthenticatorTestImpl.TYPE)
+      .configProperty(ExecConstants.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL)
+      .configProperty(ExecConstants.SERVICE_KEYTAB_LOCATION, krbHelper.serverKeytab.toString())
+      .configNonStringProperty(ExecConstants.AUTHENTICATION_MECHANISMS, Lists.newArrayList("plain", "kerberos"))
+      .configProperty(ExecConstants.USER_ENCRYPTION_SASL_ENABLED, "true");
   }
 
   @AfterClass
@@ -106,39 +78,27 @@
 
   @Test
   public void successKeytabWithoutChunking() throws Exception {
-    final Properties connectionProps = new Properties();
-    connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
-    connectionProps.setProperty(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL);
-    connectionProps.setProperty(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath());
+    try (
+      ClientFixture client = cluster.clientBuilder()
+      .property(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL)
+      .property(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL)
+      .property(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath())
+      .build()
+    ) {
+      // Run few queries using the new client
+      client.testBuilder()
+          .sqlQuery("SELECT session_user FROM (SELECT * FROM sys.drillbits LIMIT 1)")
+          .unOrdered()
+          .baselineColumns("session_user")
+          .baselineValues(krbHelper.CLIENT_SHORT_NAME)
+          .go();
 
-    newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
-      .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
-        ConfigValueFactory.fromAnyRef(true))
-      .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
-        ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
-      .withValue(ExecConstants.SERVICE_PRINCIPAL,
-        ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
-      .withValue(ExecConstants.SERVICE_KEYTAB_LOCATION,
-        ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
-      .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
-        ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos")))
-      .withValue(ExecConstants.USER_ENCRYPTION_SASL_ENABLED,
-        ConfigValueFactory.fromAnyRef(true)));
-
-    updateTestCluster(1, newConfig, connectionProps);
-
-    // Run few queries using the new client
-    testBuilder()
-        .sqlQuery("SELECT session_user FROM (SELECT * FROM sys.drillbits LIMIT 1)")
-        .unOrdered()
-        .baselineColumns("session_user")
-        .baselineValues(krbHelper.CLIENT_SHORT_NAME)
-        .go();
-    test("SHOW SCHEMAS");
-    test("USE INFORMATION_SCHEMA");
-    test("SHOW TABLES");
-    test("SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_NAME LIKE 'COLUMNS'");
-    test("SELECT * FROM cp.`region.json`");
+      client.runSqlSilently("SHOW SCHEMAS");
+      client.runSqlSilently("USE INFORMATION_SCHEMA");
+      client.runSqlSilently("SHOW TABLES");
+      client.runSqlSilently("SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_NAME LIKE 'COLUMNS'");
+      client.runSqlSilently("SELECT * FROM cp.`region.json`");
+    }
   }
 
   /**
@@ -154,171 +114,122 @@
    * @throws Exception
    */
   @Test
+  @Ignore("See DRILL-5387. This test works in isolation but not when sharing counters with other tests")
   public void testConnectionCounters() throws Exception {
-    final Properties connectionProps = new Properties();
-    connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
-    connectionProps.setProperty(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL);
-    connectionProps.setProperty(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath());
-
-    newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
-      .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
-        ConfigValueFactory.fromAnyRef(true))
-      .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
-        ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
-      .withValue(ExecConstants.SERVICE_PRINCIPAL,
-        ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
-      .withValue(ExecConstants.SERVICE_KEYTAB_LOCATION,
-        ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
-      .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
-        ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos")))
-      .withValue(ExecConstants.USER_ENCRYPTION_SASL_ENABLED,
-        ConfigValueFactory.fromAnyRef(true)));
-
-    updateTestCluster(1, newConfig, connectionProps);
-
-    assertTrue(UserRpcMetrics.getInstance().getEncryptedConnectionCount() == 1);
-    assertTrue(UserRpcMetrics.getInstance().getUnEncryptedConnectionCount() == 0);
-
-    // Run few queries using the new client
-    testBuilder()
+    try (
+      // Use a dedicated cluster fixture so that the tested RPC counters have a clean start.
+      ClusterFixture cluster = defaultClusterConfig().build();
+      ClientFixture client = cluster.clientBuilder()
+      .property(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL)
+      .property(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL)
+      .property(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath())
+      .build()
+    ) {
+      client.testBuilder()
         .sqlQuery("SELECT session_user FROM (SELECT * FROM sys.drillbits LIMIT 1)")
         .unOrdered()
         .baselineColumns("session_user")
         .baselineValues(krbHelper.CLIENT_SHORT_NAME)
         .go();
 
-    // Check encrypted counters value
-    assertTrue(1 == UserRpcMetrics.getInstance().getEncryptedConnectionCount());
-    assertTrue(0 == ControlRpcMetrics.getInstance().getEncryptedConnectionCount());
-    assertTrue(0 == DataRpcMetrics.getInstance().getEncryptedConnectionCount());
+      RpcMetrics userMetrics = UserRpcMetrics.getInstance(),
+        ctrlMetrics = ControlRpcMetrics.getInstance(),
+        dataMetrics = DataRpcMetrics.getInstance();
 
-    // Check unencrypted counters value
-    assertTrue(0 == UserRpcMetrics.getInstance().getUnEncryptedConnectionCount());
-    assertTrue(0 == ControlRpcMetrics.getInstance().getUnEncryptedConnectionCount());
-    assertTrue(0 == DataRpcMetrics.getInstance().getUnEncryptedConnectionCount());
+      // Check encrypted counters value, only user-bit encryption is enabled
+      assertEquals(1, userMetrics.getEncryptedConnectionCount());
+      assertEquals(0, ctrlMetrics.getEncryptedConnectionCount());
+      assertEquals(0, dataMetrics.getEncryptedConnectionCount());
+
+      // Check encrypted counters value, only user-bit encryption is enabled
+      assertEquals(0, userMetrics.getUnEncryptedConnectionCount());
+      assertEquals(0, ctrlMetrics.getUnEncryptedConnectionCount());
+      assertEquals(0, dataMetrics.getUnEncryptedConnectionCount());
+    }
   }
 
   @Test
   public void successTicketWithoutChunking() throws Exception {
-    final Properties connectionProps = new Properties();
-    connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
-    connectionProps.setProperty(DrillProperties.KERBEROS_FROM_SUBJECT, "true");
-    final Subject clientSubject = JaasKrbUtil.loginUsingKeytab(krbHelper.CLIENT_PRINCIPAL,
-                                                               krbHelper.clientKeytab.getAbsoluteFile());
+    Subject clientSubject = JaasKrbUtil.loginUsingKeytab(
+      krbHelper.CLIENT_PRINCIPAL,
+      krbHelper.clientKeytab.getAbsoluteFile()
+    );
 
-    newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
-      .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
-        ConfigValueFactory.fromAnyRef(true))
-      .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
-        ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
-      .withValue(ExecConstants.SERVICE_PRINCIPAL,
-        ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
-      .withValue(ExecConstants.SERVICE_KEYTAB_LOCATION,
-        ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
-      .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
-        ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos")))
-      .withValue(ExecConstants.USER_ENCRYPTION_SASL_ENABLED,
-        ConfigValueFactory.fromAnyRef(true)));
+    try (
+      ClientFixture client = Subject.doAs(
+        clientSubject,
+        (PrivilegedExceptionAction<ClientFixture>) () -> cluster.clientBuilder()
+          .property(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL)
+          .property(DrillProperties.KERBEROS_FROM_SUBJECT, "true")
+          .build()
+      )
+    ) {
+      client.testBuilder()
+          .sqlQuery("SELECT session_user FROM (SELECT * FROM sys.drillbits LIMIT 1)")
+          .unOrdered()
+          .baselineColumns("session_user")
+          .baselineValues(krbHelper.CLIENT_SHORT_NAME)
+          .go();
 
-    Subject.doAs(clientSubject, new PrivilegedExceptionAction<Void>() {
-      @Override
-      public Void run() throws Exception {
-        updateTestCluster(1, newConfig, connectionProps);
-        return null;
-      }
-    });
+      client.runSqlSilently("SHOW SCHEMAS");
+      client.runSqlSilently("USE INFORMATION_SCHEMA");
+      client.runSqlSilently("SHOW TABLES");
+      client.runSqlSilently("SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_NAME LIKE 'COLUMNS'");
+      client.runSqlSilently("SELECT * FROM cp.`region.json` LIMIT 5");
+    }
+  }
 
-    // Run few queries using the new client
-    testBuilder()
+  @Test
+  public void successKeytabWithChunking() throws Exception {
+    try (
+      ClusterFixture cluster = defaultClusterConfig()
+        .configProperty(ExecConstants.USER_ENCRYPTION_SASL_MAX_WRAPPED_SIZE, 100)
+        .build();
+      ClientFixture client = cluster.clientBuilder()
+        .property(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL)
+        .property(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL)
+        .property(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath())
+        .build()
+    ) {
+      client.testBuilder()
         .sqlQuery("SELECT session_user FROM (SELECT * FROM sys.drillbits LIMIT 1)")
         .unOrdered()
         .baselineColumns("session_user")
         .baselineValues(krbHelper.CLIENT_SHORT_NAME)
         .go();
-    test("SHOW SCHEMAS");
-    test("USE INFORMATION_SCHEMA");
-    test("SHOW TABLES");
-    test("SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_NAME LIKE 'COLUMNS'");
-    test("SELECT * FROM cp.`region.json` LIMIT 5");
-  }
 
-  @Test
-  public void successKeytabWithChunking() throws Exception {
-    final Properties connectionProps = new Properties();
-    connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
-    connectionProps.setProperty(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL);
-    connectionProps.setProperty(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath());
-
-    newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
-      .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
-        ConfigValueFactory.fromAnyRef(true))
-      .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
-        ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
-      .withValue(ExecConstants.SERVICE_PRINCIPAL,
-        ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
-      .withValue(ExecConstants.SERVICE_KEYTAB_LOCATION,
-        ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
-      .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
-        ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos")))
-      .withValue(ExecConstants.USER_ENCRYPTION_SASL_ENABLED,
-        ConfigValueFactory.fromAnyRef(true))
-      .withValue(ExecConstants.USER_ENCRYPTION_SASL_MAX_WRAPPED_SIZE,
-        ConfigValueFactory.fromAnyRef(100)));
-
-    updateTestCluster(1, newConfig, connectionProps);
-
-    // Run few queries using the new client
-    testBuilder()
-      .sqlQuery("SELECT session_user FROM (SELECT * FROM sys.drillbits LIMIT 1)")
-      .unOrdered()
-      .baselineColumns("session_user")
-      .baselineValues(krbHelper.CLIENT_SHORT_NAME)
-      .go();
-    test("SHOW SCHEMAS");
-    test("USE INFORMATION_SCHEMA");
-    test("SHOW TABLES");
-    test("SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_NAME LIKE 'COLUMNS'");
-    test("SELECT * FROM cp.`region.json`");
+      client.runSqlSilently("SHOW SCHEMAS");
+      client.runSqlSilently("USE INFORMATION_SCHEMA");
+      client.runSqlSilently("SHOW TABLES");
+      client.runSqlSilently("SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_NAME LIKE 'COLUMNS'");
+      client.runSqlSilently("SELECT * FROM cp.`region.json` LIMIT 5");
+    }
   }
 
   @Test
   public void successKeytabWithChunkingDefaultChunkSize() throws Exception {
-    final Properties connectionProps = new Properties();
-    connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
-    connectionProps.setProperty(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL);
-    connectionProps.setProperty(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath());
+    try (
+      ClientFixture client = cluster.clientBuilder()
+        .property(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL)
+        .property(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL)
+        .property(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath())
+        .build()
+    ) {
+      client.testBuilder()
+        .sqlQuery("SELECT session_user FROM (SELECT * FROM sys.drillbits LIMIT 1)")
+        .unOrdered()
+        .baselineColumns("session_user")
+        .baselineValues(krbHelper.CLIENT_SHORT_NAME)
+        .go();
 
-    newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
-      .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
-        ConfigValueFactory.fromAnyRef(true))
-      .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
-        ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
-      .withValue(ExecConstants.SERVICE_PRINCIPAL,
-        ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
-      .withValue(ExecConstants.SERVICE_KEYTAB_LOCATION,
-        ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
-      .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
-        ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos")))
-      .withValue(ExecConstants.USER_ENCRYPTION_SASL_ENABLED,
-        ConfigValueFactory.fromAnyRef(true)));
-
-    updateTestCluster(1, newConfig, connectionProps);
-
-    // Run few queries using the new client
-    testBuilder()
-      .sqlQuery("SELECT session_user FROM (SELECT * FROM sys.drillbits LIMIT 1)")
-      .unOrdered()
-      .baselineColumns("session_user")
-      .baselineValues(krbHelper.CLIENT_SHORT_NAME)
-      .go();
-    test("SHOW SCHEMAS");
-    test("USE INFORMATION_SCHEMA");
-    test("SHOW TABLES");
-    test("SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_NAME LIKE 'COLUMNS'");
-    test("SELECT * FROM cp.`region.json` LIMIT 5");
+      client.runSqlSilently("SHOW SCHEMAS");
+      client.runSqlSilently("USE INFORMATION_SCHEMA");
+      client.runSqlSilently("SHOW TABLES");
+      client.runSqlSilently("SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_NAME LIKE 'COLUMNS'");
+      client.runSqlSilently("SELECT * FROM cp.`region.json` LIMIT 5");
+    }
   }
 
-
   /**
    *  This test will not cover the data channel since we are using only 1 Drillbit and the query doesn't involve
    *  any exchange operator. But Data Channel encryption testing is covered separately in
@@ -326,55 +237,37 @@
    */
   @Test
   public void successEncryptionAllChannelChunkMode() throws Exception {
-    final Properties connectionProps = new Properties();
-    connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
-    connectionProps.setProperty(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL);
-    connectionProps.setProperty(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath());
+    try (
+      ClusterFixture cluster = defaultClusterConfig()
+        .configProperty(ExecConstants.USER_ENCRYPTION_SASL_MAX_WRAPPED_SIZE, 100)
+        .configProperty(ExecConstants.USER_ENCRYPTION_SASL_MAX_WRAPPED_SIZE, 10000)
+        .configProperty(ExecConstants.BIT_AUTHENTICATION_ENABLED, true)
+        .configProperty(ExecConstants.BIT_AUTHENTICATION_MECHANISM, "kerberos")
+        .configProperty(ExecConstants.USE_LOGIN_PRINCIPAL, true)
+        .configProperty(ExecConstants.BIT_ENCRYPTION_SASL_ENABLED, true)
+        .configProperty(ExecConstants.BIT_ENCRYPTION_SASL_MAX_WRAPPED_SIZE, 10000)
+        .build();
+      ClientFixture client = cluster.clientBuilder()
+        .property(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL)
+        .property(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL)
+        .property(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath())
+        .build()
+    ) {
+      client.testBuilder()
+        .sqlQuery("SELECT session_user FROM (SELECT * FROM sys.drillbits LIMIT 1)")
+        .unOrdered()
+        .baselineColumns("session_user")
+        .baselineValues(krbHelper.CLIENT_SHORT_NAME)
+        .go();
 
-    newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
-      .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
-        ConfigValueFactory.fromAnyRef(true))
-      .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
-        ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
-      .withValue(ExecConstants.SERVICE_PRINCIPAL,
-        ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
-      .withValue(ExecConstants.SERVICE_KEYTAB_LOCATION,
-        ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
-      .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
-        ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos")))
-      .withValue(ExecConstants.USER_ENCRYPTION_SASL_ENABLED,
-        ConfigValueFactory.fromAnyRef(true))
-      .withValue(ExecConstants.USER_ENCRYPTION_SASL_MAX_WRAPPED_SIZE,
-        ConfigValueFactory.fromAnyRef(10000))
-      .withValue(ExecConstants.BIT_AUTHENTICATION_ENABLED,
-        ConfigValueFactory.fromAnyRef(true))
-      .withValue(ExecConstants.BIT_AUTHENTICATION_MECHANISM,
-        ConfigValueFactory.fromAnyRef("kerberos"))
-      .withValue(ExecConstants.USE_LOGIN_PRINCIPAL,
-        ConfigValueFactory.fromAnyRef(true))
-      .withValue(ExecConstants.BIT_ENCRYPTION_SASL_ENABLED,
-        ConfigValueFactory.fromAnyRef(true))
-      .withValue(ExecConstants.BIT_ENCRYPTION_SASL_MAX_WRAPPED_SIZE,
-        ConfigValueFactory.fromAnyRef(10000)));
-
-    updateTestCluster(1, newConfig, connectionProps);
-
-    // Run few queries using the new client
-    testBuilder()
-      .sqlQuery("SELECT session_user FROM (SELECT * FROM sys.drillbits LIMIT 1)")
-      .unOrdered()
-      .baselineColumns("session_user")
-      .baselineValues(krbHelper.CLIENT_SHORT_NAME)
-      .go();
-    test("SHOW SCHEMAS");
-    test("USE INFORMATION_SCHEMA");
-    test("SHOW TABLES");
-    test("SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_NAME LIKE 'COLUMNS'");
-    test("SELECT * FROM cp.`region.json` LIMIT 5");
+        client.runSqlSilently("SHOW SCHEMAS");
+        client.runSqlSilently("USE INFORMATION_SCHEMA");
+        client.runSqlSilently("SHOW TABLES");
+        client.runSqlSilently("SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_NAME LIKE 'COLUMNS'");
+        client.runSqlSilently("SELECT * FROM cp.`region.json` LIMIT 5");
+    }
   }
 
-
-
   /**
    *  This test will not cover the data channel since we are using only 1 Drillbit and the query doesn't involve
    *  any exchange operator. But Data Channel encryption testing is covered separately in
@@ -382,48 +275,34 @@
    */
   @Test
   public void successEncryptionAllChannel() throws Exception {
+    try (
+      ClusterFixture cluster = defaultClusterConfig()
+        .configProperty(ExecConstants.USER_ENCRYPTION_SASL_MAX_WRAPPED_SIZE, 100)
+        .configProperty(ExecConstants.USER_ENCRYPTION_SASL_MAX_WRAPPED_SIZE, 10000)
+        .configProperty(ExecConstants.BIT_AUTHENTICATION_ENABLED, true)
+        .configProperty(ExecConstants.BIT_AUTHENTICATION_MECHANISM, "kerberos")
+        .configProperty(ExecConstants.USE_LOGIN_PRINCIPAL, true)
+        .configProperty(ExecConstants.BIT_ENCRYPTION_SASL_ENABLED, true)
+        .build();
+      ClientFixture client = cluster.clientBuilder()
+        .property(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL)
+        .property(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL)
+        .property(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath())
+        .build()
+    ) {
+      client.testBuilder()
+        .sqlQuery("SELECT session_user FROM (SELECT * FROM sys.drillbits LIMIT 1)")
+        .unOrdered()
+        .baselineColumns("session_user")
+        .baselineValues(krbHelper.CLIENT_SHORT_NAME)
+        .go();
 
-    final Properties connectionProps = new Properties();
-    connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
-    connectionProps.setProperty(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL);
-    connectionProps.setProperty(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath());
-
-    newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
-      .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
-        ConfigValueFactory.fromAnyRef(true))
-      .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
-        ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
-      .withValue(ExecConstants.SERVICE_PRINCIPAL,
-        ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
-      .withValue(ExecConstants.SERVICE_KEYTAB_LOCATION,
-        ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
-      .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
-        ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos")))
-      .withValue(ExecConstants.USER_ENCRYPTION_SASL_ENABLED,
-        ConfigValueFactory.fromAnyRef(true))
-      .withValue(ExecConstants.BIT_AUTHENTICATION_ENABLED,
-        ConfigValueFactory.fromAnyRef(true))
-      .withValue(ExecConstants.BIT_AUTHENTICATION_MECHANISM,
-        ConfigValueFactory.fromAnyRef("kerberos"))
-      .withValue(ExecConstants.USE_LOGIN_PRINCIPAL,
-        ConfigValueFactory.fromAnyRef(true))
-      .withValue(ExecConstants.BIT_ENCRYPTION_SASL_ENABLED,
-        ConfigValueFactory.fromAnyRef(true)));
-
-    updateTestCluster(1, newConfig, connectionProps);
-
-    // Run few queries using the new client
-    testBuilder()
-      .sqlQuery("SELECT session_user FROM (SELECT * FROM sys.drillbits LIMIT 1)")
-      .unOrdered()
-      .baselineColumns("session_user")
-      .baselineValues(krbHelper.CLIENT_SHORT_NAME)
-      .go();
-    test("SHOW SCHEMAS");
-    test("USE INFORMATION_SCHEMA");
-    test("SHOW TABLES");
-    test("SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_NAME LIKE 'COLUMNS'");
-    test("SELECT * FROM cp.`region.json` LIMIT 5");
+      client.runSqlSilently("SHOW SCHEMAS");
+      client.runSqlSilently("USE INFORMATION_SCHEMA");
+      client.runSqlSilently("SHOW TABLES");
+      client.runSqlSilently("SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_NAME LIKE 'COLUMNS'");
+      client.runSqlSilently("SELECT * FROM cp.`region.json` LIMIT 5");
+    }
   }
 
   /**
@@ -438,77 +317,55 @@
    */
 
   @Test
+  @Ignore("See DRILL-5387. This test works in isolation but not when sharing counters with other tests")
   public void testEncryptedConnectionCountersAllChannel() throws Exception {
-    final Properties connectionProps = new Properties();
-    connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
-    connectionProps.setProperty(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL);
-    connectionProps.setProperty(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath());
-
-    newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
-        .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
-            ConfigValueFactory.fromAnyRef(true))
-        .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
-            ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
-        .withValue(ExecConstants.SERVICE_PRINCIPAL,
-            ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
-        .withValue(ExecConstants.SERVICE_KEYTAB_LOCATION,
-            ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
-        .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
-            ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos")))
-        .withValue(ExecConstants.USER_ENCRYPTION_SASL_ENABLED,
-            ConfigValueFactory.fromAnyRef(true))
-        .withValue(ExecConstants.BIT_AUTHENTICATION_ENABLED,
-            ConfigValueFactory.fromAnyRef(true))
-        .withValue(ExecConstants.BIT_AUTHENTICATION_MECHANISM,
-            ConfigValueFactory.fromAnyRef("kerberos"))
-        .withValue(ExecConstants.USE_LOGIN_PRINCIPAL,
-            ConfigValueFactory.fromAnyRef(true))
-        .withValue(ExecConstants.BIT_ENCRYPTION_SASL_ENABLED,
-            ConfigValueFactory.fromAnyRef(true)));
-
-    updateTestCluster(1, newConfig, connectionProps);
-
-    // Run few queries using the new client
-    testBuilder()
+    try (
+      // Use a dedicated cluster fixture so that the tested RPC counters have a clean start.
+      ClusterFixture cluster = defaultClusterConfig()
+        .configProperty(ExecConstants.USER_ENCRYPTION_SASL_MAX_WRAPPED_SIZE, 100)
+        .configProperty(ExecConstants.USER_ENCRYPTION_SASL_MAX_WRAPPED_SIZE, 10000)
+        .configProperty(ExecConstants.BIT_AUTHENTICATION_ENABLED, true)
+        .configProperty(ExecConstants.BIT_AUTHENTICATION_MECHANISM, "kerberos")
+        .configProperty(ExecConstants.USE_LOGIN_PRINCIPAL, true)
+        .configProperty(ExecConstants.BIT_ENCRYPTION_SASL_ENABLED, true)
+        .build();
+      ClientFixture client = cluster.clientBuilder()
+        .property(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL)
+        .property(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL)
+        .property(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath())
+        .build()
+    ) {
+      client.testBuilder()
         .sqlQuery("SELECT session_user FROM (SELECT * FROM sys.drillbits LIMIT 1)")
         .unOrdered()
         .baselineColumns("session_user")
         .baselineValues(krbHelper.CLIENT_SHORT_NAME)
         .go();
 
-    // Check encrypted counters value
-    assertTrue(1 == UserRpcMetrics.getInstance().getEncryptedConnectionCount());
-    assertTrue(0 == ControlRpcMetrics.getInstance().getEncryptedConnectionCount());
-    assertTrue(0 == DataRpcMetrics.getInstance().getEncryptedConnectionCount());
+      RpcMetrics userMetrics = UserRpcMetrics.getInstance(),
+        ctrlMetrics = ControlRpcMetrics.getInstance(),
+        dataMetrics = DataRpcMetrics.getInstance();
 
-    // Check unencrypted counters value
-    assertTrue(0 == UserRpcMetrics.getInstance().getUnEncryptedConnectionCount());
-    assertTrue(0 == ControlRpcMetrics.getInstance().getUnEncryptedConnectionCount());
-    assertTrue(0 == DataRpcMetrics.getInstance().getUnEncryptedConnectionCount());
+      // Check encrypted counters value
+      assertEquals(1, userMetrics.getEncryptedConnectionCount());
+      assertEquals(0, ctrlMetrics.getEncryptedConnectionCount());
+      assertEquals(0, dataMetrics.getEncryptedConnectionCount());
+
+      // Check unencrypted counters value
+      assertEquals(0, userMetrics.getUnEncryptedConnectionCount());
+      assertEquals(0, ctrlMetrics.getUnEncryptedConnectionCount());
+      assertEquals(0, dataMetrics.getUnEncryptedConnectionCount());
+    }
   }
 
   @Test
   public void failurePlainMech() {
-    try {
-      final Properties connectionProps = new Properties();
-      connectionProps.setProperty(DrillProperties.USER, "anonymous");
-      connectionProps.setProperty(DrillProperties.PASSWORD, "anything works!");
-
-      newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
-        .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
-          ConfigValueFactory.fromAnyRef(true))
-        .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
-          ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
-        .withValue(ExecConstants.SERVICE_PRINCIPAL,
-          ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
-        .withValue(ExecConstants.SERVICE_KEYTAB_LOCATION,
-          ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
-        .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
-          ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos")))
-        .withValue(ExecConstants.USER_ENCRYPTION_SASL_ENABLED,
-          ConfigValueFactory.fromAnyRef(true)));
-
-      updateTestCluster(1, newConfig, connectionProps);
+    try (
+      ClientFixture client = cluster.clientBuilder()
+        .property(DrillProperties.USER, "anonymous")
+        .property(DrillProperties.PASSWORD, "anything works!")
+        .build()
+    ) {
       fail();
     } catch (Exception ex) {
       assert (ex.getCause() instanceof NonTransientRpcException);
@@ -518,27 +375,16 @@
 
   @Test
   public void encryptionEnabledWithOnlyPlainMech() {
-    try {
-      final Properties connectionProps = new Properties();
-      connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
-      connectionProps.setProperty(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL);
-      connectionProps.setProperty(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath());
-
-      newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
-        .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
-          ConfigValueFactory.fromAnyRef(true))
-        .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
-          ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
-        .withValue(ExecConstants.SERVICE_PRINCIPAL,
-          ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
-        .withValue(ExecConstants.SERVICE_KEYTAB_LOCATION,
-          ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
-        .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
-          ConfigValueFactory.fromIterable(Lists.newArrayList("plain")))
-        .withValue(ExecConstants.USER_ENCRYPTION_SASL_ENABLED,
-          ConfigValueFactory.fromAnyRef(true)));
-      updateTestCluster(1, newConfig, connectionProps);
-
+    try (
+      ClusterFixture cluster = defaultClusterConfig()
+        .configNonStringProperty(ExecConstants.AUTHENTICATION_MECHANISMS, Lists.newArrayList("plain"))
+        .build();
+      ClientFixture client = cluster.clientBuilder()
+        .property(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL)
+        .property(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL)
+        .property(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath())
+        .build()
+    ) {
       fail();
     } catch (Exception ex) {
       assert (ex.getCause() instanceof NonTransientRpcException);
@@ -552,28 +398,14 @@
    */
   @Test
   public void failureOldClientEncryptionEnabled() {
-    try {
-      final Properties connectionProps = new Properties();
-      connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
-      connectionProps.setProperty(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL);
-      connectionProps.setProperty(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath());
-      connectionProps.setProperty(DrillProperties.TEST_SASL_LEVEL, "1");
-
-      newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
-        .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
-          ConfigValueFactory.fromAnyRef(true))
-        .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
-          ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
-        .withValue(ExecConstants.SERVICE_PRINCIPAL,
-          ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
-        .withValue(ExecConstants.SERVICE_KEYTAB_LOCATION,
-          ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
-        .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
-          ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos")))
-        .withValue(ExecConstants.USER_ENCRYPTION_SASL_ENABLED,
-          ConfigValueFactory.fromAnyRef(true)));
-      updateTestCluster(1, newConfig, connectionProps);
-
+    try (
+      ClientFixture client = cluster.clientBuilder()
+        .property(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL)
+        .property(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL)
+        .property(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath())
+        .property(DrillProperties.TEST_SASL_LEVEL, "1")
+        .build()
+    ) {
       fail();
     } catch (Exception ex) {
       assert (ex.getCause() instanceof RpcException);
@@ -587,26 +419,27 @@
    */
   @Test
   public void successOldClientEncryptionDisabled() {
-
-    final Properties connectionProps = new Properties();
-    connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
-    connectionProps.setProperty(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL);
-    connectionProps.setProperty(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath());
-    connectionProps.setProperty(DrillProperties.TEST_SASL_LEVEL, "1");
-
-    newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
-      .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
-        ConfigValueFactory.fromAnyRef(true))
-      .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
-        ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
-      .withValue(ExecConstants.SERVICE_PRINCIPAL,
-        ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
-      .withValue(ExecConstants.SERVICE_KEYTAB_LOCATION,
-        ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
-      .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
-        ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos"))));
-
-    updateTestCluster(1, newConfig, connectionProps);
+    try (
+      ClusterFixture cluster = defaultClusterConfig()
+        .configProperty(ExecConstants.USER_ENCRYPTION_SASL_ENABLED, false)
+        .build();
+      ClientFixture client = cluster.clientBuilder()
+        .property(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL)
+        .property(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL)
+        .property(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath())
+        .property(DrillProperties.TEST_SASL_LEVEL, "1")
+        .build()
+    ) {
+      client.testBuilder()
+        .sqlQuery("SELECT session_user FROM (SELECT * FROM sys.drillbits LIMIT 1)")
+        .unOrdered()
+        .baselineColumns("session_user")
+        .baselineValues(krbHelper.CLIENT_SHORT_NAME)
+        .go();
+    } catch (Exception ex) {
+      fail();
+      assert (ex.getCause() instanceof NonTransientRpcException);
+    }
   }
 
   /**
@@ -614,28 +447,18 @@
    * to server with encryption disabled.
    */
   @Test
-  public void clientNeedsEncryptionWithNoServerSupport() throws Exception {
-    try {
-      final Properties connectionProps = new Properties();
-      connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
-      connectionProps.setProperty(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL);
-      connectionProps.setProperty(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath());
-      connectionProps.setProperty(DrillProperties.SASL_ENCRYPT, "true");
-
-      newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
-        .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
-          ConfigValueFactory.fromAnyRef(true))
-        .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
-          ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
-        .withValue(ExecConstants.SERVICE_PRINCIPAL,
-          ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
-        .withValue(ExecConstants.SERVICE_KEYTAB_LOCATION,
-          ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
-        .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
-          ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos"))));
-
-      updateTestCluster(1, newConfig, connectionProps);
-
+  public void clientNeedsEncryptionWithNoServerSupport() {
+    try (
+      ClusterFixture cluster = defaultClusterConfig()
+        .configProperty(ExecConstants.USER_ENCRYPTION_SASL_ENABLED, false)
+        .build();
+      ClientFixture client = cluster.clientBuilder()
+        .property(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL)
+        .property(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL)
+        .property(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath())
+        .property(DrillProperties.SASL_ENCRYPT, "true")
+        .build()
+    ) {
       fail();
     } catch (Exception ex) {
       assert (ex.getCause() instanceof NonTransientRpcException);
@@ -647,32 +470,25 @@
    * to server with encryption enabled.
    */
   @Test
-  public void clientNeedsEncryptionWithServerSupport() throws Exception {
-    try {
-      final Properties connectionProps = new Properties();
-      connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
-      connectionProps.setProperty(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL);
-      connectionProps.setProperty(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath());
-      connectionProps.setProperty(DrillProperties.SASL_ENCRYPT, "true");
-
-      newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
-        .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
-          ConfigValueFactory.fromAnyRef(true))
-        .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
-          ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
-        .withValue(ExecConstants.SERVICE_PRINCIPAL,
-          ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
-        .withValue(ExecConstants.SERVICE_KEYTAB_LOCATION,
-          ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
-        .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
-          ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos")))
-        .withValue(ExecConstants.USER_ENCRYPTION_SASL_ENABLED,
-              ConfigValueFactory.fromAnyRef(true)));
-
-      updateTestCluster(1, newConfig, connectionProps);
+  public void clientNeedsEncryptionWithServerSupport() {
+    try (
+      ClientFixture client = cluster.clientBuilder()
+        .property(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL)
+        .property(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL)
+        .property(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath())
+        .property(DrillProperties.SASL_ENCRYPT, "true")
+        .build()
+    ) {
+      client.testBuilder()
+        .sqlQuery("SELECT session_user FROM (SELECT * FROM sys.drillbits LIMIT 1)")
+        .unOrdered()
+        .baselineColumns("session_user")
+        .baselineValues(krbHelper.CLIENT_SHORT_NAME)
+        .go();
     } catch (Exception ex) {
       fail();
       assert (ex.getCause() instanceof NonTransientRpcException);
     }
   }
 }
+
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/spnego/TestDrillSpnegoAuthenticator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/spnego/TestDrillSpnegoAuthenticator.java
index a23776e..914688e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/spnego/TestDrillSpnegoAuthenticator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/spnego/TestDrillSpnegoAuthenticator.java
@@ -29,6 +29,7 @@
 import org.apache.drill.exec.server.rest.WebServerConstants;
 import org.apache.drill.exec.server.rest.auth.DrillSpnegoAuthenticator;
 import org.apache.drill.exec.server.rest.auth.DrillSpnegoLoginService;
+import org.apache.drill.exec.server.rest.auth.SpnegoConfig;
 import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.test.BaseTest;
 import org.apache.hadoop.security.authentication.util.KerberosName;
@@ -50,7 +51,6 @@
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
-import sun.security.jgss.GSSUtil;
 
 import javax.security.auth.Subject;
 import javax.servlet.http.HttpServletRequest;
@@ -67,7 +67,6 @@
 /**
  * Test for validating {@link DrillSpnegoAuthenticator}
  */
-@Ignore("See DRILL-5387")
 @Category(SecurityTest.class)
 public class TestDrillSpnegoAuthenticator extends BaseTest {
 
@@ -84,8 +83,10 @@
     spnegoHelper = new KerberosHelper(TestSpnegoAuthentication.class.getSimpleName(), primaryName);
     spnegoHelper.setupKdc(dirTestWatcher.getTmpDir());
 
-
-    sun.security.krb5.Config.refresh();
+    // (1) Refresh Kerberos config.
+    // This disabled call to an unsupported internal API does not appear to be
+    // required and it prevents compiling with a target of JDK 8 on newer JDKs.
+    // sun.security.krb5.Config.refresh();
 
     // (2) Reset the default realm.
     final Field defaultRealm = KerberosName.class.getDeclaredField("defaultRealm");
@@ -203,6 +204,7 @@
    * {@link DrillSpnegoAuthenticator#validateRequest(javax.servlet.ServletRequest, javax.servlet.ServletResponse, boolean)}
    */
   @Test
+  @Ignore("See DRILL-5387")
   public void testAuthClientRequestForLogOut() throws Exception {
     final HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
     final HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
@@ -242,7 +244,7 @@
       final GSSManager gssManager = GSSManager.getInstance();
       GSSContext gssContext = null;
       try {
-        final Oid oid = GSSUtil.GSS_SPNEGO_MECH_OID;
+        final Oid oid = new Oid(SpnegoConfig.GSS_SPNEGO_MECH_OID);
         final GSSName serviceName = gssManager.createName(spnegoHelper.SERVER_PRINCIPAL, GSSName.NT_USER_NAME, oid);
 
         gssContext = gssManager.createContext(serviceName, oid, null, GSSContext.DEFAULT_LIFETIME);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/spnego/TestSpnegoAuthentication.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/spnego/TestSpnegoAuthentication.java
index 572f32c..fb79f17 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/spnego/TestSpnegoAuthentication.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/spnego/TestSpnegoAuthentication.java
@@ -34,6 +34,7 @@
 import org.apache.drill.exec.server.options.SystemOptionManager;
 import org.apache.drill.exec.server.rest.auth.DrillHttpSecurityHandlerProvider;
 import org.apache.drill.exec.server.rest.auth.DrillSpnegoLoginService;
+import org.apache.drill.exec.server.rest.auth.SpnegoConfig;
 import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.test.BaseTest;
 import org.apache.hadoop.security.authentication.util.KerberosName;
@@ -50,7 +51,6 @@
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
-import sun.security.jgss.GSSUtil;
 
 import javax.security.auth.Subject;
 import java.lang.reflect.Field;
@@ -64,7 +64,6 @@
 /**
  * Test for validating {@link DrillSpnegoLoginService}
  */
-@Ignore("See DRILL-5387")
 @Category(SecurityTest.class)
 public class TestSpnegoAuthentication extends BaseTest {
 
@@ -79,8 +78,10 @@
     spnegoHelper = new KerberosHelper(TestSpnegoAuthentication.class.getSimpleName(), primaryName);
     spnegoHelper.setupKdc(dirTestWatcher.getTmpDir());
 
-
-    sun.security.krb5.Config.refresh();
+    // (1) Refresh Kerberos config.
+    // This disabled call to an unsupported internal API does not appear to be
+    // required and it prevents compiling with a target of JDK 8 on newer JDKs.
+    // sun.security.krb5.Config.refresh();
 
     // (2) Reset the default realm.
     final Field defaultRealm = KerberosName.class.getDeclaredField("defaultRealm");
@@ -246,6 +247,7 @@
    * when provided with client token for a configured service principal.
    */
   @Test
+  @Ignore("See DRILL-5387")
   public void testDrillSpnegoLoginService() throws Exception {
 
     // Create client subject using it's principal and keytab
@@ -260,7 +262,7 @@
         final GSSManager gssManager = GSSManager.getInstance();
         GSSContext gssContext = null;
         try {
-          final Oid oid = GSSUtil.GSS_SPNEGO_MECH_OID;
+          final Oid oid = new Oid(SpnegoConfig.GSS_SPNEGO_MECH_OID);
           final GSSName serviceName = gssManager.createName(spnegoHelper.SERVER_PRINCIPAL, GSSName.NT_USER_NAME, oid);
 
           gssContext = gssManager.createContext(serviceName, oid, null, GSSContext.DEFAULT_LIFETIME);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/spnego/TestSpnegoConfig.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/spnego/TestSpnegoConfig.java
index 3831f50..a91f802 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/spnego/TestSpnegoConfig.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/spnego/TestSpnegoConfig.java
@@ -33,7 +33,6 @@
 import org.apache.hadoop.security.authentication.util.KerberosUtil;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -46,11 +45,8 @@
 /**
  * Test for validating {@link SpnegoConfig}
  */
-@Ignore("See DRILL-5387")
 @Category(SecurityTest.class)
 public class TestSpnegoConfig extends BaseTest {
-  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSpnegoConfig.class);
-
   private static KerberosHelper spnegoHelper;
 
   private static final String primaryName = "HTTP";
@@ -62,8 +58,10 @@
     spnegoHelper = new KerberosHelper(TestSpnegoAuthentication.class.getSimpleName(), primaryName);
     spnegoHelper.setupKdc(dirTestWatcher.getTmpDir());
 
-
-    sun.security.krb5.Config.refresh();
+    // (1) Refresh Kerberos config.
+    // This disabled call to an unsupported internal API does not appear to be
+    // required and it prevents compiling with a target of JDK 8 on newer JDKs.
+    // sun.security.krb5.Config.refresh();
 
     // (2) Reset the default realm.
     final Field defaultRealm = KerberosName.class.getDeclaredField("defaultRealm");
@@ -163,4 +161,4 @@
       fail();
     }
   }
-}
\ No newline at end of file
+}
diff --git a/pom.xml b/pom.xml
index 10b692b..45e9e6f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3988,6 +3988,7 @@
         <jdk>[9,)</jdk>
       </activation>
       <properties>
+        <maven.compiler.release>8</maven.compiler.release>
         <junit.args>
           --add-opens=java.base/java.lang=ALL-UNNAMED
           --add-opens java.base/java.net=ALL-UNNAMED