IMPALA-8762: Track host level admission stats across all coordinators

This patch adds the ability to share the per-host stats for locally
admitted queries across all coordinators. This helps to get a more
consolidated view of the cluster for stats like slots_in_use and
mem_admitted when making local admission decisions.

Testing:
Added e2e py test

Change-Id: I2946832e0a89b077d0f3bec755e4672be2088243
Reviewed-on: http://gerrit.cloudera.org:8080/17683
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 1d1f544..da79c07 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -71,12 +71,17 @@
   return PrettyPrinter::Print(value, TUnit::BYTES);
 }
 
-// Delimiter used for topic keys of the form "<pool_name><delimiter><backend_id>".
-// "!" is used because the backend id contains a colon, but it should not contain "!".
-// When parsing the topic key we need to be careful to find the last instance in
-// case the pool name contains it as well.
+// Delimiter used for pool topic keys of the form
+// "<prefix><pool_name><delimiter><backend_id>". "!" is used because the backend id
+// contains a colon, but it should not contain "!". When parsing the topic key we need to
+// be careful to find the last instance in case the pool name contains it as well.
 const char TOPIC_KEY_DELIMITER = '!';
 
+// Prefix used by topic keys for pool stat updates.
+const string TOPIC_KEY_POOL_PREFIX = "POOL:";
+// Prefix used by topic keys for PerHostStat updates.
+const string TOPIC_KEY_STAT_PREFIX = "STAT:";
+
 // Delimiter used for the resource pool prefix of executor groups. In order to be used for
 // queries in "resource-pool-A", an executor group name must start with
 // "resource-pool-A-".
@@ -236,21 +241,30 @@
                                        "host $0. Needed $1 slots but $2/$3 are already "
                                        "in use.";
 
