[REEF-1731] Implement JobSubmitted client-side event and a corresponding handler

Summary of changes:
    * When a job submitted to the resource manager, issue an event
      `SubmittedJob`
    * Create a default handler of such event, that would just write a new job
      ID to the log
    * Implement a synchronous version in `DriverLauncher.submit()`
    * Minor logging and refactoring in related code

JIRA:
  [REEF-1731](https://issues.apache.org/jira/browse/REEF-1731)

Pull request:
  This closes #1248
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/client/ClientConfiguration.java b/lang/java/reef-common/src/main/java/org/apache/reef/client/ClientConfiguration.java
index 2e8e197..d100422 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/client/ClientConfiguration.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/client/ClientConfiguration.java
@@ -39,6 +39,12 @@
   public static final OptionalImpl<EventHandler<JobMessage>> ON_JOB_MESSAGE = new OptionalImpl<>();
 
   /**
+   * Handler for the event when a REEF job is submitted to the Resource Manager.
+   * Default implementation just writes the new job ID to the log.
+   */
+  public static final OptionalImpl<EventHandler<SubmittedJob>> ON_JOB_SUBMITTED = new OptionalImpl<>();
+
+  /**
    * Handler for the event when a submitted REEF Job is running.
    * Default implementation just writes to the log.
    */
@@ -73,6 +79,7 @@
 
   public static final ConfigurationModule CONF = new ClientConfiguration()
       .bind(JobMessageHandler.class, ON_JOB_MESSAGE)
+      .bind(JobSubmittedHandler.class, ON_JOB_SUBMITTED)
       .bind(JobRunningHandler.class, ON_JOB_RUNNING)
       .bind(JobCompletedHandler.class, ON_JOB_COMPLETED)
       .bind(JobFailedHandler.class, ON_JOB_FAILED)
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverLauncher.java b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverLauncher.java
index 505eeaa..78ca13a 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverLauncher.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverLauncher.java
@@ -29,6 +29,8 @@
 import org.apache.reef.wake.EventHandler;
 
 import javax.inject.Inject;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -36,7 +38,7 @@
  * A launcher for REEF Drivers.
  * <p>
  * It can be instantiated using a configuration that can create a REEF instance.
- * For example, the local resourcemanager and the YARN resourcemanager can do this.
+ * For example, the local resource manager and the YARN resource manager can do this.
  * <p>
  * See {@link org.apache.reef.examples.hello} package for a demo use case.
  */
@@ -44,11 +46,12 @@
 @Provided
 @ClientSide
 @Unit
-public final class DriverLauncher {
+public final class DriverLauncher implements AutoCloseable {
 
   private static final Logger LOG = Logger.getLogger(DriverLauncher.class.getName());
 
   private static final Configuration CLIENT_CONFIG = ClientConfiguration.CONF
+      .set(ClientConfiguration.ON_JOB_SUBMITTED, SubmittedJobHandler.class)
       .set(ClientConfiguration.ON_JOB_RUNNING, RunningJobHandler.class)
       .set(ClientConfiguration.ON_JOB_COMPLETED, CompletedJobHandler.class)
       .set(ClientConfiguration.ON_JOB_FAILED, FailedJobHandler.class)
@@ -58,7 +61,9 @@
   private final REEF reef;
 
   private LauncherStatus status = LauncherStatus.INIT;
-  private RunningJob theJob = null;
+
+  private String jobId;
+  private RunningJob theJob;
 
   @Inject
   private DriverLauncher(final REEF reef) {
@@ -81,14 +86,21 @@
   /**
    * Kills the running job.
    */
-  public synchronized void close() {
-    if (this.status.isRunning()) {
-      this.status = LauncherStatus.FORCE_CLOSED;
+  @Override
+  public void close() {
+    synchronized (this) {
+      LOG.log(Level.FINER, "Close launcher: job {0} with status {1}", new Object[] {this.theJob, this.status});
+      if (this.status.isRunning()) {
+        this.status = LauncherStatus.FORCE_CLOSED;
+      }
+      if (null != this.theJob) {
+        this.theJob.close();
+      }
+      this.notify();
     }
-    if (null != this.theJob) {
-      this.theJob.close();
-    }
-    this.notify();
+    LOG.log(Level.FINEST, "Close launcher: shutdown REEF");
+    this.reef.close();
+    LOG.log(Level.FINEST, "Close launcher: done");
   }
 
   /**
@@ -110,68 +122,114 @@
       }
     }
     this.reef.close();
+    return this.getStatus();
+  }
+
+  /**
+   * Submit REEF job asynchronously and do not wait for its completion.
+   *
+   * @param driverConfig configuration of hte driver to submit to the RM.
+   * @return ID of the new application.
+   */
+  public String submit(final Configuration driverConfig, final long waitTime) {
+    this.reef.submit(driverConfig);
+    this.waitForStatus(waitTime, LauncherStatus.SUBMITTED);
+    return this.jobId;
+  }
+
+  /**
+   * Wait for one of the specified statuses of the REEF job.
+   * This method is called after the job is submitted to the RM via submit().
+   * @param waitTime wait time in milliseconds.
+   * @param statuses array of statuses to wait for.
+   * @return the state of the job after the wait.
+   */
+  public LauncherStatus waitForStatus(final long waitTime, final LauncherStatus... statuses) {
+
+    final long endTime = System.currentTimeMillis() + waitTime;
+
+    final HashSet<LauncherStatus> statSet = new HashSet<>(statuses.length * 2);
+    Collections.addAll(statSet, statuses);
+    Collections.addAll(statSet, LauncherStatus.FAILED, LauncherStatus.FORCE_CLOSED);
+
+    LOG.log(Level.FINEST, "Wait for status: {0}", statSet);
+    final LauncherStatus finalStatus;
+
     synchronized (this) {
-      return this.status;
+      while (!statSet.contains(this.status)) {
+        try {
+          final long delay = endTime - System.currentTimeMillis();
+          if (delay <= 0) {
+            break;
+          }
+          LOG.log(Level.FINE, "Wait for {0} milliSeconds", delay);
+          this.wait(delay);
+        } catch (final InterruptedException ex) {
+          LOG.log(Level.FINE, "Interrupted: {0}", ex);
+        }
+      }
+
+      finalStatus = this.status;
     }
+
+    LOG.log(Level.FINEST, "Final status: {0}", finalStatus);
+    return finalStatus;
   }
 
   /**
    * Run a job with a waiting timeout after which it will be killed, if it did not complete yet.
    *
    * @param driverConfig the configuration for the driver. See DriverConfiguration for details.
-   * @param timeOut      timeout on the job.
+   * @param timeOut timeout on the job.
    * @return the state of the job after execution.
    */
   public LauncherStatus run(final Configuration driverConfig, final long timeOut) {
-    final long endTime = System.currentTimeMillis() + timeOut;
+
+    final long startTime = System.currentTimeMillis();
+
     this.reef.submit(driverConfig);
-    synchronized (this) {
-      while (!this.status.isDone()) {
-        try {
-          final long waitTime = endTime - System.currentTimeMillis();
-          if (waitTime <= 0) {
-            break;
-          }
-          LOG.log(Level.FINE, "Wait for {0} milliSeconds", waitTime);
-          this.wait(waitTime);
-        } catch (final InterruptedException ex) {
-          LOG.log(Level.FINE, "Interrupted: {0}", ex);
-        }
-      }
-      if (System.currentTimeMillis() >= endTime) {
-        LOG.log(Level.WARNING, "The Job timed out.");
+    this.waitForStatus(timeOut - System.currentTimeMillis() + startTime, LauncherStatus.COMPLETED);
+
+    if (System.currentTimeMillis() - startTime >= timeOut) {
+      LOG.log(Level.WARNING, "The Job timed out.");
+      synchronized (this) {
         this.status = LauncherStatus.FORCE_CLOSED;
       }
     }
 
     this.reef.close();
-    synchronized (this) {
-      return this.status;
-    }
+    return this.getStatus();
   }
 
   /**
    * @return the current status of the job.
    */
-  public LauncherStatus getStatus() {
-    synchronized (this) {
-      return this.status;
-    }
+  public synchronized LauncherStatus getStatus() {
+    return this.status;
   }
 
-  /**
-   * Update job status and notify the waiting thread.
-   */
-  @SuppressWarnings("checkstyle:hiddenfield")
-  public synchronized void setStatusAndNotify(final LauncherStatus status) {
-    LOG.log(Level.FINEST, "Set status: {0} -> {1}", new Object[]{this.status, status});
-    this.status = status;
+  /** Update job status and notify the waiting thread. */
+  public synchronized void setStatusAndNotify(final LauncherStatus newStatus) {
+    LOG.log(Level.FINEST, "Set status: {0} -> {1}", new Object[] {this.status, newStatus});
+    this.status = newStatus;
     this.notify();
   }
 
   @Override
   public String toString() {
-    return this.status.toString();
+    return String.format("DriverLauncher: { jobId: %s, status: %s }", this.jobId, this.status);
+  }
+
+  /**
+   * Job driver notifies us that the job has been submitted to the Resource Manager.
+   */
+  public final class SubmittedJobHandler implements EventHandler<SubmittedJob> {
+    @Override
+    public void onNext(final SubmittedJob job) {
+      LOG.log(Level.INFO, "REEF job submitted: {0}.", job.getId());
+      jobId = job.getId();
+      setStatusAndNotify(LauncherStatus.SUBMITTED);
+    }
   }
 
   /**
@@ -217,7 +275,7 @@
   public final class RuntimeErrorHandler implements EventHandler<FailedRuntime> {
     @Override
     public void onNext(final FailedRuntime error) {
-      LOG.log(Level.SEVERE, "Received a resourcemanager error", error.getReason());
+      LOG.log(Level.SEVERE, "Received a resource manager error", error.getReason());
       theJob = null;
       setStatusAndNotify(LauncherStatus.failed(error.getReason()));
     }
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/client/LauncherStatus.java b/lang/java/reef-common/src/main/java/org/apache/reef/client/LauncherStatus.java
index 434d22b..9d6965d 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/client/LauncherStatus.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/client/LauncherStatus.java
@@ -26,10 +26,12 @@
 public final class LauncherStatus {
 
   public static final LauncherStatus INIT = new LauncherStatus(State.INIT);
+  public static final LauncherStatus SUBMITTED = new LauncherStatus(State.SUBMITTED);
   public static final LauncherStatus RUNNING = new LauncherStatus(State.RUNNING);
   public static final LauncherStatus COMPLETED = new LauncherStatus(State.COMPLETED);
   public static final LauncherStatus FORCE_CLOSED = new LauncherStatus(State.FORCE_CLOSED);
   public static final LauncherStatus FAILED = new LauncherStatus(State.FAILED);
+
   private final State state;
   private final Optional<Throwable> error;
 
@@ -37,7 +39,6 @@
     this(state, null);
   }
 
-
   private LauncherStatus(final State state, final Throwable ex) {
     this.state = state;
     this.error = Optional.ofNullable(ex);
@@ -120,6 +121,7 @@
    */
   private enum State {
     INIT,
+    SUBMITTED,
     RUNNING,
     COMPLETED,
     FAILED,
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/client/SubmittedJob.java b/lang/java/reef-common/src/main/java/org/apache/reef/client/SubmittedJob.java
new file mode 100644
index 0000000..3b88173
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/client/SubmittedJob.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.client;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.io.naming.Identifiable;
+
+/**
+ * Represents a completed REEF job.
+ */
+@Public
+@ClientSide
+@Provided
+public interface SubmittedJob extends Identifiable {
+
+  /** @return ID of the submitted job. */
+  @Override
+  String getId();
+}
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/client/parameters/JobSubmittedHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/client/parameters/JobSubmittedHandler.java
new file mode 100644
index 0000000..20891ff
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/client/parameters/JobSubmittedHandler.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.client.parameters;
+
+import org.apache.reef.client.SubmittedJob;
+import org.apache.reef.runtime.common.client.defaults.DefaultSubmittedJobHandler;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.wake.EventHandler;
+
+/** Handler for the SubmittedJob event. */
+@NamedParameter(doc = "Event handler for SubmittedJob", default_classes = DefaultSubmittedJobHandler.class)
+public final class JobSubmittedHandler implements Name<EventHandler<SubmittedJob>> {
+
+  private JobSubmittedHandler() { }
+}
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/REEFImplementation.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/REEFImplementation.java
index ac52922..1b56aec 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/REEFImplementation.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/REEFImplementation.java
@@ -22,20 +22,20 @@
 import org.apache.reef.annotations.audience.ClientSide;
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.client.REEF;
+import org.apache.reef.client.SubmittedJob;
 import org.apache.reef.client.parameters.DriverConfigurationProviders;
+import org.apache.reef.client.parameters.JobSubmittedHandler;
 import org.apache.reef.runtime.common.client.api.JobSubmissionEvent;
 import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
 import org.apache.reef.runtime.common.launch.parameters.ErrorHandlerRID;
-import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.ConfigurationBuilder;
-import org.apache.reef.tang.ConfigurationProvider;
-import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.*;
 import org.apache.reef.tang.annotations.Name;
 import org.apache.reef.tang.annotations.NamedParameter;
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.util.REEFVersion;
 import org.apache.reef.util.logging.LoggingScope;
 import org.apache.reef.util.logging.LoggingScopeFactory;
+import org.apache.reef.wake.EventHandler;
 
 import javax.inject.Inject;
 import java.util.Set;
@@ -53,38 +53,43 @@
   private static final Logger LOG = Logger.getLogger(REEFImplementation.class.getName());
 
   private final JobSubmissionHandler jobSubmissionHandler;
-  private final RunningJobs runningJobs;
   private final JobSubmissionHelper jobSubmissionHelper;
+  private final InjectionFuture<EventHandler<SubmittedJob>> jobSubmittedHandler;
+  private final RunningJobs runningJobs;
   private final ClientWireUp clientWireUp;
   private final LoggingScopeFactory loggingScopeFactory;
   private final Set<ConfigurationProvider> configurationProviders;
 
   /**
    * @param jobSubmissionHandler
-   * @param runningJobs
    * @param jobSubmissionHelper
    * @param jobStatusMessageHandler is passed only to make sure it is instantiated
+   * @param runningJobs
    * @param clientWireUp
-   * @param reefVersion             provides the current version of REEF.
+   * @param reefVersion provides the current version of REEF.
    * @param configurationProviders
    */
   @Inject
-  REEFImplementation(final JobSubmissionHandler jobSubmissionHandler,
-                     final RunningJobs runningJobs,
-                     final JobSubmissionHelper jobSubmissionHelper,
-                     final JobStatusMessageHandler jobStatusMessageHandler,
-                     final ClientWireUp clientWireUp,
-                     final LoggingScopeFactory loggingScopeFactory,
-                     final REEFVersion reefVersion,
-                     @Parameter(DriverConfigurationProviders.class)
-                     final Set<ConfigurationProvider> configurationProviders) {
+  private REEFImplementation(
+        final JobSubmissionHandler jobSubmissionHandler,
+        final JobSubmissionHelper jobSubmissionHelper,
+        final JobStatusMessageHandler jobStatusMessageHandler,
+        final RunningJobs runningJobs,
+        final ClientWireUp clientWireUp,
+        final LoggingScopeFactory loggingScopeFactory,
+        final REEFVersion reefVersion,
+        @Parameter(JobSubmittedHandler.class) final InjectionFuture<EventHandler<SubmittedJob>> jobSubmittedHandler,
+        @Parameter(DriverConfigurationProviders.class) final Set<ConfigurationProvider> configurationProviders) {
+
     this.jobSubmissionHandler = jobSubmissionHandler;
-    this.runningJobs = runningJobs;
+    this.jobSubmittedHandler = jobSubmittedHandler;
     this.jobSubmissionHelper = jobSubmissionHelper;
+    this.runningJobs = runningJobs;
     this.clientWireUp = clientWireUp;
     this.configurationProviders = configurationProviders;
-    clientWireUp.performWireUp();
     this.loggingScopeFactory = loggingScopeFactory;
+
+    clientWireUp.performWireUp();
     reefVersion.logVersion();
   }
 
@@ -127,6 +132,9 @@
       }
 
       this.jobSubmissionHandler.onNext(submissionMessage);
+
+      this.jobSubmittedHandler.get().onNext(
+          new SubmittedJobImpl(this.jobSubmissionHandler.getApplicationId()));
     }
   }
 
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/SubmittedJobImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/SubmittedJobImpl.java
new file mode 100644
index 0000000..c248262
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/SubmittedJobImpl.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client;
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.client.SubmittedJob;
+
+/** An implementation of the SubmittedJob interface. */
+@ClientSide
+@Private
+final class SubmittedJobImpl implements SubmittedJob {
+
+  private final String jobId;
+
+  SubmittedJobImpl(final String jobId) {
+    this.jobId = jobId;
+  }
+
+  @Override
+  public String getId() {
+    return this.jobId;
+  }
+
+  @Override
+  public String toString() {
+    return "SubmittedJob: { id:" + this.jobId + " }";
+  }
+}
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultSubmittedJobHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultSubmittedJobHandler.java
new file mode 100644
index 0000000..c90a82b
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultSubmittedJobHandler.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client.defaults;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.client.SubmittedJob;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/** Default event handler for SubmittedJob: Log the new job ID. */
+@Provided
+@ClientSide
+public final class DefaultSubmittedJobHandler implements EventHandler<SubmittedJob> {
+
+  private static final Logger LOG = Logger.getLogger(DefaultSubmittedJobHandler.class.getName());
+
+  @Inject
+  private DefaultSubmittedJobHandler() { }
+
+  @Override
+  public void onNext(final SubmittedJob job) {
+    LOG.log(Level.INFO, "Job Submitted: {0}", job);
+  }
+}