DRILL-2547: Don't allow Drill to shut down while queries are still executing
This will cause Drillbit.close() to block until all currently executing
fragments have completed.

WorkManager
- added waitForExit() and indicateIfSafeToExit(), which use a latch to
  wait to shut down if there are active fragments
  - waitForExit() times out after 5 seconds

Drillbit
- call WorkManager.waitForExit() in close()
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 0f531b8..958f2dc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -252,6 +252,9 @@
       return;
     }
 
+    // wait for anything that is running to complete
+    manager.waitToExit();
+
     if (coord != null && registrationHandle != null) {
       coord.unregister(registrationHandle);
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 231e49a..e2bcec3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -21,6 +21,7 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -168,6 +169,46 @@
     return dContext;
   }
 
+  private CountDownLatch exitLatch = null; // used to wait to exit when things are still running
+
+  /**
+   * Waits until it is safe to exit. Blocks until all currently running fragments have completed.
+   *
+   * <p>This is intended to be used by {@link org.apache.drill.exec.server.Drillbit#close()}.</p>
+   */
+  public void waitToExit() {
+    synchronized(this) {
+      if (queries.isEmpty() && runningFragments.isEmpty()) {
+        return;
+      }
+
+      exitLatch = new CountDownLatch(1);
+    }
+
+    while(true) {
+      try {
+        exitLatch.await(5, TimeUnit.SECONDS);
+      } catch(InterruptedException e) {
+        // keep waiting
+      }
+      break;
+    }
+  }
+
+  /**
+   * If it is safe to exit, and the exitLatch is in use, signals it so that waitToExit() will
+   * unblock.
+   */
+  private void indicateIfSafeToExit() {
+    synchronized(this) {
+      if (exitLatch != null) {
+        if (queries.isEmpty() && runningFragments.isEmpty()) {
+          exitLatch.countDown();
+        }
+      }
+    }
+  }
+
   /**
    * Narrowed interface to WorkManager that is made available to tasks it is managing.
    */
@@ -196,8 +237,11 @@
       final QueryId queryId = foreman.getQueryId();
       final boolean wasRemoved = queries.remove(queryId, foreman);
       if (!wasRemoved) {
-        throw new IllegalStateException("Couldn't find retiring Foreman for query " + queryId);
+        logger.warn("Couldn't find retiring Foreman for query " + queryId);
+//        throw new IllegalStateException("Couldn't find retiring Foreman for query " + queryId);
       }
+
+      indicateIfSafeToExit();
     }
 
     public Foreman getForemanForQueryId(final QueryId queryId) {
@@ -219,6 +263,7 @@
         @Override
         protected void cleanup() {
           runningFragments.remove(fragmentHandle);
+          indicateIfSafeToExit();
         }
       });
     }