-// Parses the pool name and backend_id from the topic key if it is valid.
-// Returns true if the topic key is valid and pool_name and backend_id are set.
-static inline bool ParsePoolTopicKey(const string& topic_key, string* pool_name,
-    string* backend_id) {
-  // Topic keys will look something like: poolname!hostname:22000
-  // The '!' delimiter should always be present, the pool name must be
-  // at least 1 character, and network address must be at least 3 characters (including
-  // ':' and if the hostname and port are each only 1 character). Then the topic_key must
-  // be at least 5 characters (1 + 1 + 3).
-  const int MIN_TOPIC_KEY_SIZE = 5;
+// Parses the topic key to separate the prefix that helps recognize the kind of update
+// received.
+static inline bool ParseTopicKey(
+    const string& topic_key, string* prefix, string* suffix) {
+  // The prefix should always be present and the network address must be at least 3
+  // characters (including ':' and if the hostname and port are each only 1 character).
+  // Then the topic_key must be at least 8 characters (5 + 3).
+  const int MIN_TOPIC_KEY_SIZE = 8;
   if (topic_key.length() < MIN_TOPIC_KEY_SIZE) {
     VLOG_QUERY << "Invalid topic key for pool: " << topic_key;
     return false;
   }
+  DCHECK_EQ(TOPIC_KEY_POOL_PREFIX.size(), TOPIC_KEY_STAT_PREFIX.size())
+      << "All admission topic key prefixes should be of the same size";
+  *prefix = topic_key.substr(0, TOPIC_KEY_POOL_PREFIX.size());
+  *suffix = topic_key.substr(TOPIC_KEY_POOL_PREFIX.size());
+  return true;
+}
 
+// Parses the pool name and backend_id from the topic key if it is valid.
+// Returns true if the topic key is valid and pool_name and backend_id are set.
+static inline bool ParsePoolTopicKey(const string& topic_key, string* pool_name,
+    string* backend_id) {
+  // Pool topic keys will look something like: poolname!hostname:22000
   size_t pos = topic_key.find_last_of(TOPIC_KEY_DELIMITER);
   if (pos == string::npos || pos >= topic_key.size() - 1) {
     VLOG_QUERY << "Invalid topic key for pool: " << topic_key;
@@ -851,16 +865,25 @@
     const NetworkAddressPB& host = entry.first;
     const string host_id = NetworkAddressPBToString(host);
     int64_t admit_mem_limit = entry.second.be_desc.admit_mem_limit();
-    const HostStats& host_stats = host_stats_[host_id];
+    const THostStats& host_stats = host_stats_[host_id];
     int64_t mem_reserved = host_stats.mem_reserved;
-    int64_t mem_admitted = host_stats.mem_admitted;
+    int64_t agg_mem_admitted_on_host = host_stats.mem_admitted;
+    // Aggregate the mem admitted across all queries admitted by other coordinators.
+    for (const auto& remote_entry : remote_per_host_stats_) {
+      auto remote_stat_itr = remote_entry.second.find(host_id);
+      if (remote_stat_itr != remote_entry.second.end()) {
+        agg_mem_admitted_on_host += remote_stat_itr->second.mem_admitted;
+      }
+    }
     int64_t mem_to_admit = GetMemToAdmit(state, entry.second);
     VLOG_ROW << "Checking memory on host=" << host_id
              << " mem_reserved=" << PrintBytes(mem_reserved)
-             << " mem_admitted=" << PrintBytes(mem_admitted)
+             << " mem_admitted=" << PrintBytes(host_stats.mem_admitted)
+             << " agg_mem_admitted_on_host=" << PrintBytes(agg_mem_admitted_on_host)
              << " needs=" << PrintBytes(mem_to_admit)
              << " admit_mem_limit=" << PrintBytes(admit_mem_limit);
-    int64_t effective_host_mem_reserved = std::max(mem_reserved, mem_admitted);
+    int64_t effective_host_mem_reserved =
+        std::max(mem_reserved, agg_mem_admitted_on_host);
     if (effective_host_mem_reserved + mem_to_admit > admit_mem_limit) {
       *mem_unavailable_reason =
           Substitute(HOST_MEM_NOT_AVAILABLE, host_id, PrintBytes(mem_to_admit),
@@ -894,14 +917,23 @@
     const NetworkAddressPB& host = entry.first;
     const string host_id = NetworkAddressPBToString(host);
     int64_t admission_slots = entry.second.be_desc.admission_slots();
-    int64_t slots_in_use = host_stats_[host_id].slots_in_use;
+    int64_t agg_slots_in_use_on_host = host_stats_[host_id].slots_in_use;
+    // Aggregate num of slots in use across all queries admitted by other coordinators.
+    for (const auto& remote_entry : remote_per_host_stats_) {
+      auto remote_stat_itr = remote_entry.second.find(host_id);
+      if (remote_stat_itr != remote_entry.second.end()) {
+        agg_slots_in_use_on_host += remote_stat_itr->second.slots_in_use;
+      }
+    }
     VLOG_ROW << "Checking available slot on host=" << host_id
-             << " slots_in_use=" << slots_in_use
-             << " needs=" << slots_in_use + entry.second.exec_params->slots_to_use()
+             << " slots_in_use=" << agg_slots_in_use_on_host << " needs="
+             << agg_slots_in_use_on_host + entry.second.exec_params->slots_to_use()
              << " executor admission_slots=" << admission_slots;
-    if (slots_in_use + entry.second.exec_params->slots_to_use() > admission_slots) {
+    if (agg_slots_in_use_on_host + entry.second.exec_params->slots_to_use()
+        > admission_slots) {
       *unavailable_reason = Substitute(HOST_SLOT_NOT_AVAILABLE, host_id,
-          entry.second.exec_params->slots_to_use(), slots_in_use, admission_slots);
+          entry.second.exec_params->slots_to_use(), agg_slots_in_use_on_host,
+          admission_slots);
       if (entry.second.be_desc.is_coordinator()) {
         coordinator_resource_limited = true;
       }
@@ -1544,7 +1576,7 @@
     vector<TTopicDelta>* subscriber_topic_updates) {
   {
     lock_guard<mutex> lock(admission_ctrl_lock_);
-    AddPoolUpdates(subscriber_topic_updates);
+    AddPoolAndPerHostStatsUpdates(subscriber_topic_updates);
 
     StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
         incoming_topic_deltas.find(Statestore::IMPALA_REQUEST_QUEUE_TOPIC);
@@ -1590,26 +1622,54 @@
 }
 
 void AdmissionController::HandleTopicUpdates(const vector<TTopicItem>& topic_updates) {
-  for (const TTopicItem& item: topic_updates) {
-    string pool_name;
-    string topic_backend_id;
-    if (!ParsePoolTopicKey(item.key, &pool_name, &topic_backend_id)) continue;
-    // The topic entry from this subscriber is handled specially; the stats coming
-    // from the statestore are likely already outdated.
-    if (topic_backend_id == host_id_) continue;
-    if (item.deleted) {
-      GetPoolStats(pool_name)->UpdateRemoteStats(topic_backend_id, nullptr);
-      continue;
+  string topic_key_prefix;
+  string topic_key_suffix;
+  string pool_name;
+  string topic_backend_id;
+  for (const TTopicItem& item : topic_updates) {
+    if (!ParseTopicKey(item.key, &topic_key_prefix, &topic_key_suffix)) continue;
+    if (topic_key_prefix == TOPIC_KEY_POOL_PREFIX) {
+      if (!ParsePoolTopicKey(topic_key_suffix, &pool_name, &topic_backend_id)) continue;
+      // The topic entry from this subscriber is handled specially; the stats coming
+      // from the statestore are likely already outdated.
+      if (topic_backend_id == host_id_) continue;
+      if (item.deleted) {
+        GetPoolStats(pool_name)->UpdateRemoteStats(topic_backend_id, nullptr);
+        continue;
+      }
+      TPoolStats remote_update;
+      uint32_t len = item.value.size();
+      Status status =
+          DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(item.value.data()), &len,
+              false, &remote_update);
+      if (!status.ok()) {
+        VLOG_QUERY << "Error deserializing pool update with key: " << item.key;
+        continue;
+      }
+      GetPoolStats(pool_name)->UpdateRemoteStats(topic_backend_id, &remote_update);
+    } else if (topic_key_prefix == TOPIC_KEY_STAT_PREFIX) {
+      topic_backend_id = topic_key_suffix;
+      if (topic_backend_id == host_id_) continue;
+      if (item.deleted) {
+        remote_per_host_stats_.erase(topic_backend_id);
+        continue;
+      }
+      TPerHostStatsUpdate remote_update;
+      uint32_t len = item.value.size();
+      Status status =
+          DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(item.value.data()), &len,
+              false, &remote_update);
+      if (!status.ok()) {
+        VLOG_QUERY << "Error deserializing stats update with key: " << item.key;
+        continue;
+      }
+      PerHostStats& stats = remote_per_host_stats_[topic_backend_id];
+      for(const auto& elem: remote_update.per_host_stats) {
+        stats[elem.host_addr] = elem.stats;
+      }
+    } else {
+      VLOG_QUERY << "Invalid topic key prefix: " << topic_key_prefix;
     }
-    TPoolStats remote_update;
-    uint32_t len = item.value.size();
-    Status status = DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(
-          item.value.data()), &len, false, &remote_update);
-    if (!status.ok()) {
-      VLOG_QUERY << "Error deserializing pool update with key: " << item.key;
-      continue;
-    }
-    GetPoolStats(pool_name)->UpdateRemoteStats(topic_backend_id, &remote_update);
   }
 }
 
@@ -1873,13 +1933,17 @@
   metrics_.local_backend_mem_usage->SetValue(current_usage);
 }
 
-void AdmissionController::AddPoolUpdates(vector<TTopicDelta>* topic_updates) {
+void AdmissionController::AddPoolAndPerHostStatsUpdates(
+    vector<TTopicDelta>* topic_updates) {
   // local_stats_ are updated eagerly except for backend_mem_reserved (which isn't used
   // for local admission control decisions). Update that now before sending local_stats_.
   for (auto& entry : pool_stats_) {
     entry.second.UpdateMemTrackerStats();
   }
-  if (pools_for_updates_.empty()) return;
+  if (pools_for_updates_.empty()) {
+    // No pool updates means no changes to host stats as well, so just return.
+    return;
+  }
   topic_updates->push_back(TTopicDelta());
   TTopicDelta& topic_delta = topic_updates->back();
   topic_delta.topic_name = Statestore::IMPALA_REQUEST_QUEUE_TOPIC;
@@ -1894,10 +1958,28 @@
         &topic_item.value);
     if (!status.ok()) {
       LOG(WARNING) << "Failed to serialize query pool stats: " << status.GetDetail();
-      topic_updates->pop_back();
+      topic_delta.topic_entries.pop_back();
     }
   }
   pools_for_updates_.clear();
+
+  // Now add the host stats
+  topic_delta.topic_entries.push_back(TTopicItem());
+  TTopicItem& topic_item = topic_delta.topic_entries.back();
+  topic_item.key = Substitute("$0$1", TOPIC_KEY_STAT_PREFIX, host_id_);
+  TPerHostStatsUpdate update;
+  for (const auto& elem : host_stats_) {
+    update.per_host_stats.emplace_back();
+    TPerHostStatsUpdateElement& inserted_elem = update.per_host_stats.back();
+    inserted_elem.__set_host_addr(elem.first);
+    inserted_elem.__set_stats(elem.second);
+  }
+  Status status =
+      thrift_serializer_.SerializeToString(&update, &topic_item.value);
+  if (!status.ok()) {
+    LOG(WARNING) << "Failed to serialize host stats: " << status.GetDetail();
+    topic_delta.topic_entries.pop_back();
+  }
 }
 
 void AdmissionController::DequeueLoop() {
@@ -2362,7 +2444,8 @@
   // Ensure the backend_id does not contain the delimiter to ensure that the topic key
   // can be parsed properly by finding the last instance of the delimiter.
   DCHECK_EQ(backend_id.find(TOPIC_KEY_DELIMITER), string::npos);
-  return Substitute("$0$1$2", pool_name, TOPIC_KEY_DELIMITER, backend_id);
+  return Substitute(
+      "$0$1$2$3", TOPIC_KEY_POOL_PREFIX, pool_name, TOPIC_KEY_DELIMITER, backend_id);
 }
 
 vector<const ExecutorGroup*> AdmissionController::GetExecutorGroupsForQuery(
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index fb8bca2..e52a769 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -437,23 +437,10 @@
   /// Calls ResetInformationalStats on all pools.
   void ResetAllPoolInformationalStats();
 
-  // This struct stores per-host statistics which are used during admission and by HTTP
-  // handlers to query admission control statistics for currently registered backends.
-  struct HostStats {
-    /// The mem reserved for a query that is currently executing is its memory limit, if
-    /// set (which should be the common case with admission control). Otherwise, if the
-    /// query has no limit or the query is finished executing, the current consumption
-    /// (tracked by its query mem tracker) is used.
-    int64_t mem_reserved = 0;
-    /// The per host mem admitted only for the queries admitted locally.
-    int64_t mem_admitted = 0;
-    /// The per host number of queries admitted only for the queries admitted locally.
-    int64_t num_admitted = 0;
-    /// The per host number of slots in use for the queries admitted locally.
-    int64_t slots_in_use = 0;
-  };
-
-  typedef std::unordered_map<std::string, HostStats> PerHostStats;
+  // This maps a backends's id(host/port id) to its host level statistics which are used
+  // during admission and by HTTP handlers to query admission control statistics for
+  // currently registered backends.
+  typedef std::unordered_map<std::string, THostStats> PerHostStats;
 
   // Populates the input map with the per host memory reserved and admitted in the
   // following format: <host_address_str, pair<mem_reserved, mem_admitted>>.
@@ -510,6 +497,11 @@
 
   PerHostStats host_stats_;
 
+  /// A map from other coordinator's host_id (host/port id) -> their view of the
+  /// PerHostStats. Used to get a full view of the cluster state while making admission
+  /// decisions. Updated via statestore updates.
+  std::unordered_map<std::string, PerHostStats> remote_per_host_stats_;
+
   /// Counter of the number of times dequeuing a query failed because of a resource
   /// issue on the coordinator (which therefore cannot be resolved by adding more
   /// executor groups).
@@ -908,9 +900,10 @@
       std::vector<TTopicDelta>* subscriber_topic_updates);
 
   /// Adds outgoing topic updates to subscriber_topic_updates for pools that have changed
-  /// since the last call to AddPoolUpdates(). Called by UpdatePoolStats() before
-  /// UpdateClusterAggregates(). Must hold admission_ctrl_lock_.
-  void AddPoolUpdates(std::vector<TTopicDelta>* subscriber_topic_updates);
+  /// since the last call to AddPoolUpdates(). Also adds the complete local view of
+  /// per-host statistics. Called by UpdatePoolStats() before UpdateClusterAggregates().
+  /// Must hold admission_ctrl_lock_.
+  void AddPoolAndPerHostStatsUpdates(std::vector<TTopicDelta>* subscriber_topic_updates);
 
   /// Updates the remote stats with per-host topic_updates coming from the statestore.
   /// Removes remote stats identified by topic deletions coming from the
diff --git a/common/thrift/StatestoreService.thrift b/common/thrift/StatestoreService.thrift
index 41dedb4..71eb579 100644
--- a/common/thrift/StatestoreService.thrift
+++ b/common/thrift/StatestoreService.thrift
@@ -74,6 +74,34 @@
   8: required i64 num_running;
 }
 
+struct THostStats {
+  // The mem reserved for a query that is currently executing is its memory limit, if
+  // set (which should be the common case with admission control). Otherwise, if the
+  // query has no limit or the query is finished executing, the current consumption
+  // (tracked by its query mem tracker) is used.
+  1: required i64 mem_reserved;
+
+  // The per host mem admitted only for the queries admitted locally.
+  2: required i64 mem_admitted;
+
+  // The per host number of queries admitted only for the queries admitted locally.
+  3: required i64 num_admitted;
+
+  // The per host number of slots in use for the queries admitted locally.
+  4: required i64 slots_in_use;
+}
+
+struct TPerHostStatsUpdateElement {
+    1: required string host_addr;
+    2: required THostStats stats;
+}
+
+struct TPerHostStatsUpdate {
+  // This stores per-host statistics which are used during admission and by HTTP
+  // handlers to query admission control statistics for currently registered backends.
+  1: required list<TPerHostStatsUpdateElement> per_host_stats;
+}
+
 // Description of a single entry in a topic
 struct TTopicItem {
   // Human-readable topic entry identifier
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index c5f600e..2805acc 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -2762,11 +2762,11 @@
     "key": "cluster-membership.backends.total"
   },
   {
-  "description": "Total number of queries running on executor group: $0",
+  "description": "Total number of queries admitted on this coordinator running on executor group: $0",
   "contexts": [
     "IMPALAD"
   ],
-  "label": "Total number of queries running on executor group: $0",
+  "label": "Total number of queries admitted on this coordinator running on executor group: $0",
   "units": "UNIT",
   "kind": "GAUGE",
   "key": "admission-controller.executor-group.num-queries-executing.$0"
diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py
index 448a591..6a831bb 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -167,7 +167,7 @@
     kwargs = {
       "cluster_size": cluster_size,
       "num_coordinators": num_coordinators,
-      "expected_num_executors": cluster_size,
+      "expected_num_impalads": cluster_size,
       "default_query_options": method.func_dict.get(DEFAULT_QUERY_OPTIONS),
       "use_exclusive_coordinators": use_exclusive_coordinators
     }
@@ -233,7 +233,7 @@
                             use_exclusive_coordinators=False,
                             add_executors=False,
                             log_level=1,
-                            expected_num_executors=DEFAULT_CLUSTER_SIZE,
+                            expected_num_impalads=DEFAULT_CLUSTER_SIZE,
                             expected_subscribers=0,
                             default_query_options=None,
                             statestored_timeout_s=60,
@@ -285,12 +285,12 @@
     # The number of statestore subscribers is
     # cluster_size (# of impalad) + 1 (for catalogd).
     if expected_subscribers == 0:
-      expected_subscribers = expected_num_executors + 1
+      expected_subscribers = expected_num_impalads + 1
       if "--enable_admission_service" in options:
         expected_subscribers += 1
 
     statestored.service.wait_for_live_subscribers(expected_subscribers,
                                                   timeout=statestored_timeout_s)
     for impalad in cls.cluster.impalads:
-      impalad.service.wait_for_num_known_live_backends(expected_num_executors,
+      impalad.service.wait_for_num_known_live_backends(expected_num_impalads,
                                                        timeout=impalad_timeout_s)
diff --git a/tests/custom_cluster/test_catalog_wait.py b/tests/custom_cluster/test_catalog_wait.py
index 34b366a..11b90d1 100644
--- a/tests/custom_cluster/test_catalog_wait.py
+++ b/tests/custom_cluster/test_catalog_wait.py
@@ -49,7 +49,7 @@
 
     # On startup, expect only two executors to be registered.
     self._start_impala_cluster(["--catalog_init_delays=0,0,200000"],
-                               expected_num_executors=2,
+                               expected_num_impalads=2,
                                expected_subscribers=4)
 
     # Expect that impalad[2] is not ready.
diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py
index df21193..750ff0a 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -43,7 +43,7 @@
     method.func_dict["cluster_size"] = 1
     method.func_dict["num_exclusive_coordinators"] = 1
     self.num_groups = 1
-    self.num_executors = 1
+    self.num_impalads = 1
     super(TestExecutorGroups, self).setup_method(method)
     self.coordinator = self.cluster.impalads[0]
 
@@ -53,7 +53,7 @@
     return "default-pool-%s" % name
 
   def _add_executor_group(self, name_suffix, min_size, num_executors=0,
-                          admission_control_slots=0):
+                          admission_control_slots=0, extra_args=None):
     """Adds an executor group to the cluster. 'min_size' specifies the minimum size for
     the new group to be considered healthy. 'num_executors' specifies the number of
     executors to start and defaults to 'min_size' but can be different from 'min_size' to
@@ -63,7 +63,7 @@
     self.num_groups += 1
     if num_executors == 0:
       num_executors = min_size
-    self.num_executors += num_executors
+    self.num_impalads += num_executors
     name = self._group_name(name_suffix)
     LOG.info("Adding %s executors to group %s with minimum size %s" %
              (num_executors, name, min_size))
@@ -71,11 +71,29 @@
                     admission_control_slots]
     if len(name_suffix) > 0:
       cluster_args.append("--impalad_args=-executor_groups=%s:%s" % (name, min_size))
+    if extra_args:
+      cluster_args.append("--impalad_args=%s" % extra_args)
     self._start_impala_cluster(options=cluster_args,
                                cluster_size=num_executors,
                                num_coordinators=0,
                                add_executors=True,
-                               expected_num_executors=self.num_executors)
+                               expected_num_impalads=self.num_impalads)
+
+  def _restart_coordinators(self, num_coordinators, extra_args=None):
+    """Restarts the coordinator spawned in setup_method and enables the caller to start
+    more than one coordinator by specifying 'num_coordinators'"""
+    LOG.info("Adding a coordinator")
+    cluster_args = ["--impalad_args=-executor_groups=coordinator"]
+    if extra_args:
+      cluster_args.append("--impalad_args=%s" % extra_args)
+    self._start_impala_cluster(options=cluster_args,
+                               cluster_size=num_coordinators,
+                               num_coordinators=num_coordinators,
+                               add_executors=False,
+                               expected_num_impalads=num_coordinators,
+                               use_exclusive_coordinators=True)
+    self.coordinator = self.cluster.impalads[0]
+    self.num_impalads = 2
 
   def _get_total_admitted_queries(self):
     """Returns the total number of queries that have been admitted to the default resource
@@ -513,3 +531,56 @@
     self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 2,
                                                    timeout=20)
     assert_hash_join()
+
+  @pytest.mark.execute_serially
+  def test_admission_control_with_multiple_coords(self):
+    """This test verifies that host level metrics like the num of admission slots used
+    and memory admitted is disseminated correctly across the cluster and accounted for
+    while making admission decisions. We run a query that takes up all of a particular
+    resource (slots or memory) and check if attempting to run a query on the other
+    coordinator results in queuing."""
+    # A long running query that runs on every executor
+    QUERY = "select * from functional_parquet.alltypes \
+                 where month < 3 and id + random() < sleep(100);"
+    # default_pool_mem_limit is set to enable mem based admission.
+    self._restart_coordinators(num_coordinators=2,
+                               extra_args="-default_pool_mem_limit=100g")
+    # Create fresh clients
+    second_coord_client = self.create_client_for_nth_impalad(1)
+    self.create_impala_clients()
+    # Add an exec group with a 4gb mem_limit.
+    self._add_executor_group("group1", 2, admission_control_slots=2,
+                             extra_args="-mem_limit=4g")
+    assert self._get_num_executor_groups(only_healthy=True) == 1
+    second_coord_client.set_configuration({'mt_dop': '2'})
+    handle_for_second = second_coord_client.execute_async(QUERY)
+    # Verify that the first coordinator knows about the query running on the second
+    self.coordinator.service.wait_for_metric_value(
+      "admission-controller.agg-num-running.default-pool", 1, timeout=30)
+    handle_for_first = self.execute_query_async(TEST_QUERY)
+    self.coordinator.service.wait_for_metric_value(
+      "admission-controller.local-num-queued.default-pool", 1, timeout=30)
+    profile = self.client.get_runtime_profile(handle_for_first)
+    assert "queue reason: Not enough admission control slots available on host" in \
+           profile, profile
+    self.close_query(handle_for_first)
+    second_coord_client.close_query(handle_for_second)
+    # Wait for first coordinator to get the admission update.
+    self.coordinator.service.wait_for_metric_value(
+      "admission-controller.agg-num-running.default-pool", 0, timeout=30)
+    # Now verify that mem based admission also works as intended. A max of mem_reserved
+    # and mem_admitted is used for this. Since mem_limit is being used here, both will be
+    # identical but this will at least test that code path as a sanity check.
+    second_coord_client.clear_configuration()
+    second_coord_client.set_configuration({'mem_limit': '4g'})
+    handle_for_second = second_coord_client.execute_async(QUERY)
+    # Verify that the first coordinator knows about the query running on the second
+    self.coordinator.service.wait_for_metric_value(
+      "admission-controller.agg-num-running.default-pool", 1, timeout=30)
+    handle_for_first = self.execute_query_async(TEST_QUERY)
+    self.coordinator.service.wait_for_metric_value(
+      "admission-controller.local-num-queued.default-pool", 1, timeout=30)
+    profile = self.client.get_runtime_profile(handle_for_first)
+    assert "queue reason: Not enough memory available on host" in profile, profile
+    self.close_query(handle_for_first)
+    second_coord_client.close_query(handle_for_second)
diff --git a/tests/custom_cluster/test_runtime_profile.py b/tests/custom_cluster/test_runtime_profile.py
index 933ad7c..7be2166 100644
--- a/tests/custom_cluster/test_runtime_profile.py
+++ b/tests/custom_cluster/test_runtime_profile.py
@@ -44,5 +44,5 @@
                                cluster_size=1,
                                num_coordinators=0,
                                add_executors=True,
-                               expected_num_executors=4)
+                               expected_num_impalads=4)
     self.run_test_case('runtime-profile-aggregated', vector)
