[REEF-1727] Add the AM token to the client app credentials in the Unmanaged AM mode
Summary of changes:
* For Unmanaged AM mode, add YARN security token to teh app credentials so that (in-process) AM can use it.
* Bugfix: `YarnSubmissionHelper` must implement `AutoCloseable`, not `java.io.Closeable`.
* `.close()` method does not throw.
* Minor logging improvements.
JIRA: [REEF-1727](https://issues.apache.org/jira/browse/REEF-1727)
This closes #1242
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
index 278e4e8..6df3ffc 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
@@ -19,6 +19,7 @@
package org.apache.reef.runtime.yarn.client;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.*;
@@ -26,13 +27,13 @@
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.reef.runtime.common.REEFLauncher;
import org.apache.reef.runtime.common.files.ClasspathProvider;
import org.apache.reef.runtime.common.files.REEFFileNames;
import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
import org.apache.reef.runtime.yarn.util.YarnTypes;
-import java.io.Closeable;
import java.io.IOException;
import java.util.*;
import java.util.logging.Level;
@@ -41,7 +42,7 @@
/**
* Helper code that wraps the YARN Client API for our purposes.
*/
-public final class YarnSubmissionHelper implements Closeable {
+public final class YarnSubmissionHelper implements AutoCloseable {
private static final Logger LOG = Logger.getLogger(YarnSubmissionHelper.class.getName());
@@ -52,6 +53,7 @@
private final Map<String, LocalResource> resources = new HashMap<>();
private final ClasspathProvider classpath;
private final SecurityTokenProvider tokenProvider;
+ private final boolean isUnmanaged;
private final List<String> commandPrefixList;
private String driverStdoutFilePath;
@@ -67,6 +69,7 @@
final List<String> commandPrefixList) throws IOException, YarnException {
this.classpath = classpath;
+ this.isUnmanaged = isUnmanaged;
this.driverStdoutFilePath =
ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + fileNames.getDriverStdoutFileName();
@@ -279,6 +282,13 @@
}
this.yarnClient.submitApplication(applicationSubmissionContext);
+
+ if (this.isUnmanaged) {
+ // For Unmanaged AM mode, add a new app token to the
+ // current process so it can talk to the RM as an AM.
+ final Token<AMRMTokenIdentifier> token = this.yarnClient.getAMRMToken(this.applicationId);
+ this.tokenProvider.addTokens(UserCredentialSecurityTokenProvider.serializeToken(token));
+ }
}
/**
@@ -304,7 +314,8 @@
}
@Override
- public void close() throws IOException {
- this.yarnClient.stop();
+ public void close() {
+ LOG.log(Level.FINE, "Closing YARN application: {0}", this.applicationId);
+ this.yarnClient.stop(); // same as yarnClient.close()
}
}