(TWILL-171) Clone the HDFS delegation in HA mode.

- This is for working around HDFS-9276

This closes #42 on Github.

Signed-off-by: Terence Yim <chtyim@apache.org>
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
index 81c61ac..445656d 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
@@ -84,7 +84,7 @@
 
     final YarnAMClient amClient = new VersionDetectYarnAMClientFactory(conf).create();
     ApplicationMasterService service =
-      new ApplicationMasterService(runId, zkClientService, twillRuntimeSpec, amClient,
+      new ApplicationMasterService(runId, zkClientService, twillRuntimeSpec, amClient, conf,
                                    createAppLocation(conf, twillRuntimeSpec.getFsUser(),
                                                      twillRuntimeSpec.getTwillAppDir()));
     TrackerService trackerService = new TrackerService(service);
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index 0f647cd..523ffce 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -37,6 +37,7 @@
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -138,9 +139,10 @@
   private Queue<RunnableContainerRequest> runnableContainerRequests;
   private ExecutorService instanceChangeExecutor;
 
-  public ApplicationMasterService(RunId runId, ZKClient zkClient, TwillRuntimeSpecification twillRuntimeSpec,
-                                  YarnAMClient amClient, Location applicationLocation) throws Exception {
-    super(zkClient, runId, applicationLocation);
+  public ApplicationMasterService(RunId runId, ZKClient zkClient,
+                                  TwillRuntimeSpecification twillRuntimeSpec, YarnAMClient amClient,
+                                  Configuration config, Location applicationLocation) throws Exception {
+    super(zkClient, runId, config, applicationLocation);
 
     this.runId = runId;
     this.twillRuntimeSpec = twillRuntimeSpec;
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
index 2baaca1..e6d86a5 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
@@ -124,7 +124,7 @@
     ZKClient containerZKClient = getContainerZKClient(zkClientService, appRunId, runnableName);
     Configuration conf = new YarnConfiguration(new HdfsConfiguration(new Configuration()));
     TwillContainerService service = new TwillContainerService(context, containerInfo, containerZKClient,
-                                                              runId, runnableSpec, getClassLoader(),
+                                                              runId, runnableSpec, getClassLoader(), conf,
                                                               createAppLocation(conf, twillRuntimeSpec.getFsUser(),
                                                                                 twillRuntimeSpec.getTwillAppDir()),
                                                               defaultLogLevels, logLevels);
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
index 58298a0..6335f9f 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
@@ -24,6 +24,7 @@
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.twill.api.Command;
 import org.apache.twill.api.RunId;
 import org.apache.twill.api.TwillRunnable;
@@ -70,9 +71,9 @@
 
   TwillContainerService(BasicTwillContext context, ContainerInfo containerInfo, ZKClient zkClient,
                         RunId runId, TwillRunnableSpecification specification, ClassLoader classLoader,
-                        Location applicationLocation, Map<String, String> defaultLogLevels,
-                        Map<String, String> logLevels) {
-    super(zkClient, runId, applicationLocation);
+                        Configuration config, Location applicationLocation,
+                        Map<String, String> defaultLogLevels, Map<String, String> logLevels) {
+    super(zkClient, runId, config, applicationLocation);
 
     this.specification = specification;
     this.classLoader = classLoader;
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnTwillService.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnTwillService.java
index 64f81b4..44cabdc 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnTwillService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnTwillService.java
@@ -17,6 +17,7 @@
  */
 package org.apache.twill.internal.yarn;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.twill.api.RunId;
@@ -40,11 +41,15 @@
 public abstract class AbstractYarnTwillService extends AbstractTwillService {
 
   private static final Logger LOG = LoggerFactory.getLogger(AbstractYarnTwillService.class);
-  protected final Location applicationLocation;
+
+  private final Configuration config;
+  private final Location applicationLocation;
   protected volatile Credentials credentials;
 
-  protected AbstractYarnTwillService(ZKClient zkClient, RunId runId, Location applicationLocation) {
+  protected AbstractYarnTwillService(ZKClient zkClient, RunId runId,
+                                     Configuration config, Location applicationLocation) {
     super(zkClient, runId);
+    this.config = config;
     this.applicationLocation = applicationLocation;
   }
 
@@ -83,11 +88,20 @@
     try {
       Credentials credentials = new Credentials();
       Location location = getSecureStoreLocation();
+
+      // If failed to determine the secure store location, simply ignore the message.
+      if (location == null) {
+        return true;
+      }
+
       try (DataInputStream input = new DataInputStream(new BufferedInputStream(location.getInputStream()))) {
         credentials.readTokenStorageStream(input);
       }
 
       UserGroupInformation.getCurrentUser().addCredentials(credentials);
+
+      // Clone the HDFS tokens for HA NameNode. This is to workaround bug HDFS-9276.
+      YarnUtils.cloneHaNnCredentials(config);
       this.credentials = credentials;
 
       LOG.info("Secure store updated from {}.", location);
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
index 9574554..ff8f4bb 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
@@ -21,9 +21,12 @@
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.security.Credentials;
@@ -48,8 +51,10 @@
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
+import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -178,6 +183,34 @@
   }
 
   /**
+   * Clones the delegation token to individual host behind the same logical address.
+   *
+   * @param config the hadoop configuration
+   * @throws IOException if failed to get information for the current user.
+   */
+  public static void cloneHaNnCredentials(Configuration config) throws IOException {
+    String scheme = URI.create(config.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
+                                          CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT)).getScheme();
+
+    // Loop through all name services. Each name service could have multiple name node associated with it.
+    for (Map.Entry<String, Map<String, InetSocketAddress>> entry : DFSUtil.getHaNnRpcAddresses(config).entrySet()) {
+      String nsId = entry.getKey();
+      Map<String, InetSocketAddress> addressesInNN = entry.getValue();
+      if (!HAUtil.isHAEnabled(config, nsId) || addressesInNN == null || addressesInNN.isEmpty()) {
+        continue;
+      }
+
+      // The client may have a delegation token set for the logical
+      // URI of the cluster. Clone this token to apply to each of the
+      // underlying IPC addresses so that the IPC code can find it.
+      URI uri = URI.create(scheme + "://" + nsId);
+
+      LOG.info("Cloning delegation token for uri {}", uri);
+      HAUtil.cloneDelegationTokenForLogicalUri(UserGroupInformation.getCurrentUser(), uri, addressesInNN.values());
+    }
+  }
+
+  /**
    * Encodes the given {@link Credentials} as bytes.
    */
   public static ByteBuffer encodeCredentials(Credentials credentials) {