DRILL-2547: Don't allow Drill to shut down while queries are still executing
This will cause Drillbit.close() to block until all currently executing
fragments have completed, or until a 5-second timeout elapses.
WorkManager
- added waitToExit() and indicateIfSafeToExit(), which use a latch to
wait to shut down if there are active fragments
- waitToExit() times out after 5 seconds
Drillbit
- call WorkManager.waitToExit() in close()
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 0f531b8..958f2dc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -252,6 +252,9 @@
return;
}
+ // wait for anything that is running to complete
+ manager.waitToExit();
+
if (coord != null && registrationHandle != null) {
coord.unregister(registrationHandle);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 231e49a..e2bcec3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -21,6 +21,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -168,6 +169,46 @@
return dContext;
}
+ private CountDownLatch exitLatch = null; // used to wait to exit when things are still running
+
+ /**
+ * Returns at once if queries and runningFragments are both empty; otherwise creates exitLatch
+ * and waits on it for at most 5 seconds. Does not keep waiting after a timeout or an interrupt.
+ * <p>This is intended to be used by {@link org.apache.drill.exec.server.Drillbit#close()}.</p>
+ */
+ public void waitToExit() {
+ synchronized(this) {
+ if (queries.isEmpty() && runningFragments.isEmpty()) {
+ return;
+ }
+
+ exitLatch = new CountDownLatch(1);
+ }
+
+ while(true) {
+ try {
+ exitLatch.await(5, TimeUnit.SECONDS);
+ } catch(InterruptedException e) {
+ // NOTE(review): the break below always runs, so an interrupt ends the wait (flag not restored)
+ }
+ break;
+ }
+ }
+
+ /**
+ * Counts down exitLatch, which unblocks waitToExit(), but only when the latch has been created
+ * and both queries and runningFragments are empty; otherwise does nothing.
+ */
+ private void indicateIfSafeToExit() {
+ synchronized(this) {
+ if (exitLatch != null) {
+ if (queries.isEmpty() && runningFragments.isEmpty()) {
+ exitLatch.countDown();
+ }
+ }
+ }
+ }
+
/**
* Narrowed interface to WorkManager that is made available to tasks it is managing.
*/
@@ -196,8 +237,11 @@
final QueryId queryId = foreman.getQueryId();
final boolean wasRemoved = queries.remove(queryId, foreman);
if (!wasRemoved) {
- throw new IllegalStateException("Couldn't find retiring Foreman for query " + queryId);
+ logger.warn("Couldn't find retiring Foreman for query " + queryId);
+// throw new IllegalStateException("Couldn't find retiring Foreman for query " + queryId);
}
+
+ indicateIfSafeToExit();
}
public Foreman getForemanForQueryId(final QueryId queryId) {
@@ -219,6 +263,7 @@
@Override
protected void cleanup() {
runningFragments.remove(fragmentHandle);
+ indicateIfSafeToExit();
}
});
}