[REEF-1596] Add the ability to check the status of the REEF job launched directly via REEFEnvironment Driver.

Summary of changes:
   * Add .getLastStatus() method to JobStatusHandler interface
   * Implement .getLastStatus() method in all classes that derive from JobStatusHandler
   * Set the default value for JobStatusHandler injectable parameter
   * implement REEFEnvironment.getLastStatus() method
   * Use that method in unit tests
   * Minor cosmetic changes in the code, more javadocs

JIRA:
  [REEF-1596](https://issues.apache.org/jira/browse/REEF-1596)

Pull Request:
  This changes #1151
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java
index c8a3d0e..b997347 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java
@@ -18,6 +18,8 @@
  */
 package org.apache.reef.runtime.common;
 
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.client.JobStatusHandler;
 import org.apache.reef.runtime.common.launch.ProfilingStopHandler;
 import org.apache.reef.runtime.common.launch.REEFErrorHandler;
 import org.apache.reef.tang.Configuration;
@@ -59,6 +61,8 @@
   /** Error handler that processes all uncaught REEF exceptions. */
   private final REEFErrorHandler errorHandler;
 
+  private final JobStatusHandler jobStatusHandler;
+
   /**
    * Create a new REEF environment.
    * @param configurations REEF component (Driver or Evaluator) configuration.
@@ -86,11 +90,12 @@
     injector.getInstance(REEFVersion.class).logVersion();
 
     final REEFErrorHandler errorHandler = injector.getInstance(REEFErrorHandler.class);
+    final JobStatusHandler jobStatusHandler = injector.getInstance(JobStatusHandler.class);
 
     try {
 
       final Clock clock = injector.getInstance(Clock.class);
-      return new REEFEnvironment(clock, errorHandler);
+      return new REEFEnvironment(clock, errorHandler, jobStatusHandler);
 
     } catch (final Throwable ex) {
       LOG.log(Level.SEVERE, "Error while instantiating the clock", ex);
@@ -107,10 +112,15 @@
    * Use .fromConfiguration() method to create new REEF environment.
    * @param clock main event loop.
    * @param errorHandler error handler.
+   * @param jobStatusHandler an object that receives notifications on job status changes
+   * and can be queried for the last received job status.
    */
-  private REEFEnvironment(final Clock clock, final REEFErrorHandler errorHandler) {
+  private REEFEnvironment(
+      final Clock clock, final REEFErrorHandler errorHandler, final JobStatusHandler jobStatusHandler) {
+
     this.clock = clock;
     this.errorHandler = errorHandler;
+    this.jobStatusHandler = jobStatusHandler;
   }
 
   /**
@@ -146,6 +156,7 @@
   /**
    * Launch REEF component (Driver or Evaluator).
    * It is usually called from the static .run() method.
+   * Check the status of the run via .getLastStatus() method.
    */
   @Override
   @SuppressWarnings("checkstyle:illegalcatch") // Catch throwable to feed it to error handler
@@ -159,12 +170,19 @@
 
       LOG.log(Level.FINEST, "Clock: start");
       this.clock.run();
-      LOG.log(Level.FINEST, "Clock: exit normally");
+      LOG.log(Level.FINEST, "Clock: exit normally: {0}", this.getLastStatus());
 
     } catch (final Throwable ex) {
       LOG.log(Level.SEVERE, "Clock: Error in main event loop", ex);
       this.errorHandler.onNext(ex);
-      throw ex;
     }
   }
+
+  /**
+   * Get the last known status of REEF job. Can return null if job has not started yet.
+   * @return Status of the REEF job launched in this environment.
+   */
+  public ReefServiceProtos.JobStatusProto getLastStatus() {
+    return this.jobStatusHandler.getLastStatus();
+  }
 }
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java
index 3f22e84..7be8e07 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java
@@ -21,11 +21,8 @@
 import com.google.protobuf.ByteString;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.proto.ReefServiceProtos;
-import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
 import org.apache.reef.runtime.common.driver.parameters.JobIdentifier;
-import org.apache.reef.runtime.common.utils.RemoteManager;
 import org.apache.reef.tang.annotations.Parameter;
