HADOOP-18452. Fix TestKMS#testKMSHAZooKeeperDelegationToken Failed By Hadoop-18427. (#4885)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
index 909f1af..fb9a295 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
@@ -59,6 +59,7 @@
 import org.apache.zookeeper.client.ZKClientConfig;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -265,7 +266,11 @@
       // So, let's explicitly create them.
       CuratorFramework nullNsFw = zkClient.usingNamespace(null);
       try {
-        nullNsFw.create().creatingParentContainersIfNeeded().forPath("/" + zkClient.getNamespace());
+        String nameSpace = "/" + zkClient.getNamespace();
+        Stat stat = nullNsFw.checkExists().forPath(nameSpace);
+        if (stat == null) {
+          nullNsFw.create().creatingParentContainersIfNeeded().forPath(nameSpace);
+        }
       } catch (Exception e) {
         throw new IOException("Could not create namespace", e);
       }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java
index 84899e5..e92a25e 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java
@@ -28,6 +28,8 @@
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.ACLProvider;
+import org.apache.curator.framework.api.CreateBuilder;
+import org.apache.curator.framework.api.ProtectACLCreateModeStatPathAndBytesable;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.test.TestingServer;
 import org.apache.hadoop.conf.Configuration;
@@ -38,6 +40,8 @@
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
@@ -535,5 +539,38 @@
     // Check if the created NameSpace exists.
     Stat stat = curatorFramework.checkExists().forPath(workingPath);
     Assert.assertNotNull(stat);
+
+    tm1.destroy();
+    curatorFramework.close();
+  }
+
+  @Test
+  public void testCreateNameSpaceRepeatedly() throws Exception {
+
+    String connectString = zkServer.getConnectString();
+    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+    Configuration conf = getSecretConf(connectString);
+    CuratorFramework curatorFramework =
+        CuratorFrameworkFactory.builder().
+        connectString(connectString).
+        retryPolicy(retryPolicy).
+        build();
+    curatorFramework.start();
+
+    String workingPath = "/" + conf.get(ZKDelegationTokenSecretManager.ZK_DTSM_ZNODE_WORKING_PATH,
+        ZKDelegationTokenSecretManager.ZK_DTSM_ZNODE_WORKING_PATH_DEAFULT) + "/ZKDTSMRoot-Test";
+    CreateBuilder createBuilder = curatorFramework.create();
+    ProtectACLCreateModeStatPathAndBytesable<String> createModeStat =
+        createBuilder.creatingParentContainersIfNeeded();
+    createModeStat.forPath(workingPath);
+
+    // Check if the created NameSpace exists.
+    Stat stat = curatorFramework.checkExists().forPath(workingPath);
+    Assert.assertNotNull(stat);
+
+    // Repeated creation will throw NodeExists exception
+    LambdaTestUtils.intercept(KeeperException.class,
+        "KeeperErrorCode = NodeExists for "+workingPath,
+        () -> createModeStat.forPath(workingPath));
   }
 }