[REEF-1780] Improve logging when closing message dispatcher on the evaluator manager shutdown
JIRA: [REEF-1780](https://issues.apache.org/jira/browse/REEF-1780)
Closes #1270
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
index ce879da..73854b2 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
@@ -290,5 +290,10 @@
LOG.log(Level.FINER, "Closing message dispatcher for {0}", this.evaluatorIdentifier);
// This effectively closes all dispatchers as they share the same stage.
this.serviceDispatcher.close();
+ if (!this.serviceDispatcher.isThreadPoolClosed()) {
+ LOG.log(Level.SEVERE,
+ "Closing message dispatcher for {0}: ThreadPool for service dispatcher failed to close",
+ this.evaluatorIdentifier);
+ }
}
}
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
index 3a65df9..f5b1c1d 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
@@ -126,6 +126,13 @@
}
/**
+ * Returns true if the internal thread pool is closed.
+ */
+ public boolean isThreadPoolClosed() {
+ return this.stage.isClosed();
+ }
+
+ /**
* Delayed EventHandler.onNext() call.
* Contains a message object and EventHandler to process it.
*/
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
index 7b6107f..005db44 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
@@ -27,10 +27,7 @@
import javax.inject.Inject;
import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -230,11 +227,22 @@
new Object[] {this.name, SHUTDOWN_TIMEOUT, droppedRunnables.size()});
}
+ if (!executor.isTerminated()) {
+ LOG.log(Level.SEVERE, "Closing ThreadPoolStage {0}: Executor failed to terminate.", this.name);
+ }
+
LOG.log(Level.FINEST, "Closing ThreadPoolStage {0}: end", this.name);
}
}
/**
+ * Returns true if resources are closed.
+ */
+ public boolean isClosed() {
+ return closed.get() && executor.isTerminated();
+ }
+
+ /**
* Gets the queue length of this stage.
*
* @return the queue length