[REEF-1780] Improve logging when closing message dispatcher on the evaluator manager shutdown

JIRA: [REEF-1780](https://issues.apache.org/jira/browse/REEF-1780)

Closes #1270
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
index ce879da..73854b2 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
@@ -290,5 +290,10 @@
     LOG.log(Level.FINER, "Closing message dispatcher for {0}", this.evaluatorIdentifier);
     // This effectively closes all dispatchers as they share the same stage.
     this.serviceDispatcher.close();
+    if (!this.serviceDispatcher.isThreadPoolClosed()) {
+      LOG.log(Level.SEVERE,
+              "Closing message dispatcher for {0}: ThreadPool for service dispatcher failed to close",
+              this.evaluatorIdentifier);
+    }
   }
 }
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
index 3a65df9..f5b1c1d 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
@@ -126,6 +126,13 @@
   }
 
   /**
+   * Returns true if the internal thread pool is closed.
+   */
+  public boolean isThreadPoolClosed() {
+    return this.stage.isClosed();
+  }
+
+  /**
    * Delayed EventHandler.onNext() call.
    * Contains a message object and EventHandler to process it.
    */
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
index 7b6107f..005db44 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
@@ -27,10 +27,7 @@
 
 import javax.inject.Inject;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -230,11 +227,22 @@
             new Object[] {this.name, SHUTDOWN_TIMEOUT, droppedRunnables.size()});
       }
 
+      if (!executor.isTerminated()) {
+        LOG.log(Level.SEVERE, "Closing ThreadPoolStage {0}: Executor failed to terminate.", this.name);
+      }
+
       LOG.log(Level.FINEST, "Closing ThreadPoolStage {0}: end", this.name);
     }
   }
 
   /**
+   * Returns true if resources are closed.
+   */
+  public boolean isClosed() {
+    return closed.get() && executor.isTerminated();
+  }
+
+  /**
    * Gets the queue length of this stage.
    *
    * @return the queue length