-import org.apache.reef.wake.EventHandler;
 
 import javax.inject.Inject;
 import java.util.logging.Level;
@@ -39,22 +36,16 @@
 
   private static final Logger LOG = Logger.getLogger(ClientConnection.class.getName());
 
-  private final EventHandler<ReefServiceProtos.JobStatusProto> jobStatusHandler;
+  private final JobStatusHandler jobStatusHandler;
   private final String jobIdentifier;
 
   @Inject
   public ClientConnection(
-      final RemoteManager remoteManager,
-      @Parameter(ClientRemoteIdentifier.class) final String clientRID,
-      @Parameter(JobIdentifier.class) final String jobIdentifier) {
+      @Parameter(JobIdentifier.class) final String jobIdentifier,
+      final JobStatusHandler jobStatusHandler) {
+
     this.jobIdentifier = jobIdentifier;
-    if (clientRID.equals(ClientRemoteIdentifier.NONE)) {
-      LOG.log(Level.FINE, "Instantiated 'ClientConnection' without an actual connection to the client.");
-      this.jobStatusHandler = new LoggingJobStatusHandler();
-    } else {
-      this.jobStatusHandler = remoteManager.getHandler(clientRID, ReefServiceProtos.JobStatusProto.class);
-      LOG.log(Level.FINE, "Instantiated 'ClientConnection'");
-    }
+    this.jobStatusHandler = jobStatusHandler;
   }
 
   /**
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/JobStatusHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/JobStatusHandler.java
new file mode 100644
index 0000000..1cc2034
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/JobStatusHandler.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.client;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.wake.EventHandler;
+
+/**
+ * Generic interface for job status messages' handler.
+ * Receive JobStatusProto messages and keep the last message so it can be retrieved via getLastStatus() method.
+ */
+@DriverSide
+@DefaultImplementation(RemoteClientJobStatusHandler.class)
+public interface JobStatusHandler extends EventHandler<ReefServiceProtos.JobStatusProto> {
+
+  /**
+   * Return the last known status of the REEF job.
+   * Can return null if the job has not been launched yet.
+   * @return Last status of the REEF job.
+   */
+  ReefServiceProtos.JobStatusProto getLastStatus();
+}
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/LoggingJobStatusHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/LoggingJobStatusHandler.java
index f627c2b..0bf7ce5 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/LoggingJobStatusHandler.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/LoggingJobStatusHandler.java
@@ -18,8 +18,8 @@
  */
 package org.apache.reef.runtime.common.driver.client;
 
+import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.proto.ReefServiceProtos;
-import org.apache.reef.wake.EventHandler;
 
 import javax.inject.Inject;
 import java.util.logging.Level;
@@ -28,15 +28,30 @@
 /**
  * A handler for job status messages that just logs them.
  */
