(TWILL-171) Clone the HDFS delegation in HA mode.
- This is for working around HDFS-9276
This closes #42 on Github.
Signed-off-by: Terence Yim <chtyim@apache.org>
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
index 81c61ac..445656d 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
@@ -84,7 +84,7 @@
final YarnAMClient amClient = new VersionDetectYarnAMClientFactory(conf).create();
ApplicationMasterService service =
- new ApplicationMasterService(runId, zkClientService, twillRuntimeSpec, amClient,
+ new ApplicationMasterService(runId, zkClientService, twillRuntimeSpec, amClient, conf,
createAppLocation(conf, twillRuntimeSpec.getFsUser(),
twillRuntimeSpec.getTwillAppDir()));
TrackerService trackerService = new TrackerService(service);
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index 0f647cd..523ffce 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -37,6 +37,7 @@
import com.google.common.util.concurrent.SettableFuture;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
@@ -138,9 +139,10 @@
private Queue<RunnableContainerRequest> runnableContainerRequests;
private ExecutorService instanceChangeExecutor;
- public ApplicationMasterService(RunId runId, ZKClient zkClient, TwillRuntimeSpecification twillRuntimeSpec,
- YarnAMClient amClient, Location applicationLocation) throws Exception {
- super(zkClient, runId, applicationLocation);
+ public ApplicationMasterService(RunId runId, ZKClient zkClient,
+ TwillRuntimeSpecification twillRuntimeSpec, YarnAMClient amClient,
+ Configuration config, Location applicationLocation) throws Exception {
+ super(zkClient, runId, config, applicationLocation);
this.runId = runId;
this.twillRuntimeSpec = twillRuntimeSpec;
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
index 2baaca1..e6d86a5 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
@@ -124,7 +124,7 @@
ZKClient containerZKClient = getContainerZKClient(zkClientService, appRunId, runnableName);
Configuration conf = new YarnConfiguration(new HdfsConfiguration(new Configuration()));
TwillContainerService service = new TwillContainerService(context, containerInfo, containerZKClient,
- runId, runnableSpec, getClassLoader(),
+ runId, runnableSpec, getClassLoader(), conf,
createAppLocation(conf, twillRuntimeSpec.getFsUser(),
twillRuntimeSpec.getTwillAppDir()),
defaultLogLevels, logLevels);
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
index 58298a0..6335f9f 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
@@ -24,6 +24,7 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
+import org.apache.hadoop.conf.Configuration;
import org.apache.twill.api.Command;
import org.apache.twill.api.RunId;
import org.apache.twill.api.TwillRunnable;
@@ -70,9 +71,9 @@
TwillContainerService(BasicTwillContext context, ContainerInfo containerInfo, ZKClient zkClient,
RunId runId, TwillRunnableSpecification specification, ClassLoader classLoader,
- Location applicationLocation, Map<String, String> defaultLogLevels,
- Map<String, String> logLevels) {
- super(zkClient, runId, applicationLocation);
+ Configuration config, Location applicationLocation,
+ Map<String, String> defaultLogLevels, Map<String, String> logLevels) {
+ super(zkClient, runId, config, applicationLocation);
this.specification = specification;
this.classLoader = classLoader;
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnTwillService.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnTwillService.java
index 64f81b4..44cabdc 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnTwillService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnTwillService.java
@@ -17,6 +17,7 @@
*/
package org.apache.twill.internal.yarn;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.twill.api.RunId;
@@ -40,11 +41,15 @@
public abstract class AbstractYarnTwillService extends AbstractTwillService {
private static final Logger LOG = LoggerFactory.getLogger(AbstractYarnTwillService.class);
- protected final Location applicationLocation;
+
+ private final Configuration config;
+ private final Location applicationLocation;
protected volatile Credentials credentials;
- protected AbstractYarnTwillService(ZKClient zkClient, RunId runId, Location applicationLocation) {
+ protected AbstractYarnTwillService(ZKClient zkClient, RunId runId,
+ Configuration config, Location applicationLocation) {
super(zkClient, runId);
+ this.config = config;
this.applicationLocation = applicationLocation;
}
@@ -83,11 +88,20 @@
try {
Credentials credentials = new Credentials();
Location location = getSecureStoreLocation();
+
+ // If failed to determine the secure store location, simply ignore the message.
+ if (location == null) {
+ return true;
+ }
+
try (DataInputStream input = new DataInputStream(new BufferedInputStream(location.getInputStream()))) {
credentials.readTokenStorageStream(input);
}
UserGroupInformation.getCurrentUser().addCredentials(credentials);
+
+ // Clone the HDFS tokens for HA NameNode. This is to workaround bug HDFS-9276.
+ YarnUtils.cloneHaNnCredentials(config);
this.credentials = credentials;
LOG.info("Secure store updated from {}.", location);
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
index 9574554..ff8f4bb 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
@@ -21,9 +21,12 @@
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
@@ -48,8 +51,10 @@
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
+import java.net.URI;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -178,6 +183,34 @@
}
/**
+ * Clones the delegation token to individual host behind the same logical address.
+ *
+ * @param config the hadoop configuration
+ * @throws IOException if failed to get information for the current user.
+ */
+ public static void cloneHaNnCredentials(Configuration config) throws IOException {
+ String scheme = URI.create(config.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
+ CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT)).getScheme();
+
+ // Loop through all name services. Each name service could have multiple name node associated with it.
+ for (Map.Entry<String, Map<String, InetSocketAddress>> entry : DFSUtil.getHaNnRpcAddresses(config).entrySet()) {
+ String nsId = entry.getKey();
+ Map<String, InetSocketAddress> addressesInNN = entry.getValue();
+ if (!HAUtil.isHAEnabled(config, nsId) || addressesInNN == null || addressesInNN.isEmpty()) {
+ continue;
+ }
+
+ // The client may have a delegation token set for the logical
+ // URI of the cluster. Clone this token to apply to each of the
+ // underlying IPC addresses so that the IPC code can find it.
+ URI uri = URI.create(scheme + "://" + nsId);
+
+ LOG.info("Cloning delegation token for uri {}", uri);
+ HAUtil.cloneDelegationTokenForLogicalUri(UserGroupInformation.getCurrentUser(), uri, addressesInNN.values());
+ }
+ }
+
+ /**
* Encodes the given {@link Credentials} as bytes.
*/
public static ByteBuffer encodeCredentials(Credentials credentials) {