[REEF-1712] Make JobSubmissionHandler return the Application ID
This change adds `.getApplicationId()` method
to `JobSubmissionHandler` interface and its implementations.
JIRA:
[REEF-1712](https://issues.apache.org/jira/browse/REEF-1712)
Pull request:
Closes #1227
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionHandler.java
index 89a1068..71e44fd 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionHandler.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionHandler.java
@@ -27,6 +27,10 @@
@RuntimeAuthor
public interface JobSubmissionHandler extends EventHandler<JobSubmissionEvent>, AutoCloseable {
- @Override
- void close();
+ /**
+ * Get the RM application ID.
+ * Return null if the application has not been submitted yet, or was submitted unsuccessfully.
+ * @return string application ID or null if no app has been submitted yet.
+ */
+ String getApplicationId();
}
diff --git a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java
index 7c70a5a..6d61a11 100644
--- a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java
+++ b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java
@@ -57,6 +57,8 @@
private final ClasspathProvider classpath;
private final DriverConfigurationProvider driverConfigurationProvider;
+ private String applicationId;
+
@Inject
HDInsightJobSubmissionHandler(final AzureUploader uploader,
final JobJarMaker jobJarMaker,
@@ -83,16 +85,15 @@
try {
LOG.log(Level.FINE, "Requesting Application ID from HDInsight.");
- final ApplicationID applicationID = this.hdInsightInstance.getApplicationID();
+ final String appId = this.hdInsightInstance.getApplicationID().getApplicationId();
- LOG.log(Level.INFO, "Submitting application {0} to YARN.", applicationID.getApplicationId());
+ LOG.log(Level.INFO, "Submitting application {0} to YARN.", appId);
LOG.log(Level.FINE, "Creating a job folder on Azure.");
- final URI jobFolderURL = this.uploader.createJobFolder(applicationID.getApplicationId());
+ final URI jobFolderURL = this.uploader.createJobFolder(appId);
LOG.log(Level.FINE, "Assembling Configuration for the Driver.");
- final Configuration driverConfiguration =
- makeDriverConfiguration(jobSubmissionEvent, applicationID.getApplicationId(), jobFolderURL);
+ final Configuration driverConfiguration = makeDriverConfiguration(jobSubmissionEvent, appId, jobFolderURL);
LOG.log(Level.FINE, "Making Job JAR.");
final File jobSubmissionJarFile =
@@ -105,7 +106,7 @@
final String command = getCommandString(jobSubmissionEvent);
final ApplicationSubmission applicationSubmission = new ApplicationSubmission()
- .setApplicationId(applicationID.getApplicationId())
+ .setApplicationId(appId)
.setApplicationName(jobSubmissionEvent.getIdentifier())
.setResource(getResource(jobSubmissionEvent))
.setAmContainerSpec(new AmContainerSpec()
@@ -113,8 +114,8 @@
.setCommand(command));
this.hdInsightInstance.submitApplication(applicationSubmission);
- LOG.log(Level.INFO, "Submitted application to HDInsight. The application id is: {0}",
- applicationID.getApplicationId());
+ this.applicationId = appId;
+ LOG.log(Level.INFO, "Submitted application to HDInsight. The application id is: {0}", appId);
} catch (final IOException ex) {
LOG.log(Level.SEVERE, "Error submitting HDInsight request", ex);
@@ -123,6 +124,16 @@
}
/**
+ * Get the RM application ID.
+ * Return null if the application has not been submitted yet, or was submitted unsuccessfully.
+ * @return string application ID or null if no app has been submitted yet.
+ */
+ @Override
+ public String getApplicationId() {
+ return this.applicationId;
+ }
+
+ /**
* Extracts the resource demands from the jobSubmissionEvent.
*/
private Resource getResource(
@@ -159,12 +170,10 @@
private Configuration makeDriverConfiguration(
final JobSubmissionEvent jobSubmissionEvent,
- final String applicationId,
+ final String appId,
final URI jobFolderURL) throws IOException {
- return this.driverConfigurationProvider.getDriverConfiguration(jobFolderURL,
- jobSubmissionEvent.getRemoteId(),
- applicationId,
- jobSubmissionEvent.getConfiguration());
+ return this.driverConfigurationProvider.getDriverConfiguration(
+ jobFolderURL, jobSubmissionEvent.getRemoteId(), appId, jobSubmissionEvent.getConfiguration());
}
}
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java
index be41c94..7a8996e 100644
--- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java
@@ -54,6 +54,8 @@
private final LoggingScopeFactory loggingScopeFactory;
private final DriverConfigurationProvider driverConfigurationProvider;
+ private String applicationId;
+
@Inject
LocalJobSubmissionHandler(
final ExecutorService executor,
@@ -108,10 +110,22 @@
this.configurationSerializer.toFile(driverConfiguration,
new File(driverFolder, this.fileNames.getDriverConfigurationPath()));
this.driverLauncher.launch(driverFolder);
+ this.applicationId = t.getIdentifier();
+
} catch (final Exception e) {
LOG.log(Level.SEVERE, "Unable to setup driver.", e);
throw new RuntimeException("Unable to setup driver.", e);
}
}
}
+
+ /**
+ * Get the RM application ID.
+ * Return null if the application has not been submitted yet, or was submitted unsuccessfully.
+ * @return string application ID or null if no app has been submitted yet.
+ */
+ @Override
+ public String getApplicationId() {
+ return this.applicationId;
+ }
}
diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
index f84e082..fe5eb0d 100644
--- a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
+++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
@@ -59,6 +59,8 @@
private final String rootFolderName;
private final DriverConfigurationProvider driverConfigurationProvider;
+ private String applicationId;
+
@Inject
MesosJobSubmissionHandler(@Parameter(RootFolder.class) final String rootFolderName,
final ConfigurationSerializer configurationSerializer,
@@ -138,8 +140,21 @@
.redirectError(errFile)
.redirectOutput(outFile)
.start();
+
+ this.applicationId = jobSubmissionEvent.getIdentifier();
+
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
+
+ /**
+ * Get the RM application ID.
+ * Return null if the application has not been submitted yet, or was submitted unsuccessfully.
+ * @return string application ID or null if no app has been submitted yet.
+ */
+ @Override
+ public String getApplicationId() {
+ return this.applicationId;
+ }
}
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
index ab361bc..9457f90 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
@@ -64,6 +64,8 @@
private final SecurityTokenProvider tokenProvider;
private final DriverConfigurationProvider driverConfigurationProvider;
+ private String applicationId;
+
@Inject
YarnJobSubmissionHandler(
@Parameter(JobQueue.class) final String defaultQueueName,
@@ -94,7 +96,9 @@
@Override
public void onNext(final JobSubmissionEvent jobSubmissionEvent) {
- LOG.log(Level.FINEST, "Submitting job with ID [{0}]", jobSubmissionEvent.getIdentifier());
+ final String id = jobSubmissionEvent.getIdentifier();
+ LOG.log(Level.FINEST, "Submitting{0} job: {1}",
+ new Object[] {this.isUnmanaged ? " UNMANAGED AM" : "", jobSubmissionEvent});
try (final YarnSubmissionHelper submissionHelper = new YarnSubmissionHelper(
this.yarnConfiguration, this.fileNames, this.classpath, this.tokenProvider, this.isUnmanaged)) {
@@ -112,7 +116,7 @@
submissionHelper
.addLocalResource(this.fileNames.getREEFFolderName(), driverJarOnDfs)
- .setApplicationName(jobSubmissionEvent.getIdentifier())
+ .setApplicationName(id)
.setDriverMemory(jobSubmissionEvent.getDriverMemory().get())
.setPriority(getPriority(jobSubmissionEvent))
.setQueue(getQueue(jobSubmissionEvent))
@@ -120,13 +124,26 @@
.setMaxApplicationAttempts(getMaxApplicationSubmissions(jobSubmissionEvent))
.submit();
- LOG.log(Level.FINEST, "Submitted job with ID [{0}]", jobSubmissionEvent.getIdentifier());
+ this.applicationId = submissionHelper.getStringApplicationId();
+ LOG.log(Level.FINEST, "Submitted{0} job with ID {1} :: {2}", new String[] {
+ this.isUnmanaged ? " UNMANAGED AM" : "", id, this.applicationId});
+
} catch (final YarnException | IOException e) {
- throw new RuntimeException("Unable to submit Driver to YARN.", e);
+ throw new RuntimeException("Unable to submit Driver to YARN: " + id, e);
}
}
/**
+ * Get the RM application ID.
+ * Return null if the application has not been submitted yet, or was submitted unsuccessfully.
+ * @return string application ID or null if no app has been submitted yet.
+ */
+ @Override
+ public String getApplicationId() {
+ return this.applicationId;
+ }
+
+ /**
* Assembles the Driver configuration.
*/
private Configuration makeDriverConfiguration(