[REEF-1731] Implement JobSubmitted client-side event and a corresponding handler
Summary of changes:
* When a job submitted to the resource manager, issue an event
`SubmittedJob`
* Create a default handler of such event, that would just write a new job
ID to the log
* Implement a synchronous version in `DriverLauncher.submit()`
* Minor logging and refactoring in related code
JIRA:
[REEF-1731](https://issues.apache.org/jira/browse/REEF-1731)
Pull request:
This closes #1248
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/client/ClientConfiguration.java b/lang/java/reef-common/src/main/java/org/apache/reef/client/ClientConfiguration.java
index 2e8e197..d100422 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/client/ClientConfiguration.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/client/ClientConfiguration.java
@@ -39,6 +39,12 @@
public static final OptionalImpl<EventHandler<JobMessage>> ON_JOB_MESSAGE = new OptionalImpl<>();
/**
+ * Handler for the event when a REEF job is submitted to the Resource Manager.
+ * Default implementation just writes the new job ID to the log.
+ */
+ public static final OptionalImpl<EventHandler<SubmittedJob>> ON_JOB_SUBMITTED = new OptionalImpl<>();
+
+ /**
* Handler for the event when a submitted REEF Job is running.
* Default implementation just writes to the log.
*/
@@ -73,6 +79,7 @@
public static final ConfigurationModule CONF = new ClientConfiguration()
.bind(JobMessageHandler.class, ON_JOB_MESSAGE)
+ .bind(JobSubmittedHandler.class, ON_JOB_SUBMITTED)
.bind(JobRunningHandler.class, ON_JOB_RUNNING)
.bind(JobCompletedHandler.class, ON_JOB_COMPLETED)
.bind(JobFailedHandler.class, ON_JOB_FAILED)
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverLauncher.java b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverLauncher.java
index 505eeaa..78ca13a 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverLauncher.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverLauncher.java
@@ -29,6 +29,8 @@
import org.apache.reef.wake.EventHandler;
import javax.inject.Inject;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -36,7 +38,7 @@
* A launcher for REEF Drivers.
* <p>
* It can be instantiated using a configuration that can create a REEF instance.
- * For example, the local resourcemanager and the YARN resourcemanager can do this.
+ * For example, the local resource manager and the YARN resource manager can do this.
* <p>
* See {@link org.apache.reef.examples.hello} package for a demo use case.
*/
@@ -44,11 +46,12 @@
@Provided
@ClientSide
@Unit
-public final class DriverLauncher {
+public final class DriverLauncher implements AutoCloseable {
private static final Logger LOG = Logger.getLogger(DriverLauncher.class.getName());
private static final Configuration CLIENT_CONFIG = ClientConfiguration.CONF
+ .set(ClientConfiguration.ON_JOB_SUBMITTED, SubmittedJobHandler.class)
.set(ClientConfiguration.ON_JOB_RUNNING, RunningJobHandler.class)
.set(ClientConfiguration.ON_JOB_COMPLETED, CompletedJobHandler.class)
.set(ClientConfiguration.ON_JOB_FAILED, FailedJobHandler.class)
@@ -58,7 +61,9 @@
private final REEF reef;
private LauncherStatus status = LauncherStatus.INIT;
- private RunningJob theJob = null;
+
+ private String jobId;
+ private RunningJob theJob;
@Inject
private DriverLauncher(final REEF reef) {
@@ -81,14 +86,21 @@
/**
* Kills the running job.
*/
- public synchronized void close() {
- if (this.status.isRunning()) {
- this.status = LauncherStatus.FORCE_CLOSED;
+ @Override
+ public void close() {
+ synchronized (this) {
+ LOG.log(Level.FINER, "Close launcher: job {0} with status {1}", new Object[] {this.theJob, this.status});
+ if (this.status.isRunning()) {
+ this.status = LauncherStatus.FORCE_CLOSED;
+ }
+ if (null != this.theJob) {
+ this.theJob.close();
+ }
+ this.notify();
}
- if (null != this.theJob) {
- this.theJob.close();
- }
- this.notify();
+ LOG.log(Level.FINEST, "Close launcher: shutdown REEF");
+ this.reef.close();
+ LOG.log(Level.FINEST, "Close launcher: done");
}
/**
@@ -110,68 +122,114 @@
}
}
this.reef.close();
+ return this.getStatus();
+ }
+
+ /**
+ * Submit REEF job asynchronously and do not wait for its completion.
+ *
+ * @param driverConfig configuration of hte driver to submit to the RM.
+ * @return ID of the new application.
+ */
+ public String submit(final Configuration driverConfig, final long waitTime) {
+ this.reef.submit(driverConfig);
+ this.waitForStatus(waitTime, LauncherStatus.SUBMITTED);
+ return this.jobId;
+ }
+
+ /**
+ * Wait for one of the specified statuses of the REEF job.
+ * This method is called after the job is submitted to the RM via submit().
+ * @param waitTime wait time in milliseconds.
+ * @param statuses array of statuses to wait for.
+ * @return the state of the job after the wait.
+ */
+ public LauncherStatus waitForStatus(final long waitTime, final LauncherStatus... statuses) {
+
+ final long endTime = System.currentTimeMillis() + waitTime;
+
+ final HashSet<LauncherStatus> statSet = new HashSet<>(statuses.length * 2);
+ Collections.addAll(statSet, statuses);
+ Collections.addAll(statSet, LauncherStatus.FAILED, LauncherStatus.FORCE_CLOSED);
+
+ LOG.log(Level.FINEST, "Wait for status: {0}", statSet);
+ final LauncherStatus finalStatus;
+
synchronized (this) {
- return this.status;
+ while (!statSet.contains(this.status)) {
+ try {
+ final long delay = endTime - System.currentTimeMillis();
+ if (delay <= 0) {
+ break;
+ }
+ LOG.log(Level.FINE, "Wait for {0} milliSeconds", delay);
+ this.wait(delay);
+ } catch (final InterruptedException ex) {
+ LOG.log(Level.FINE, "Interrupted: {0}", ex);
+ }
+ }
+
+ finalStatus = this.status;
}
+
+ LOG.log(Level.FINEST, "Final status: {0}", finalStatus);
+ return finalStatus;
}
/**
* Run a job with a waiting timeout after which it will be killed, if it did not complete yet.
*
* @param driverConfig the configuration for the driver. See DriverConfiguration for details.
- * @param timeOut timeout on the job.
+ * @param timeOut timeout on the job.
* @return the state of the job after execution.
*/
public LauncherStatus run(final Configuration driverConfig, final long timeOut) {
- final long endTime = System.currentTimeMillis() + timeOut;
+
+ final long startTime = System.currentTimeMillis();
+
this.reef.submit(driverConfig);
- synchronized (this) {
- while (!this.status.isDone()) {
- try {
- final long waitTime = endTime - System.currentTimeMillis();
- if (waitTime <= 0) {
- break;
- }
- LOG.log(Level.FINE, "Wait for {0} milliSeconds", waitTime);
- this.wait(waitTime);
- } catch (final InterruptedException ex) {
- LOG.log(Level.FINE, "Interrupted: {0}", ex);
- }
- }
- if (System.currentTimeMillis() >= endTime) {
- LOG.log(Level.WARNING, "The Job timed out.");
+ this.waitForStatus(timeOut - System.currentTimeMillis() + startTime, LauncherStatus.COMPLETED);
+
+ if (System.currentTimeMillis() - startTime >= timeOut) {
+ LOG.log(Level.WARNING, "The Job timed out.");
+ synchronized (this) {
this.status = LauncherStatus.FORCE_CLOSED;
}
}
this.reef.close();
- synchronized (this) {
- return this.status;
- }
+ return this.getStatus();
}
/**
* @return the current status of the job.
*/
- public LauncherStatus getStatus() {
- synchronized (this) {
- return this.status;
- }
+ public synchronized LauncherStatus getStatus() {
+ return this.status;
}
- /**
- * Update job status and notify the waiting thread.
- */
- @SuppressWarnings("checkstyle:hiddenfield")
- public synchronized void setStatusAndNotify(final LauncherStatus status) {
- LOG.log(Level.FINEST, "Set status: {0} -> {1}", new Object[]{this.status, status});
- this.status = status;
+ /** Update job status and notify the waiting thread. */
+ public synchronized void setStatusAndNotify(final LauncherStatus newStatus) {
+ LOG.log(Level.FINEST, "Set status: {0} -> {1}", new Object[] {this.status, newStatus});
+ this.status = newStatus;
this.notify();
}
@Override
public String toString() {
- return this.status.toString();
+ return String.format("DriverLauncher: { jobId: %s, status: %s }", this.jobId, this.status);
+ }
+
+ /**
+ * Job driver notifies us that the job has been submitted to the Resource Manager.
+ */
+ public final class SubmittedJobHandler implements EventHandler<SubmittedJob> {
+ @Override
+ public void onNext(final SubmittedJob job) {
+ LOG.log(Level.INFO, "REEF job submitted: {0}.", job.getId());
+ jobId = job.getId();
+ setStatusAndNotify(LauncherStatus.SUBMITTED);
+ }
}
/**
@@ -217,7 +275,7 @@
public final class RuntimeErrorHandler implements EventHandler<FailedRuntime> {
@Override
public void onNext(final FailedRuntime error) {
- LOG.log(Level.SEVERE, "Received a resourcemanager error", error.getReason());
+ LOG.log(Level.SEVERE, "Received a resource manager error", error.getReason());
theJob = null;
setStatusAndNotify(LauncherStatus.failed(error.getReason()));
}
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/client/LauncherStatus.java b/lang/java/reef-common/src/main/java/org/apache/reef/client/LauncherStatus.java
index 434d22b..9d6965d 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/client/LauncherStatus.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/client/LauncherStatus.java
@@ -26,10 +26,12 @@
public final class LauncherStatus {
public static final LauncherStatus INIT = new LauncherStatus(State.INIT);
+ public static final LauncherStatus SUBMITTED = new LauncherStatus(State.SUBMITTED);
public static final LauncherStatus RUNNING = new LauncherStatus(State.RUNNING);
public static final LauncherStatus COMPLETED = new LauncherStatus(State.COMPLETED);
public static final LauncherStatus FORCE_CLOSED = new LauncherStatus(State.FORCE_CLOSED);
public static final LauncherStatus FAILED = new LauncherStatus(State.FAILED);
+
private final State state;
private final Optional<Throwable> error;
@@ -37,7 +39,6 @@
this(state, null);
}
-
private LauncherStatus(final State state, final Throwable ex) {
this.state = state;
this.error = Optional.ofNullable(ex);
@@ -120,6 +121,7 @@
*/
private enum State {
INIT,
+ SUBMITTED,
RUNNING,
COMPLETED,
FAILED,
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/client/SubmittedJob.java b/lang/java/reef-common/src/main/java/org/apache/reef/client/SubmittedJob.java
new file mode 100644
index 0000000..3b88173
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/client/SubmittedJob.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.client;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.io.naming.Identifiable;
+
+/**
+ * Represents a completed REEF job.
+ */
+@Public
+@ClientSide
+@Provided
+public interface SubmittedJob extends Identifiable {
+
+ /** @return ID of the submitted job. */
+ @Override
+ String getId();
+}
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/client/parameters/JobSubmittedHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/client/parameters/JobSubmittedHandler.java
new file mode 100644
index 0000000..20891ff
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/client/parameters/JobSubmittedHandler.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.client.parameters;
+
+import org.apache.reef.client.SubmittedJob;
+import org.apache.reef.runtime.common.client.defaults.DefaultSubmittedJobHandler;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.wake.EventHandler;
+
+/** Handler for the SubmittedJob event. */
+@NamedParameter(doc = "Event handler for SubmittedJob", default_classes = DefaultSubmittedJobHandler.class)
+public final class JobSubmittedHandler implements Name<EventHandler<SubmittedJob>> {
+
+ private JobSubmittedHandler() { }
+}
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/REEFImplementation.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/REEFImplementation.java
index ac52922..1b56aec 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/REEFImplementation.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/REEFImplementation.java
@@ -22,20 +22,20 @@
import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.client.REEF;
+import org.apache.reef.client.SubmittedJob;
import org.apache.reef.client.parameters.DriverConfigurationProviders;
+import org.apache.reef.client.parameters.JobSubmittedHandler;
import org.apache.reef.runtime.common.client.api.JobSubmissionEvent;
import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
import org.apache.reef.runtime.common.launch.parameters.ErrorHandlerRID;
-import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.ConfigurationBuilder;
-import org.apache.reef.tang.ConfigurationProvider;
-import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.*;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.NamedParameter;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.util.REEFVersion;
import org.apache.reef.util.logging.LoggingScope;
import org.apache.reef.util.logging.LoggingScopeFactory;
+import org.apache.reef.wake.EventHandler;
import javax.inject.Inject;
import java.util.Set;
@@ -53,38 +53,43 @@
private static final Logger LOG = Logger.getLogger(REEFImplementation.class.getName());
private final JobSubmissionHandler jobSubmissionHandler;
- private final RunningJobs runningJobs;
private final JobSubmissionHelper jobSubmissionHelper;
+ private final InjectionFuture<EventHandler<SubmittedJob>> jobSubmittedHandler;
+ private final RunningJobs runningJobs;
private final ClientWireUp clientWireUp;
private final LoggingScopeFactory loggingScopeFactory;
private final Set<ConfigurationProvider> configurationProviders;
/**
* @param jobSubmissionHandler
- * @param runningJobs
* @param jobSubmissionHelper
* @param jobStatusMessageHandler is passed only to make sure it is instantiated
+ * @param runningJobs
* @param clientWireUp
- * @param reefVersion provides the current version of REEF.
+ * @param reefVersion provides the current version of REEF.
* @param configurationProviders
*/
@Inject
- REEFImplementation(final JobSubmissionHandler jobSubmissionHandler,
- final RunningJobs runningJobs,
- final JobSubmissionHelper jobSubmissionHelper,
- final JobStatusMessageHandler jobStatusMessageHandler,
- final ClientWireUp clientWireUp,
- final LoggingScopeFactory loggingScopeFactory,
- final REEFVersion reefVersion,
- @Parameter(DriverConfigurationProviders.class)
- final Set<ConfigurationProvider> configurationProviders) {
+ private REEFImplementation(
+ final JobSubmissionHandler jobSubmissionHandler,
+ final JobSubmissionHelper jobSubmissionHelper,
+ final JobStatusMessageHandler jobStatusMessageHandler,
+ final RunningJobs runningJobs,
+ final ClientWireUp clientWireUp,
+ final LoggingScopeFactory loggingScopeFactory,
+ final REEFVersion reefVersion,
+ @Parameter(JobSubmittedHandler.class) final InjectionFuture<EventHandler<SubmittedJob>> jobSubmittedHandler,
+ @Parameter(DriverConfigurationProviders.class) final Set<ConfigurationProvider> configurationProviders) {
+
this.jobSubmissionHandler = jobSubmissionHandler;
- this.runningJobs = runningJobs;
+ this.jobSubmittedHandler = jobSubmittedHandler;
this.jobSubmissionHelper = jobSubmissionHelper;
+ this.runningJobs = runningJobs;
this.clientWireUp = clientWireUp;
this.configurationProviders = configurationProviders;
- clientWireUp.performWireUp();
this.loggingScopeFactory = loggingScopeFactory;
+
+ clientWireUp.performWireUp();
reefVersion.logVersion();
}
@@ -127,6 +132,9 @@
}
this.jobSubmissionHandler.onNext(submissionMessage);
+
+ this.jobSubmittedHandler.get().onNext(
+ new SubmittedJobImpl(this.jobSubmissionHandler.getApplicationId()));
}
}
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/SubmittedJobImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/SubmittedJobImpl.java
new file mode 100644
index 0000000..c248262
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/SubmittedJobImpl.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client;
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.client.SubmittedJob;
+
+/** An implementation of the SubmittedJob interface. */
+@ClientSide
+@Private
+final class SubmittedJobImpl implements SubmittedJob {
+
+ private final String jobId;
+
+ SubmittedJobImpl(final String jobId) {
+ this.jobId = jobId;
+ }
+
+ @Override
+ public String getId() {
+ return this.jobId;
+ }
+
+ @Override
+ public String toString() {
+ return "SubmittedJob: { id:" + this.jobId + " }";
+ }
+}
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultSubmittedJobHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultSubmittedJobHandler.java
new file mode 100644
index 0000000..c90a82b
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultSubmittedJobHandler.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client.defaults;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.client.SubmittedJob;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/** Default event handler for SubmittedJob: Log the new job ID. */
+@Provided
+@ClientSide
+public final class DefaultSubmittedJobHandler implements EventHandler<SubmittedJob> {
+
+ private static final Logger LOG = Logger.getLogger(DefaultSubmittedJobHandler.class.getName());
+
+ @Inject
+ private DefaultSubmittedJobHandler() { }
+
+ @Override
+ public void onNext(final SubmittedJob job) {
+ LOG.log(Level.INFO, "Job Submitted: {0}", job);
+ }
+}