diff --git a/tests/custom_cluster/test_scratch_disk.py b/tests/custom_cluster/test_scratch_disk.py
index 4c0f6b3..83d7410 100644
--- a/tests/custom_cluster/test_scratch_disk.py
+++ b/tests/custom_cluster/test_scratch_disk.py
@@ -213,7 +213,7 @@
     self._start_impala_cluster([
       '--impalad_args=-logbuflevel=-1 -scratch_dirs={0}'.format(','.join(normal_dirs)),
       '--impalad_args=--allow_multiple_scratch_dirs_per_device=true'], cluster_size=1,
-      expected_num_executors=1)
+      expected_num_impalads=1)
     self.assert_impalad_log_contains("INFO", "Using scratch directory ",
                                     expected_count=len(normal_dirs))
     vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
@@ -247,7 +247,7 @@
     self._start_impala_cluster([
       '--impalad_args=-logbuflevel=-1 -scratch_dirs={0}'.format(','.join(normal_dirs)),
       '--impalad_args=--allow_multiple_scratch_dirs_per_device=true'], cluster_size=1,
-      expected_num_executors=1)
+      expected_num_impalads=1)
     self.assert_impalad_log_contains("INFO", "Using scratch directory ",
                                     expected_count=len(normal_dirs))
     vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
