[REEF-1726] Close message dispatcher on the evaluator manager shutdown

JIRA:
  [REEF-1726](https://issues.apache.org/jira/browse/REEF-1726)

Pull Request:
  This closes #1241
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
index 61564b1..c555adc 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
@@ -263,15 +263,18 @@
         try {
           // We need to wait awhile before returning the container to the RM
           // in order to give the EvaluatorRuntime (and Launcher) time to cleanly exit.
-          this.clock.scheduleAlarm(100, new EventHandler<Alarm>() {
+          this.clock.scheduleAlarm(200, new EventHandler<Alarm>() {
             @Override
             public void onNext(final Alarm alarm) {
+              LOG.log(Level.FINER, "Close EvaluatorManager {0} - release to RM", evaluatorId);
               resourceReleaseHandler.onNext(releaseEvent);
+              shutdown();
             }
           });
         } catch (final IllegalStateException e) {
           LOG.log(Level.WARNING, "Force resource release because the client closed the clock.", e);
           this.resourceReleaseHandler.onNext(releaseEvent);
+          this.shutdown();
         }
       }
     }
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
index 73854b2..e113e76 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
@@ -290,10 +290,10 @@
     LOG.log(Level.FINER, "Closing message dispatcher for {0}", this.evaluatorIdentifier);
     // This effectively closes all dispatchers as they share the same stage.
     this.serviceDispatcher.close();
-    if (!this.serviceDispatcher.isThreadPoolClosed()) {
+    if (!this.serviceDispatcher.isClosed()) {
       LOG.log(Level.SEVERE,
-              "Closing message dispatcher for {0}: ThreadPool for service dispatcher failed to close",
-              this.evaluatorIdentifier);
+          "Closing message dispatcher for {0}: ThreadPool for service dispatcher failed to close",
+          this.evaluatorIdentifier);
     }
   }
 }
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
index f5b1c1d..91ff7d6 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
@@ -28,6 +28,8 @@
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 /**
  * Delayed event router that dispatches messages to the proper event handler by type.
@@ -37,6 +39,8 @@
 @DriverSide
 public final class DispatchingEStage implements AutoCloseable {
 
+  private static final Logger LOG = Logger.getLogger(DispatchingEStage.class.getName());
+
   /**
    * A map of event handlers, populated in the register() method.
    */
@@ -98,7 +102,7 @@
 
   /**
    * Dispatch a new message by type.
-   *
+   * If the stage is already closed, log a warning and ignore the message.
    * @param type    Type of event handler - must match the register() call.
    * @param message A message to process. Must be a subclass of T.
    * @param <T>     Message type that event handler supports.
@@ -106,8 +110,13 @@
    */
   @SuppressWarnings("unchecked")
   public <T, U extends T> void onNext(final Class<T> type, final U message) {
-    final EventHandler<T> handler = (EventHandler<T>) this.handlers.get(type);
-    this.stage.onNext(new DelayedOnNext(handler, message));
+    if (this.isClosed()) {
+      LOG.log(Level.WARNING, "Dispatcher {0} already closed: ignoring message {1}: {2}",
+          new Object[] {this.stage, type.getCanonicalName(), message});
+    } else {
+      final EventHandler<T> handler = (EventHandler<T>) this.handlers.get(type);
+      this.stage.onNext(new DelayedOnNext(handler, message));
+    }
   }
 
   /**
@@ -118,7 +127,8 @@
   }
 
   /**
-   * Close the internal thread pool.
+   * Close the stage adn stop accepting new messages.
+   * Closes the internal thread pool.
    */
   @Override
   public void close() {
@@ -126,9 +136,10 @@
   }
 
   /**
-   * Returns true if the internal thread pool is closed.
+   * Check if the stage can still accept messages.
+   * @return true if the stage can no longer accept messages, false otherwise.
    */
-  public boolean isThreadPoolClosed() {
+  public boolean isClosed() {
     return this.stage.isClosed();
   }
 
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/AbstractEStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/AbstractEStage.java
index 2861204..5395054 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/AbstractEStage.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/AbstractEStage.java
@@ -88,4 +88,20 @@
     outMeter.mark(1);
   }
 
+  /**
+   * Check if the stage can still accept messages.
+   * @return true if the stage is closed, false otherwise.
+   */
+  public boolean isClosed() {
+    return closed.get();
+  }
+
+  /**
+   * Get human readable representation of the class (used for logging).
+   * @return A string that contains stage name.
+   */
+  @Override
+  public String toString() {
+    return String.format("Stage:%s:%s", this.getClass().getCanonicalName(), name);
+  }
 }
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
index 005db44..4c57fa7 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
@@ -236,13 +236,6 @@
   }
 
   /**
-   * Returns true if resources are closed.
-   */
-  public boolean isClosed() {
-    return closed.get() && executor.isTerminated();
-  }
-
-  /**
    * Gets the queue length of this stage.
    *
    * @return the queue length