IMPALA-8995: Fix synchronization in dequeue thread

The admission controller's dequeue thread currently wakes up either
when queries release their admission resources or when a statestore
update is received. The dequeue loop releases the admission lock at
the end of the loop, then acquires it back and calls wait on it.
In this small window, a query can complete, update the admission
stats by acquiring the admission lock, and then call send a notify
to wake the dequeue thread. But since the dequeue thread has not
called wait yet, it can miss this notify. Moreover if the statestore
is down there is no way of waking it up. This will cause the queued
queries to eventually timeout. This patch attempts to fix this by
removing that window.

Testing:
Was able to trigger this manually by adding a sleep right before
the dequeue loop acquires the admission lock.

Change-Id: I91080ce54e59cc7e6361f7c50d6b2156a8a180c8
Reviewed-on: http://gerrit.cloudera.org:8080/14539
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 2f335bc..798abb4 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -324,6 +324,7 @@
     // Lock to ensure the dequeue thread will see the update to done_
     lock_guard<mutex> l(admission_ctrl_lock_);
     done_ = true;
+    pending_dequeue_ = true;
     dequeue_cv_.NotifyOne();
   }
   dequeue_thread_->Join();
@@ -992,6 +993,7 @@
     UpdateExecGroupMetric(schedule.executor_group(), -1);
     VLOG_RPC << "Released query id=" << PrintId(schedule.query_id()) << " "
              << stats->DebugString();
+    pending_dequeue_ = true;
   }
   dequeue_cv_.NotifyOne();
 }
@@ -1021,6 +1023,7 @@
          << GetPoolStats(schedule)->DebugString();
       VLOG(2) << ss.str();
     }
+    pending_dequeue_ = true;
   }
   dequeue_cv_.NotifyOne();
 }
@@ -1055,6 +1058,7 @@
     }
     UpdateClusterAggregates();
     last_topic_update_time_ms_ = MonotonicMillis();
+    pending_dequeue_ = true;
   }
   dequeue_cv_.NotifyOne(); // Dequeue and admit queries on the dequeue thread
 }
@@ -1369,10 +1373,13 @@
 }
 
 void AdmissionController::DequeueLoop() {
+  unique_lock<mutex> lock(admission_ctrl_lock_);
   while (true) {
-    unique_lock<mutex> lock(admission_ctrl_lock_);
     if (done_) break;
-    dequeue_cv_.Wait(lock);
+    while (!pending_dequeue_) {
+      dequeue_cv_.Wait(lock);
+    }
+    pending_dequeue_ = false;
     ClusterMembershipMgr::SnapshotPtr membership_snapshot =
         cluster_membership_mgr_->GetSnapshot();
 
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index e106751..3b93d3f 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -732,6 +732,10 @@
   typedef boost::unordered_map<std::string, TPoolConfig> PoolConfigMap;
   PoolConfigMap pool_config_map_;
 
+  /// Indicates whether a change in pool stats warrants an attempt by the dequeuing
+  /// thread to dequeue.
+  bool pending_dequeue_ = true;
+
   /// Notifies the dequeuing thread that pool stats have changed and it may be
   /// possible to dequeue and admit queries.
   ConditionVariable dequeue_cv_;