[REEF-1726] Close message dispatcher on the evaluator manager shutdown
JIRA:
[REEF-1726](https://issues.apache.org/jira/browse/REEF-1726)
Pull Request:
This closes #1241
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
index 61564b1..c555adc 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
@@ -263,15 +263,18 @@
try {
// We need to wait awhile before returning the container to the RM
// in order to give the EvaluatorRuntime (and Launcher) time to cleanly exit.
- this.clock.scheduleAlarm(100, new EventHandler<Alarm>() {
+ this.clock.scheduleAlarm(200, new EventHandler<Alarm>() {
@Override
public void onNext(final Alarm alarm) {
+ LOG.log(Level.FINER, "Close EvaluatorManager {0} - release to RM", evaluatorId);
resourceReleaseHandler.onNext(releaseEvent);
+ shutdown();
}
});
} catch (final IllegalStateException e) {
LOG.log(Level.WARNING, "Force resource release because the client closed the clock.", e);
this.resourceReleaseHandler.onNext(releaseEvent);
+ this.shutdown();
}
}
}
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
index 73854b2..e113e76 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
@@ -290,10 +290,10 @@
LOG.log(Level.FINER, "Closing message dispatcher for {0}", this.evaluatorIdentifier);
// This effectively closes all dispatchers as they share the same stage.
this.serviceDispatcher.close();
- if (!this.serviceDispatcher.isThreadPoolClosed()) {
+ if (!this.serviceDispatcher.isClosed()) {
LOG.log(Level.SEVERE,
- "Closing message dispatcher for {0}: ThreadPool for service dispatcher failed to close",
- this.evaluatorIdentifier);
+ "Closing message dispatcher for {0}: ThreadPool for service dispatcher failed to close",
+ this.evaluatorIdentifier);
}
}
}
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
index f5b1c1d..91ff7d6 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
@@ -28,6 +28,8 @@
import java.util.Collections;
import java.util.Map;
import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
/**
* Delayed event router that dispatches messages to the proper event handler by type.
@@ -37,6 +39,8 @@
@DriverSide
public final class DispatchingEStage implements AutoCloseable {
+ private static final Logger LOG = Logger.getLogger(DispatchingEStage.class.getName());
+
/**
* A map of event handlers, populated in the register() method.
*/
@@ -98,7 +102,7 @@
/**
* Dispatch a new message by type.
- *
+ * If the stage is already closed, log a warning and ignore the message.
* @param type Type of event handler - must match the register() call.
* @param message A message to process. Must be a subclass of T.
* @param <T> Message type that event handler supports.
@@ -106,8 +110,13 @@
*/
@SuppressWarnings("unchecked")
public <T, U extends T> void onNext(final Class<T> type, final U message) {
- final EventHandler<T> handler = (EventHandler<T>) this.handlers.get(type);
- this.stage.onNext(new DelayedOnNext(handler, message));
+ if (this.isClosed()) {
+ LOG.log(Level.WARNING, "Dispatcher {0} already closed: ignoring message {1}: {2}",
+ new Object[] {this.stage, type.getCanonicalName(), message});
+ } else {
+ final EventHandler<T> handler = (EventHandler<T>) this.handlers.get(type);
+ this.stage.onNext(new DelayedOnNext(handler, message));
+ }
}
/**
@@ -118,7 +127,8 @@
}
/**
- * Close the internal thread pool.
+ * Close the stage and stop accepting new messages.
+ * Closes the internal thread pool.
*/
@Override
public void close() {
@@ -126,9 +136,10 @@
}
/**
- * Returns true if the internal thread pool is closed.
+ * Check if the stage can still accept messages.
+ * @return true if the stage can no longer accept messages, false otherwise.
*/
- public boolean isThreadPoolClosed() {
+ public boolean isClosed() {
return this.stage.isClosed();
}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/AbstractEStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/AbstractEStage.java
index 2861204..5395054 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/AbstractEStage.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/AbstractEStage.java
@@ -88,4 +88,20 @@
outMeter.mark(1);
}
+ /**
+ * Check if the stage can still accept messages.
+ * @return true if the stage is closed, false otherwise.
+ */
+ public boolean isClosed() {
+ return closed.get();
+ }
+
+ /**
+ * Get human readable representation of the class (used for logging).
+ * @return A string that contains stage name.
+ */
+ @Override
+ public String toString() {
+ return String.format("Stage:%s:%s", this.getClass().getCanonicalName(), name);
+ }
}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
index 005db44..4c57fa7 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
@@ -236,13 +236,6 @@
}
/**
- * Returns true if resources are closed.
- */
- public boolean isClosed() {
- return closed.get() && executor.isTerminated();
- }
-
- /**
* Gets the queue length of this stage.
*
* @return the queue length