@@ -281,7 +281,7 @@
       '--impalad_args=--allow_multiple_scratch_dirs_per_device=true',
       '--impalad_args=--allow_spill_to_hdfs=true'],
       cluster_size=1,
-      expected_num_executors=1)
+      expected_num_impalads=1)
     self.assert_impalad_log_contains("INFO", "Using scratch directory ",
                                     expected_count=len(normal_dirs) - 1)
     vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
@@ -313,7 +313,7 @@
       '--impalad_args=--allow_multiple_scratch_dirs_per_device=true',
       '--impalad_args=--allow_spill_to_hdfs=true'],
       cluster_size=1,
-      expected_num_executors=1)
+      expected_num_impalads=1)
     self.assert_impalad_log_contains("INFO", "Using scratch directory ",
                                     expected_count=len(normal_dirs) - 1)
     vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
@@ -347,7 +347,7 @@
       '--impalad_args=--allow_multiple_scratch_dirs_per_device=true',
       '--impalad_args=--allow_spill_to_hdfs=true'],
       cluster_size=1,
-      expected_num_executors=1)
+      expected_num_impalads=1)
     self.assert_impalad_log_contains("INFO", "Using scratch directory ",
                                     expected_count=len(normal_dirs) - 1)
     vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
@@ -380,7 +380,7 @@
       '--impalad_args=--allow_spill_to_hdfs=true',
       '--impalad_args=--remote_tmp_file_size=8M',
       '--impalad_args=--remote_tmp_file_block_size=1m'],
-      cluster_size=1, expected_num_executors=1)
+      cluster_size=1, expected_num_impalads=1)
     self.assert_impalad_log_contains("INFO", "Using scratch directory ",
                                     expected_count=len(normal_dirs) - 1)
     vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
@@ -413,7 +413,7 @@
       '--impalad_args=--allow_spill_to_hdfs=true',
       '--impalad_args=--remote_tmp_file_size=8M',
       '--impalad_args=--remote_tmp_file_block_size=1m'],
-      cluster_size=num, num_coordinators=num, expected_num_executors=num)
+      cluster_size=num, num_coordinators=num, expected_num_impalads=num)
     self.assert_impalad_log_contains("INFO", "Using scratch directory ",
                                     expected_count=len(normal_dirs) - 1)
     vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit