HDFS-14013. Skip any credentials stored in HDFS when starting ZKFC. Contributed by Stephen O'Donnell
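
ZKFC parses the ZooKeeper auth info from the configuration at startup,
which may consult any configured credential providers. If
hadoop.security.credential.provider.path references a provider stored in
HDFS, for example (as the new test below does):

    hadoop.security.credential.provider.path=jceks://hdfs/tmp/test.jceks

then ZKFC cannot read it, since HDFS is generally not yet running when
ZKFC starts. Exclude hdfs-scheme credential providers from the
configuration before parsing the ZK auth info, so ZKFC starts regardless.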
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
index 16d7bf7..10459404 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
@@ -31,11 +31,14 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException;
 import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
 import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
+import org.apache.hadoop.security.ProviderUtils;
 import org.apache.hadoop.util.ZKUtil;
 import org.apache.hadoop.util.ZKUtil.ZKAuthInfo;
 import org.apache.hadoop.ha.HealthMonitor.State;
@@ -343,8 +346,21 @@
       zkAcls = Ids.CREATOR_ALL_ACL;
     }
     
-    // Parse authentication from configuration.
-    List<ZKAuthInfo> zkAuths = SecurityUtil.getZKAuthInfos(conf, ZK_AUTH_KEY);
+    // Parse authentication from configuration. Exclude any credential
+    // providers using the hdfs scheme to avoid a circular dependency: HDFS
+    // is likely not yet started when ZKFC starts, so we cannot read from it.
+    Configuration c = conf;
+    try {
+      c = ProviderUtils.excludeIncompatibleCredentialProviders(
+          conf, FileSystem.getFileSystemClass("hdfs", conf));
+    } catch (UnsupportedFileSystemException e) {
+      // Should not happen in a real cluster, as the hdfs filesystem will
+      // always be present. In tests, the hdfs filesystem may be absent.
+      LOG.debug("No filesystem found for the hdfs scheme", e);
+    }
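+    // Pass the filtered configuration so that parsing the ZK auth info
+    // cannot touch an hdfs-backed credential provider.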
+    List<ZKAuthInfo> zkAuths = SecurityUtil.getZKAuthInfos(c, ZK_AUTH_KEY);
 
     // Sanity check configuration.
     Preconditions.checkArgument(zkQuorum != null,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSZKFailoverController.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSZKFailoverController.java
index 8f60b1d..0a7a87c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSZKFailoverController.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSZKFailoverController.java
@@ -47,6 +47,7 @@
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.net.ServerSocketUtil;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.alias.CredentialProviderFactory;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
@@ -93,14 +94,18 @@
         ServerSocketUtil.getPort(10023, 100));
     conf.setInt(DFSConfigKeys.DFS_HA_ZKFC_PORT_KEY + ".ns1.nn2",
         ServerSocketUtil.getPort(10024, 100));
+  }
 
+  private void startCluster() throws Exception {
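+    // The cluster is started outside of setUp so that individual tests can
+    // adjust conf (e.g. add a credential provider) before starting it.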
     // prefer non-ephemeral port to avoid port collision on restartNameNode
     MiniDFSNNTopology topology = new MiniDFSNNTopology()
-    .addNameservice(new MiniDFSNNTopology.NSConf("ns1")
-        .addNN(new MiniDFSNNTopology.NNConf("nn1")
-            .setIpcPort(ServerSocketUtil.getPort(10021, 100)))
-        .addNN(new MiniDFSNNTopology.NNConf("nn2")
-            .setIpcPort(ServerSocketUtil.getPort(10022, 100))));
+        .addNameservice(new MiniDFSNNTopology.NSConf("ns1")
+            .addNN(new MiniDFSNNTopology.NNConf("nn1")
+                .setIpcPort(ServerSocketUtil.getPort(10021, 100)))
+            .addNN(new MiniDFSNNTopology.NNConf("nn2")
+                .setIpcPort(ServerSocketUtil.getPort(10022, 100))));
     cluster = new MiniDFSCluster.Builder(conf)
         .nnTopology(topology)
         .numDataNodes(0)
@@ -113,16 +116,16 @@
 
     thr1.start();
     waitForHAState(0, HAServiceState.ACTIVE);
-    
+
     ctx.addThread(thr2 = new ZKFCThread(ctx, 1));
     thr2.start();
-    
+
     // Wait for the ZKFCs to fully start up
     ZKFCTestUtil.waitForHealthState(thr1.zkfc,
         HealthMonitor.State.SERVICE_HEALTHY, ctx);
     ZKFCTestUtil.waitForHealthState(thr2.zkfc,
         HealthMonitor.State.SERVICE_HEALTHY, ctx);
-    
+
     fs = HATestUtil.configureFailoverFs(cluster, conf);
   }
   
@@ -147,11 +150,26 @@
     }
   }
 
+  /**
+   * Ensure the cluster starts when an HDFS jceks credential provider is
+   * configured. HDFS-14013.
+   */
+  @Test(timeout=60000)
+  public void testZKFCStartsWithCredentialProviderReferencingHDFS()
+      throws Exception {
+    // Point the credential provider path at a location on HDFS; the store
+    // need not exist for ZKFC to start.
+    conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
+        "jceks://hdfs/tmp/test.jceks");
+    startCluster();
+  }
+
   /**
    * Test that thread dump is captured after NN state changes.
    */
   @Test(timeout=60000)
   public void testThreadDumpCaptureAfterNNStateChange() throws Exception {
+    startCluster();
     MockNameNodeResourceChecker mockResourceChecker =
         new MockNameNodeResourceChecker(conf);
     mockResourceChecker.setResourcesAvailable(false);
@@ -169,6 +187,7 @@
    */
   @Test(timeout=60000)
   public void testFailoverAndBackOnNNShutdown() throws Exception {
+    startCluster();
     Path p1 = new Path("/dir1");
     Path p2 = new Path("/dir2");
 
@@ -201,6 +220,7 @@
   
   @Test(timeout=30000)
   public void testManualFailover() throws Exception {
+    startCluster();
     thr2.zkfc.getLocalTarget().getZKFCProxy(conf, 15000).gracefulFailover();
     waitForHAState(0, HAServiceState.STANDBY);
     waitForHAState(1, HAServiceState.ACTIVE);
@@ -212,6 +232,7 @@
 
   @Test(timeout=30000)
   public void testWithoutBindAddressSet() throws Exception {
+    startCluster();
     DFSZKFailoverController zkfc = DFSZKFailoverController.create(
         conf);
 
@@ -222,6 +243,7 @@
 
   @Test(timeout=30000)
   public void testWithBindAddressSet() throws Exception {
+    startCluster();
     conf.set(DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY, WILDCARD_ADDRESS);
     DFSZKFailoverController zkfc = DFSZKFailoverController.create(
         conf);
@@ -239,6 +261,7 @@
    */
   @Test
   public void testObserverRejectZkfcCall() throws Exception {
+    startCluster();
     NamenodeProtocols nn1 = cluster.getNameNode(1).getRpcServer();
     nn1.transitionToObserver(
         new StateChangeRequestInfo(RequestSource.REQUEST_BY_USER_FORCED));
@@ -251,6 +274,7 @@
 
   @Test(timeout=30000)
   public void testManualFailoverWithDFSHAAdmin() throws Exception {
+    startCluster();
     DFSHAAdmin tool = new DFSHAAdmin();
     tool.setConf(conf);
     assertEquals(0, 
@@ -279,6 +303,7 @@
 
   @Test(timeout=30000)
   public void testElectionOnObserver() throws Exception{
+    startCluster();
     InputStream inOriginial = System.in;
     try {
       DFSHAAdmin tool = new DFSHAAdmin();