[REEF-1596] Add the ability to check the status of the REEF job launched directly via REEFEnvironment Driver.
Summary of changes:
* Add .getLastStatus() method to JobStatusHandler interface
* Implement .getLastStatus() method in all classes that derive from JobStatusHandler
* Set the default value for JobStatusHandler injectable parameter
* implement REEFEnvironment.getLastStatus() method
* Use that method in unit tests
* Minor cosmetic changes in the code, more javadocs
JIRA:
[REEF-1596](https://issues.apache.org/jira/browse/REEF-1596)
Pull Request:
This changes #1151
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java
index c8a3d0e..b997347 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java
@@ -18,6 +18,8 @@
*/
package org.apache.reef.runtime.common;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.client.JobStatusHandler;
import org.apache.reef.runtime.common.launch.ProfilingStopHandler;
import org.apache.reef.runtime.common.launch.REEFErrorHandler;
import org.apache.reef.tang.Configuration;
@@ -59,6 +61,8 @@
/** Error handler that processes all uncaught REEF exceptions. */
private final REEFErrorHandler errorHandler;
+ private final JobStatusHandler jobStatusHandler;
+
/**
* Create a new REEF environment.
* @param configurations REEF component (Driver or Evaluator) configuration.
@@ -86,11 +90,12 @@
injector.getInstance(REEFVersion.class).logVersion();
final REEFErrorHandler errorHandler = injector.getInstance(REEFErrorHandler.class);
+ final JobStatusHandler jobStatusHandler = injector.getInstance(JobStatusHandler.class);
try {
final Clock clock = injector.getInstance(Clock.class);
- return new REEFEnvironment(clock, errorHandler);
+ return new REEFEnvironment(clock, errorHandler, jobStatusHandler);
} catch (final Throwable ex) {
LOG.log(Level.SEVERE, "Error while instantiating the clock", ex);
@@ -107,10 +112,15 @@
* Use .fromConfiguration() method to create new REEF environment.
* @param clock main event loop.
* @param errorHandler error handler.
+ * @param jobStatusHandler an object that receives notifications on job status changes
+ * and can be queried for the last received job status.
*/
- private REEFEnvironment(final Clock clock, final REEFErrorHandler errorHandler) {
+ private REEFEnvironment(
+ final Clock clock, final REEFErrorHandler errorHandler, final JobStatusHandler jobStatusHandler) {
+
this.clock = clock;
this.errorHandler = errorHandler;
+ this.jobStatusHandler = jobStatusHandler;
}
/**
@@ -146,6 +156,7 @@
/**
* Launch REEF component (Driver or Evaluator).
* It is usually called from the static .run() method.
+ * Check the status of the run via .getLastStatus() method.
*/
@Override
@SuppressWarnings("checkstyle:illegalcatch") // Catch throwable to feed it to error handler
@@ -159,12 +170,19 @@
LOG.log(Level.FINEST, "Clock: start");
this.clock.run();
- LOG.log(Level.FINEST, "Clock: exit normally");
+ LOG.log(Level.FINEST, "Clock: exit normally: {0}", this.getLastStatus());
} catch (final Throwable ex) {
LOG.log(Level.SEVERE, "Clock: Error in main event loop", ex);
this.errorHandler.onNext(ex);
- throw ex;
}
}
+
+ /**
+ * Get the last known status of REEF job. Can return null if job has not started yet.
+ * @return Status of the REEF job launched in this environment.
+ */
+ public ReefServiceProtos.JobStatusProto getLastStatus() {
+ return this.jobStatusHandler.getLastStatus();
+ }
}
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java
index 3f22e84..7be8e07 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java
@@ -21,11 +21,8 @@
import com.google.protobuf.ByteString;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.proto.ReefServiceProtos;
-import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
import org.apache.reef.runtime.common.driver.parameters.JobIdentifier;
-import org.apache.reef.runtime.common.utils.RemoteManager;
import org.apache.reef.tang.annotations.Parameter;
-import org.apache.reef.wake.EventHandler;
import javax.inject.Inject;
import java.util.logging.Level;
@@ -39,22 +36,16 @@
private static final Logger LOG = Logger.getLogger(ClientConnection.class.getName());
- private final EventHandler<ReefServiceProtos.JobStatusProto> jobStatusHandler;
+ private final JobStatusHandler jobStatusHandler;
private final String jobIdentifier;
@Inject
public ClientConnection(
- final RemoteManager remoteManager,
- @Parameter(ClientRemoteIdentifier.class) final String clientRID,
- @Parameter(JobIdentifier.class) final String jobIdentifier) {
+ @Parameter(JobIdentifier.class) final String jobIdentifier,
+ final JobStatusHandler jobStatusHandler) {
+
this.jobIdentifier = jobIdentifier;
- if (clientRID.equals(ClientRemoteIdentifier.NONE)) {
- LOG.log(Level.FINE, "Instantiated 'ClientConnection' without an actual connection to the client.");
- this.jobStatusHandler = new LoggingJobStatusHandler();
- } else {
- this.jobStatusHandler = remoteManager.getHandler(clientRID, ReefServiceProtos.JobStatusProto.class);
- LOG.log(Level.FINE, "Instantiated 'ClientConnection'");
- }
+ this.jobStatusHandler = jobStatusHandler;
}
/**
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/JobStatusHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/JobStatusHandler.java
new file mode 100644
index 0000000..1cc2034
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/JobStatusHandler.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.client;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.wake.EventHandler;
+
+/**
+ * Generic interface for job status messages' handler.
+ * Receive JobStatusProto messages and keep the last message so it can be retrieved via getLastStatus() method.
+ */
+@DriverSide
+@DefaultImplementation(RemoteClientJobStatusHandler.class)
+public interface JobStatusHandler extends EventHandler<ReefServiceProtos.JobStatusProto> {
+
+ /**
+ * Return the last known status of the REEF job.
+ * Can return null if the job has not been launched yet.
+ * @return Last status of the REEF job.
+ */
+ ReefServiceProtos.JobStatusProto getLastStatus();
+}
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/LoggingJobStatusHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/LoggingJobStatusHandler.java
index f627c2b..0bf7ce5 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/LoggingJobStatusHandler.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/LoggingJobStatusHandler.java
@@ -18,8 +18,8 @@
*/
package org.apache.reef.runtime.common.driver.client;
+import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.proto.ReefServiceProtos;
-import org.apache.reef.wake.EventHandler;
import javax.inject.Inject;
import java.util.logging.Level;
@@ -28,15 +28,30 @@
/**
* A handler for job status messages that just logs them.
*/
-final class LoggingJobStatusHandler implements EventHandler<ReefServiceProtos.JobStatusProto> {
+@DriverSide
+public class LoggingJobStatusHandler implements JobStatusHandler {
+
private static final Logger LOG = Logger.getLogger(LoggingJobStatusHandler.class.getName());
+ private ReefServiceProtos.JobStatusProto lastStatus = null;
+
@Inject
LoggingJobStatusHandler() {
}
@Override
public void onNext(final ReefServiceProtos.JobStatusProto jobStatusProto) {
- LOG.log(Level.INFO, "Received a JobStatus message that can't be sent:\n" + jobStatusProto.toString());
+ this.lastStatus = jobStatusProto;
+ LOG.log(Level.INFO, "In-process JobStatus:\n{0}", jobStatusProto);
+ }
+
+ /**
+ * Return the last known status of the REEF job.
+ * Can return null if the job has not been launched yet.
+ * @return Last status of the REEF job.
+ */
+ @Override
+ public ReefServiceProtos.JobStatusProto getLastStatus() {
+ return this.lastStatus;
}
}
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/RemoteClientJobStatusHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/RemoteClientJobStatusHandler.java
new file mode 100644
index 0000000..001fca3
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/RemoteClientJobStatusHandler.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.client;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Generic interface for job status messages' handler.
+ */
+@DriverSide
+final class RemoteClientJobStatusHandler implements JobStatusHandler {
+
+ private static final Logger LOG = Logger.getLogger(RemoteClientJobStatusHandler.class.getName());
+
+ private final EventHandler<ReefServiceProtos.JobStatusProto> jobStatusHandler;
+
+ private ReefServiceProtos.JobStatusProto lastStatus = null;
+
+ @Inject
+ private RemoteClientJobStatusHandler(
+ final RemoteManager remoteManager,
+ @Parameter(ClientRemoteIdentifier.class) final String clientRID) {
+
+ if (clientRID.equals(ClientRemoteIdentifier.NONE)) {
+ LOG.log(Level.FINE, "Instantiated 'RemoteClientJobStatusHandler' without an actual connection to the client.");
+ this.jobStatusHandler = new LoggingJobStatusHandler();
+ } else {
+ this.jobStatusHandler = remoteManager.getHandler(clientRID, ReefServiceProtos.JobStatusProto.class);
+ LOG.log(Level.FINE, "Instantiated 'RemoteClientJobStatusHandler' for {0}", clientRID);
+ }
+ }
+
+ @Override
+ public void onNext(final ReefServiceProtos.JobStatusProto jobStatus) {
+ this.lastStatus = jobStatus;
+ this.jobStatusHandler.onNext(jobStatus);
+ }
+
+ /**
+ * Return the last known status of the REEF job.
+ * Can return null if the job has not been launched yet.
+ * @return Last status of the REEF job.
+ */
+ @Override
+ public ReefServiceProtos.JobStatusProto getLastStatus() {
+ return this.lastStatus;
+ }
+}
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalDriverConfiguration.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalDriverConfiguration.java
index 658b107..2b7ba09 100644
--- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalDriverConfiguration.java
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalDriverConfiguration.java
@@ -19,6 +19,7 @@
package org.apache.reef.runtime.local.driver;
import org.apache.reef.runtime.common.driver.api.*;
+import org.apache.reef.runtime.common.driver.client.JobStatusHandler;
import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
import org.apache.reef.runtime.common.driver.parameters.DefinedRuntimes;
import org.apache.reef.runtime.common.driver.parameters.JobIdentifier;
@@ -30,10 +31,7 @@
import org.apache.reef.runtime.local.client.parameters.MaxNumberOfEvaluators;
import org.apache.reef.runtime.local.client.parameters.RackNames;
import org.apache.reef.runtime.local.client.parameters.RootFolder;
-import org.apache.reef.tang.formats.ConfigurationModule;
-import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
-import org.apache.reef.tang.formats.OptionalParameter;
-import org.apache.reef.tang.formats.RequiredParameter;
+import org.apache.reef.tang.formats.*;
/**
* ConfigurationModule for the Driver executed in the local resourcemanager. This is meant to eventually replace
@@ -68,6 +66,11 @@
public static final OptionalParameter<String> CLIENT_REMOTE_IDENTIFIER = new OptionalParameter<>();
/**
+ * Interface to use for communications back to the client.
+ */
+ public static final OptionalImpl<JobStatusHandler> JOB_STATUS_HANDLER = new OptionalImpl<>();
+
+ /**
* The identifier of the Job submitted.
*/
public static final RequiredParameter<String> JOB_IDENTIFIER = new RequiredParameter<>();
@@ -78,6 +81,7 @@
.bindImplementation(ResourceReleaseHandler.class, LocalResourceReleaseHandler.class)
.bindImplementation(ResourceManagerStartHandler.class, LocalResourceManagerStartHandler.class)
.bindImplementation(ResourceManagerStopHandler.class, LocalResourceManagerStopHandler.class)
+ .bindImplementation(JobStatusHandler.class, JOB_STATUS_HANDLER)
.bindNamedParameter(ClientRemoteIdentifier.class, CLIENT_REMOTE_IDENTIFIER)
.bindNamedParameter(ErrorHandlerRID.class, CLIENT_REMOTE_IDENTIFIER)
.bindNamedParameter(JobIdentifier.class, JOB_IDENTIFIER)
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailClient.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailClient.java
index 9d64500..3dd7180 100644
--- a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailClient.java
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailClient.java
@@ -22,6 +22,7 @@
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.client.DriverConfiguration;
import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.proto.ReefServiceProtos;
import org.apache.reef.runtime.common.REEFEnvironment;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Tang;
@@ -81,18 +82,18 @@
* @param failMsgClass A class that should fail during the test.
* @param runtimeConfig REEF runtime configuration. Can be e.g. Local or YARN.
* @param timeOut REEF application timeout - not used yet.
- * @return launcher status - usually FAIL.
+ * @return Final job status. Final status for tests is usually something
+ * with state = FAILED and exception like SimulatedDriverFailure.
* @throws InjectionException configuration error.
*/
- public static LauncherStatus runInProcess(final Class<?> failMsgClass,
+ public static ReefServiceProtos.JobStatusProto runInProcess(final Class<?> failMsgClass,
final Configuration runtimeConfig, final int timeOut) throws InjectionException {
try (final REEFEnvironment reef =
REEFEnvironment.fromConfiguration(runtimeConfig, buildDriverConfig(failMsgClass))) {
reef.run();
+ return reef.getLastStatus();
}
-
- return LauncherStatus.FORCE_CLOSED; // TODO[REEF-1596]: Use the actual status, when implemented.
}
/**
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/driver/REEFEnvironmentDriverTest.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/driver/REEFEnvironmentDriverTest.java
index 088aee3..9082a36 100644
--- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/driver/REEFEnvironmentDriverTest.java
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/driver/REEFEnvironmentDriverTest.java
@@ -19,6 +19,7 @@
package org.apache.reef.tests.driver;
import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.proto.ReefServiceProtos;
import org.apache.reef.runtime.common.REEFEnvironment;
import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
import org.apache.reef.runtime.local.driver.LocalDriverConfiguration;
@@ -30,13 +31,13 @@
import org.junit.Test;
/**
- * This tests whether the noop driver gets shutdown properly.
+ * This tests whether the noop driver launched in-process gets shutdown properly.
*/
public final class REEFEnvironmentDriverTest {
private static final Configuration DRIVER_CONFIG = DriverConfiguration.CONF
.set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(DriverTestStartHandler.class))
- .set(DriverConfiguration.DRIVER_IDENTIFIER, "TEST_DriverTest")
+ .set(DriverConfiguration.DRIVER_IDENTIFIER, "TEST_REEFEnvironmentDriverTest")
.set(DriverConfiguration.ON_DRIVER_STARTED, DriverTestStartHandler.class)
.build();
@@ -45,14 +46,22 @@
.set(LocalDriverConfiguration.ROOT_FOLDER, ".")
.set(LocalDriverConfiguration.JVM_HEAP_SLACK, 0.0)
.set(LocalDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, ClientRemoteIdentifier.NONE)
- .set(LocalDriverConfiguration.JOB_IDENTIFIER, "LOCAL_DRIVER_TEST")
+ .set(LocalDriverConfiguration.JOB_IDENTIFIER, "LOCAL_ENV_DRIVER_TEST")
.set(LocalDriverConfiguration.RUNTIME_NAMES, org.apache.reef.runtime.local.driver.RuntimeIdentifier.RUNTIME_NAME)
.build();
@Test
public void testREEFEnvironmentDriver() throws BindException, InjectionException {
+
try (final REEFEnvironment reef = REEFEnvironment.fromConfiguration(LOCAL_DRIVER_MODULE, DRIVER_CONFIG)) {
+
reef.run();
+ final ReefServiceProtos.JobStatusProto status = reef.getLastStatus();
+
+ Assert.assertNotNull("REEF job must report its status", status);
+ Assert.assertTrue("REEF job status must contain a state", status.hasState());
+ Assert.assertEquals("Unexpected final job status", ReefServiceProtos.State.DONE, status.getState());
+
} catch (final Throwable ex) {
Assert.fail("Local driver execution failed: " + ex);
}