-final class LoggingJobStatusHandler implements EventHandler<ReefServiceProtos.JobStatusProto> {
+@DriverSide
+public class LoggingJobStatusHandler implements JobStatusHandler {
+
   private static final Logger LOG = Logger.getLogger(LoggingJobStatusHandler.class.getName());
 
+  private ReefServiceProtos.JobStatusProto lastStatus = null;
+
   @Inject
   LoggingJobStatusHandler() {
   }
 
   @Override
   public void onNext(final ReefServiceProtos.JobStatusProto jobStatusProto) {
-    LOG.log(Level.INFO, "Received a JobStatus message that can't be sent:\n" + jobStatusProto.toString());
+    this.lastStatus = jobStatusProto;
+    LOG.log(Level.INFO, "In-process JobStatus:\n{0}", jobStatusProto);
+  }
+
+  /**
+   * Return the last known status of the REEF job.
+   * Can return null if the job has not been launched yet.
+   * @return Last status of the REEF job.
+   */
+  @Override
+  public ReefServiceProtos.JobStatusProto getLastStatus() {
+    return this.lastStatus;
   }
 }
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/RemoteClientJobStatusHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/RemoteClientJobStatusHandler.java
new file mode 100644
index 0000000..001fca3
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/RemoteClientJobStatusHandler.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.client;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Generic interface for job status messages' handler.
+ */
+@DriverSide
+final class RemoteClientJobStatusHandler implements JobStatusHandler {
+
+  private static final Logger LOG = Logger.getLogger(RemoteClientJobStatusHandler.class.getName());
+
+  private final EventHandler<ReefServiceProtos.JobStatusProto> jobStatusHandler;
+
+  private ReefServiceProtos.JobStatusProto lastStatus = null;
+
+  @Inject
+  private RemoteClientJobStatusHandler(
+      final RemoteManager remoteManager,
+      @Parameter(ClientRemoteIdentifier.class) final String clientRID) {
+
+    if (clientRID.equals(ClientRemoteIdentifier.NONE)) {
+      LOG.log(Level.FINE, "Instantiated 'RemoteClientJobStatusHandler' without an actual connection to the client.");
+      this.jobStatusHandler = new LoggingJobStatusHandler();
+    } else {
+      this.jobStatusHandler = remoteManager.getHandler(clientRID, ReefServiceProtos.JobStatusProto.class);
+      LOG.log(Level.FINE, "Instantiated 'RemoteClientJobStatusHandler' for {0}", clientRID);
+    }
+  }
+
+  @Override
+  public void onNext(final ReefServiceProtos.JobStatusProto jobStatus) {
+    this.lastStatus = jobStatus;
+    this.jobStatusHandler.onNext(jobStatus);
+  }
+
+  /**
+   * Return the last known status of the REEF job.
+   * Can return null if the job has not been launched yet.
+   * @return Last status of the REEF job.
+   */
+  @Override
+  public ReefServiceProtos.JobStatusProto getLastStatus() {
+    return this.lastStatus;
+  }
+}
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalDriverConfiguration.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalDriverConfiguration.java
index 658b107..2b7ba09 100644
--- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalDriverConfiguration.java
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalDriverConfiguration.java
@@ -19,6 +19,7 @@
 package org.apache.reef.runtime.local.driver;
 
 import org.apache.reef.runtime.common.driver.api.*;
+import org.apache.reef.runtime.common.driver.client.JobStatusHandler;
 import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
 import org.apache.reef.runtime.common.driver.parameters.DefinedRuntimes;
 import org.apache.reef.runtime.common.driver.parameters.JobIdentifier;
@@ -30,10 +31,7 @@
 import org.apache.reef.runtime.local.client.parameters.MaxNumberOfEvaluators;
 import org.apache.reef.runtime.local.client.parameters.RackNames;
 import org.apache.reef.runtime.local.client.parameters.RootFolder;
-import org.apache.reef.tang.formats.ConfigurationModule;
-import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
-import org.apache.reef.tang.formats.OptionalParameter;
-import org.apache.reef.tang.formats.RequiredParameter;
+import org.apache.reef.tang.formats.*;
 
 /**
  * ConfigurationModule for the Driver executed in the local resourcemanager. This is meant to eventually replace
@@ -68,6 +66,11 @@
   public static final OptionalParameter<String> CLIENT_REMOTE_IDENTIFIER = new OptionalParameter<>();
 
   /**
+   * Interface to use for communications back to the client.
+   */
+  public static final OptionalImpl<JobStatusHandler> JOB_STATUS_HANDLER = new OptionalImpl<>();
+
+  /**
    * The identifier of the Job submitted.
    */
   public static final RequiredParameter<String> JOB_IDENTIFIER = new RequiredParameter<>();
@@ -78,6 +81,7 @@
       .bindImplementation(ResourceReleaseHandler.class, LocalResourceReleaseHandler.class)
       .bindImplementation(ResourceManagerStartHandler.class, LocalResourceManagerStartHandler.class)
       .bindImplementation(ResourceManagerStopHandler.class, LocalResourceManagerStopHandler.class)
+      .bindImplementation(JobStatusHandler.class, JOB_STATUS_HANDLER)
       .bindNamedParameter(ClientRemoteIdentifier.class, CLIENT_REMOTE_IDENTIFIER)
       .bindNamedParameter(ErrorHandlerRID.class, CLIENT_REMOTE_IDENTIFIER)
       .bindNamedParameter(JobIdentifier.class, JOB_IDENTIFIER)
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailClient.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailClient.java
index 9d64500..3dd7180 100644
--- a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailClient.java
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailClient.java
@@ -22,6 +22,7 @@
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.client.DriverConfiguration;
 import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.proto.ReefServiceProtos;
 import org.apache.reef.runtime.common.REEFEnvironment;
 import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.Tang;
@@ -81,18 +82,18 @@
    * @param failMsgClass A class that should fail during the test.
    * @param runtimeConfig REEF runtime configuration. Can be e.g. Local or YARN.
    * @param timeOut REEF application timeout - not used yet.
-   * @return launcher status - usually FAIL.
+   * @return Final job status. Final status for tests is usually something
+   * with state = FAILED and exception like SimulatedDriverFailure.
    * @throws InjectionException configuration error.
    */
-  public static LauncherStatus runInProcess(final Class<?> failMsgClass,
+  public static ReefServiceProtos.JobStatusProto runInProcess(final Class<?> failMsgClass,
       final Configuration runtimeConfig, final int timeOut) throws InjectionException {
 
     try (final REEFEnvironment reef =
              REEFEnvironment.fromConfiguration(runtimeConfig, buildDriverConfig(failMsgClass))) {
       reef.run();
+      return reef.getLastStatus();
     }
-
-    return LauncherStatus.FORCE_CLOSED; // TODO[REEF-1596]: Use the actual status, when implemented.
   }
 
   /**
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/driver/REEFEnvironmentDriverTest.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/driver/REEFEnvironmentDriverTest.java
index 088aee3..9082a36 100644
--- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/driver/REEFEnvironmentDriverTest.java
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/driver/REEFEnvironmentDriverTest.java
@@ -19,6 +19,7 @@
 package org.apache.reef.tests.driver;
 
 import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.proto.ReefServiceProtos;
 import org.apache.reef.runtime.common.REEFEnvironment;
 import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
 import org.apache.reef.runtime.local.driver.LocalDriverConfiguration;
@@ -30,13 +31,13 @@
 import org.junit.Test;
 
 /**
- * This tests whether the noop driver gets shutdown properly.
+ * This tests whether the noop driver launched in-process gets shutdown properly.
  */
 public final class REEFEnvironmentDriverTest {
 
   private static final Configuration DRIVER_CONFIG = DriverConfiguration.CONF
       .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(DriverTestStartHandler.class))
-      .set(DriverConfiguration.DRIVER_IDENTIFIER, "TEST_DriverTest")
+      .set(DriverConfiguration.DRIVER_IDENTIFIER, "TEST_REEFEnvironmentDriverTest")
       .set(DriverConfiguration.ON_DRIVER_STARTED, DriverTestStartHandler.class)
       .build();
 
@@ -45,14 +46,22 @@
       .set(LocalDriverConfiguration.ROOT_FOLDER, ".")
       .set(LocalDriverConfiguration.JVM_HEAP_SLACK, 0.0)
       .set(LocalDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, ClientRemoteIdentifier.NONE)
-      .set(LocalDriverConfiguration.JOB_IDENTIFIER, "LOCAL_DRIVER_TEST")
+      .set(LocalDriverConfiguration.JOB_IDENTIFIER, "LOCAL_ENV_DRIVER_TEST")
       .set(LocalDriverConfiguration.RUNTIME_NAMES, org.apache.reef.runtime.local.driver.RuntimeIdentifier.RUNTIME_NAME)
       .build();
 
   @Test
   public void testREEFEnvironmentDriver() throws BindException, InjectionException {
+
     try (final REEFEnvironment reef = REEFEnvironment.fromConfiguration(LOCAL_DRIVER_MODULE, DRIVER_CONFIG)) {
+
       reef.run();
+      final ReefServiceProtos.JobStatusProto status = reef.getLastStatus();
+
+      Assert.assertNotNull("REEF job must report its status", status);
+      Assert.assertTrue("REEF job status must contain a state", status.hasState());
+      Assert.assertEquals("Unexpected final job status", ReefServiceProtos.State.DONE, status.getState());
+
     } catch (final Throwable ex) {
       Assert.fail("Local driver execution failed: " + ex);
     }