IMPALA-8762: Track host level admission stats across all coordinators
This patch adds the ability to share the per-host stats for locally
admitted queries across all coordinators. This gives each coordinator a
more complete, cluster-wide view of stats like slots_in_use and
mem_admitted when making local admission decisions.
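
For illustration, here is a minimal standalone sketch (hypothetical types
and values, not the actual AdmissionController code) of how a coordinator
can combine its local per-host stats with the views reported by remote
coordinators before checking a host's limits:

  #include <cstdint>
  #include <iostream>
  #include <string>
  #include <unordered_map>

  // Simplified stand-in for the Thrift-generated THostStats.
  struct HostStats { int64_t mem_admitted = 0; int64_t slots_in_use = 0; };
  using PerHostStats = std::unordered_map<std::string, HostStats>;

  int main() {
    // Local view plus the per-host stats reported by each remote coordinator.
    PerHostStats local = {{"host1:22000", {2LL << 30, 2}}};
    std::unordered_map<std::string, PerHostStats> remote = {
        {"coord2:22000", {{"host1:22000", {1LL << 30, 1}}}}};

    const std::string host = "host1:22000";
    int64_t agg_mem_admitted = local[host].mem_admitted;
    int64_t agg_slots_in_use = local[host].slots_in_use;
    for (const auto& entry : remote) {
      auto it = entry.second.find(host);
      if (it == entry.second.end()) continue;
      agg_mem_admitted += it->second.mem_admitted;
      agg_slots_in_use += it->second.slots_in_use;
    }
    // Admission compares these aggregates (not just the local values) against
    // the host's admit_mem_limit and admission_slots.
    std::cout << "agg_mem_admitted=" << agg_mem_admitted
              << " agg_slots_in_use=" << agg_slots_in_use << std::endl;
    return 0;
  }

In the actual patch the remote views arrive via the statestore topic, keyed
by "STAT:<backend_id>" and serialized as TPerHostStatsUpdate.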
Testing:
Added an end-to-end Python test (test_admission_control_with_multiple_coords).
Change-Id: I2946832e0a89b077d0f3bec755e4672be2088243
Reviewed-on: http://gerrit.cloudera.org:8080/17683
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 1d1f544..da79c07 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -71,12 +71,17 @@
return PrettyPrinter::Print(value, TUnit::BYTES);
}
-// Delimiter used for topic keys of the form "<pool_name><delimiter><backend_id>".
-// "!" is used because the backend id contains a colon, but it should not contain "!".
-// When parsing the topic key we need to be careful to find the last instance in
-// case the pool name contains it as well.
+// Delimiter used for pool topic keys of the form
+// "<prefix><pool_name><delimiter><backend_id>". "!" is used because the backend id
+// contains a colon, but it should not contain "!". When parsing the topic key we need to
+// be careful to find the last instance in case the pool name contains it as well.
const char TOPIC_KEY_DELIMITER = '!';
+// Prefix used by topic keys for pool stat updates.
+const string TOPIC_KEY_POOL_PREFIX = "POOL:";
+// Prefix used by topic keys for PerHostStat updates.
+const string TOPIC_KEY_STAT_PREFIX = "STAT:";
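+// Example keys (with illustrative addresses):
+//   "POOL:default-pool!host1:22000" for pool stat updates,
+//   "STAT:host1:22000" for per-host stat updates.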
+
// Delimiter used for the resource pool prefix of executor groups. In order to be used for
// queries in "resource-pool-A", an executor group name must start with
// "resource-pool-A-".
@@ -236,21 +241,30 @@
"host $0. Needed $1 slots but $2/$3 are already "
"in use.";
-// Parses the pool name and backend_id from the topic key if it is valid.
-// Returns true if the topic key is valid and pool_name and backend_id are set.
-static inline bool ParsePoolTopicKey(const string& topic_key, string* pool_name,
- string* backend_id) {
- // Topic keys will look something like: poolname!hostname:22000
- // The '!' delimiter should always be present, the pool name must be
- // at least 1 character, and network address must be at least 3 characters (including
- // ':' and if the hostname and port are each only 1 character). Then the topic_key must
- // be at least 5 characters (1 + 1 + 3).
- const int MIN_TOPIC_KEY_SIZE = 5;
+// Splits the topic key into the prefix that identifies the kind of update received
+// and the remaining suffix. Returns false if the key is too short to be valid.
+static inline bool ParseTopicKey(
+ const string& topic_key, string* prefix, string* suffix) {
+  // The prefix should always be present and the network address must be at least 3
+  // characters (a ':' plus a hostname and port of at least one character each), so the
+  // topic_key must be at least 8 characters (5 + 3).
+ const int MIN_TOPIC_KEY_SIZE = 8;
if (topic_key.length() < MIN_TOPIC_KEY_SIZE) {
VLOG_QUERY << "Invalid topic key for pool: " << topic_key;
return false;
}
+ DCHECK_EQ(TOPIC_KEY_POOL_PREFIX.size(), TOPIC_KEY_STAT_PREFIX.size())
+ << "All admission topic key prefixes should be of the same size";
+ *prefix = topic_key.substr(0, TOPIC_KEY_POOL_PREFIX.size());
+ *suffix = topic_key.substr(TOPIC_KEY_POOL_PREFIX.size());
+ return true;
+}
+// Parses the pool name and backend_id from the topic key if it is valid.
+// Returns true if the topic key is valid and pool_name and backend_id are set.
+static inline bool ParsePoolTopicKey(const string& topic_key, string* pool_name,
+ string* backend_id) {
+ // Pool topic keys will look something like: poolname!hostname:22000
size_t pos = topic_key.find_last_of(TOPIC_KEY_DELIMITER);
if (pos == string::npos || pos >= topic_key.size() - 1) {
VLOG_QUERY << "Invalid topic key for pool: " << topic_key;
@@ -851,16 +865,25 @@
const NetworkAddressPB& host = entry.first;
const string host_id = NetworkAddressPBToString(host);
int64_t admit_mem_limit = entry.second.be_desc.admit_mem_limit();
- const HostStats& host_stats = host_stats_[host_id];
+ const THostStats& host_stats = host_stats_[host_id];
int64_t mem_reserved = host_stats.mem_reserved;
- int64_t mem_admitted = host_stats.mem_admitted;
+ int64_t agg_mem_admitted_on_host = host_stats.mem_admitted;
+ // Aggregate the mem admitted across all queries admitted by other coordinators.
+ for (const auto& remote_entry : remote_per_host_stats_) {
+ auto remote_stat_itr = remote_entry.second.find(host_id);
+ if (remote_stat_itr != remote_entry.second.end()) {
+ agg_mem_admitted_on_host += remote_stat_itr->second.mem_admitted;
+ }
+ }
int64_t mem_to_admit = GetMemToAdmit(state, entry.second);
VLOG_ROW << "Checking memory on host=" << host_id
<< " mem_reserved=" << PrintBytes(mem_reserved)
- << " mem_admitted=" << PrintBytes(mem_admitted)
+ << " mem_admitted=" << PrintBytes(host_stats.mem_admitted)
+ << " agg_mem_admitted_on_host=" << PrintBytes(agg_mem_admitted_on_host)
<< " needs=" << PrintBytes(mem_to_admit)
<< " admit_mem_limit=" << PrintBytes(admit_mem_limit);
- int64_t effective_host_mem_reserved = std::max(mem_reserved, mem_admitted);
+ int64_t effective_host_mem_reserved =
+ std::max(mem_reserved, agg_mem_admitted_on_host);
if (effective_host_mem_reserved + mem_to_admit > admit_mem_limit) {
*mem_unavailable_reason =
Substitute(HOST_MEM_NOT_AVAILABLE, host_id, PrintBytes(mem_to_admit),
@@ -894,14 +917,23 @@
const NetworkAddressPB& host = entry.first;
const string host_id = NetworkAddressPBToString(host);
int64_t admission_slots = entry.second.be_desc.admission_slots();
- int64_t slots_in_use = host_stats_[host_id].slots_in_use;
+ int64_t agg_slots_in_use_on_host = host_stats_[host_id].slots_in_use;
+    // Aggregate the number of slots in use across all queries admitted by other
+    // coordinators.
+ for (const auto& remote_entry : remote_per_host_stats_) {
+ auto remote_stat_itr = remote_entry.second.find(host_id);
+ if (remote_stat_itr != remote_entry.second.end()) {
+ agg_slots_in_use_on_host += remote_stat_itr->second.slots_in_use;
+ }
+ }
VLOG_ROW << "Checking available slot on host=" << host_id
- << " slots_in_use=" << slots_in_use
- << " needs=" << slots_in_use + entry.second.exec_params->slots_to_use()
+ << " slots_in_use=" << agg_slots_in_use_on_host << " needs="
+ << agg_slots_in_use_on_host + entry.second.exec_params->slots_to_use()
<< " executor admission_slots=" << admission_slots;
- if (slots_in_use + entry.second.exec_params->slots_to_use() > admission_slots) {
+ if (agg_slots_in_use_on_host + entry.second.exec_params->slots_to_use()
+ > admission_slots) {
*unavailable_reason = Substitute(HOST_SLOT_NOT_AVAILABLE, host_id,
- entry.second.exec_params->slots_to_use(), slots_in_use, admission_slots);
+ entry.second.exec_params->slots_to_use(), agg_slots_in_use_on_host,
+ admission_slots);
if (entry.second.be_desc.is_coordinator()) {
coordinator_resource_limited = true;
}
@@ -1544,7 +1576,7 @@
vector<TTopicDelta>* subscriber_topic_updates) {
{
lock_guard<mutex> lock(admission_ctrl_lock_);
- AddPoolUpdates(subscriber_topic_updates);
+ AddPoolAndPerHostStatsUpdates(subscriber_topic_updates);
StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
incoming_topic_deltas.find(Statestore::IMPALA_REQUEST_QUEUE_TOPIC);
@@ -1590,26 +1622,54 @@
}
void AdmissionController::HandleTopicUpdates(const vector<TTopicItem>& topic_updates) {
- for (const TTopicItem& item: topic_updates) {
- string pool_name;
- string topic_backend_id;
- if (!ParsePoolTopicKey(item.key, &pool_name, &topic_backend_id)) continue;
- // The topic entry from this subscriber is handled specially; the stats coming
- // from the statestore are likely already outdated.
- if (topic_backend_id == host_id_) continue;
- if (item.deleted) {
- GetPoolStats(pool_name)->UpdateRemoteStats(topic_backend_id, nullptr);
- continue;
+ string topic_key_prefix;
+ string topic_key_suffix;
+ string pool_name;
+ string topic_backend_id;
+ for (const TTopicItem& item : topic_updates) {
+ if (!ParseTopicKey(item.key, &topic_key_prefix, &topic_key_suffix)) continue;
+ if (topic_key_prefix == TOPIC_KEY_POOL_PREFIX) {
+ if (!ParsePoolTopicKey(topic_key_suffix, &pool_name, &topic_backend_id)) continue;
+ // The topic entry from this subscriber is handled specially; the stats coming
+ // from the statestore are likely already outdated.
+ if (topic_backend_id == host_id_) continue;
+ if (item.deleted) {
+ GetPoolStats(pool_name)->UpdateRemoteStats(topic_backend_id, nullptr);
+ continue;
+ }
+ TPoolStats remote_update;
+ uint32_t len = item.value.size();
+ Status status =
+ DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(item.value.data()), &len,
+ false, &remote_update);
+ if (!status.ok()) {
+ VLOG_QUERY << "Error deserializing pool update with key: " << item.key;
+ continue;
+ }
+ GetPoolStats(pool_name)->UpdateRemoteStats(topic_backend_id, &remote_update);
+ } else if (topic_key_prefix == TOPIC_KEY_STAT_PREFIX) {
+ topic_backend_id = topic_key_suffix;
+ if (topic_backend_id == host_id_) continue;
+ if (item.deleted) {
+ remote_per_host_stats_.erase(topic_backend_id);
+ continue;
+ }
+ TPerHostStatsUpdate remote_update;
+ uint32_t len = item.value.size();
+ Status status =
+ DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(item.value.data()), &len,
+ false, &remote_update);
+ if (!status.ok()) {
+ VLOG_QUERY << "Error deserializing stats update with key: " << item.key;
+ continue;
+ }
+ PerHostStats& stats = remote_per_host_stats_[topic_backend_id];
+      for (const auto& elem : remote_update.per_host_stats) {
+ stats[elem.host_addr] = elem.stats;
+ }
+ } else {
+ VLOG_QUERY << "Invalid topic key prefix: " << topic_key_prefix;
}
- TPoolStats remote_update;
- uint32_t len = item.value.size();
- Status status = DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(
- item.value.data()), &len, false, &remote_update);
- if (!status.ok()) {
- VLOG_QUERY << "Error deserializing pool update with key: " << item.key;
- continue;
- }
- GetPoolStats(pool_name)->UpdateRemoteStats(topic_backend_id, &remote_update);
}
}
@@ -1873,13 +1933,17 @@
metrics_.local_backend_mem_usage->SetValue(current_usage);
}
-void AdmissionController::AddPoolUpdates(vector<TTopicDelta>* topic_updates) {
+void AdmissionController::AddPoolAndPerHostStatsUpdates(
+ vector<TTopicDelta>* topic_updates) {
// local_stats_ are updated eagerly except for backend_mem_reserved (which isn't used
// for local admission control decisions). Update that now before sending local_stats_.
for (auto& entry : pool_stats_) {
entry.second.UpdateMemTrackerStats();
}
- if (pools_for_updates_.empty()) return;
+ if (pools_for_updates_.empty()) {
+    // No pool updates implies no host stats changes either, so just return.
+ return;
+ }
topic_updates->push_back(TTopicDelta());
TTopicDelta& topic_delta = topic_updates->back();
topic_delta.topic_name = Statestore::IMPALA_REQUEST_QUEUE_TOPIC;
@@ -1894,10 +1958,28 @@
&topic_item.value);
if (!status.ok()) {
LOG(WARNING) << "Failed to serialize query pool stats: " << status.GetDetail();
- topic_updates->pop_back();
+ topic_delta.topic_entries.pop_back();
}
}
pools_for_updates_.clear();
+
+  // Now add the local per-host stats.
+ topic_delta.topic_entries.push_back(TTopicItem());
+ TTopicItem& topic_item = topic_delta.topic_entries.back();
+ topic_item.key = Substitute("$0$1", TOPIC_KEY_STAT_PREFIX, host_id_);
+ TPerHostStatsUpdate update;
+ for (const auto& elem : host_stats_) {
+ update.per_host_stats.emplace_back();
+ TPerHostStatsUpdateElement& inserted_elem = update.per_host_stats.back();
+ inserted_elem.__set_host_addr(elem.first);
+ inserted_elem.__set_stats(elem.second);
+ }
+ Status status =
+ thrift_serializer_.SerializeToString(&update, &topic_item.value);
+ if (!status.ok()) {
+ LOG(WARNING) << "Failed to serialize host stats: " << status.GetDetail();
+ topic_delta.topic_entries.pop_back();
+ }
}
void AdmissionController::DequeueLoop() {
@@ -2362,7 +2444,8 @@
// Ensure the backend_id does not contain the delimiter to ensure that the topic key
// can be parsed properly by finding the last instance of the delimiter.
DCHECK_EQ(backend_id.find(TOPIC_KEY_DELIMITER), string::npos);
- return Substitute("$0$1$2", pool_name, TOPIC_KEY_DELIMITER, backend_id);
+ return Substitute(
+ "$0$1$2$3", TOPIC_KEY_POOL_PREFIX, pool_name, TOPIC_KEY_DELIMITER, backend_id);
}
vector<const ExecutorGroup*> AdmissionController::GetExecutorGroupsForQuery(
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index fb8bca2..e52a769 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -437,23 +437,10 @@
/// Calls ResetInformationalStats on all pools.
void ResetAllPoolInformationalStats();
- // This struct stores per-host statistics which are used during admission and by HTTP
- // handlers to query admission control statistics for currently registered backends.
- struct HostStats {
- /// The mem reserved for a query that is currently executing is its memory limit, if
- /// set (which should be the common case with admission control). Otherwise, if the
- /// query has no limit or the query is finished executing, the current consumption
- /// (tracked by its query mem tracker) is used.
- int64_t mem_reserved = 0;
- /// The per host mem admitted only for the queries admitted locally.
- int64_t mem_admitted = 0;
- /// The per host number of queries admitted only for the queries admitted locally.
- int64_t num_admitted = 0;
- /// The per host number of slots in use for the queries admitted locally.
- int64_t slots_in_use = 0;
- };
-
- typedef std::unordered_map<std::string, HostStats> PerHostStats;
+  // Maps a backend's id (host/port) to its host-level statistics, which are used
+  // during admission and by HTTP handlers to query admission control statistics for
+  // currently registered backends.
+ typedef std::unordered_map<std::string, THostStats> PerHostStats;
// Populates the input map with the per host memory reserved and admitted in the
// following format: <host_address_str, pair<mem_reserved, mem_admitted>>.
@@ -510,6 +497,11 @@
PerHostStats host_stats_;
+  /// A map from each remote coordinator's host_id (host/port) to its view of the
+  /// PerHostStats. Used to get a full view of the cluster state while making admission
+  /// decisions. Updated via statestore updates.
+ std::unordered_map<std::string, PerHostStats> remote_per_host_stats_;
+
/// Counter of the number of times dequeuing a query failed because of a resource
/// issue on the coordinator (which therefore cannot be resolved by adding more
/// executor groups).
@@ -908,9 +900,10 @@
std::vector<TTopicDelta>* subscriber_topic_updates);
/// Adds outgoing topic updates to subscriber_topic_updates for pools that have changed
- /// since the last call to AddPoolUpdates(). Called by UpdatePoolStats() before
- /// UpdateClusterAggregates(). Must hold admission_ctrl_lock_.
- void AddPoolUpdates(std::vector<TTopicDelta>* subscriber_topic_updates);
+  /// since the last call to AddPoolAndPerHostStatsUpdates(). Also adds the complete
+  /// local view of per-host statistics. Called by UpdatePoolStats() before
+  /// UpdateClusterAggregates(). Must hold admission_ctrl_lock_.
+ void AddPoolAndPerHostStatsUpdates(std::vector<TTopicDelta>* subscriber_topic_updates);
/// Updates the remote stats with per-host topic_updates coming from the statestore.
/// Removes remote stats identified by topic deletions coming from the
diff --git a/common/thrift/StatestoreService.thrift b/common/thrift/StatestoreService.thrift
index 41dedb4..71eb579 100644
--- a/common/thrift/StatestoreService.thrift
+++ b/common/thrift/StatestoreService.thrift
@@ -74,6 +74,34 @@
8: required i64 num_running;
}
+struct THostStats {
+ // The mem reserved for a query that is currently executing is its memory limit, if
+ // set (which should be the common case with admission control). Otherwise, if the
+ // query has no limit or the query is finished executing, the current consumption
+ // (tracked by its query mem tracker) is used.
+ 1: required i64 mem_reserved;
+
+ // The per host mem admitted only for the queries admitted locally.
+ 2: required i64 mem_admitted;
+
+ // The per host number of queries admitted only for the queries admitted locally.
+ 3: required i64 num_admitted;
+
+ // The per host number of slots in use for the queries admitted locally.
+ 4: required i64 slots_in_use;
+}
+
+struct TPerHostStatsUpdateElement {
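+  // The network address of the host these stats refer to, e.g. "host1:22000".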
+ 1: required string host_addr;
+ 2: required THostStats stats;
+}
+
+struct TPerHostStatsUpdate {
+ // This stores per-host statistics which are used during admission and by HTTP
+ // handlers to query admission control statistics for currently registered backends.
+ 1: required list<TPerHostStatsUpdateElement> per_host_stats;
+}
+
// Description of a single entry in a topic
struct TTopicItem {
// Human-readable topic entry identifier
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index c5f600e..2805acc 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -2762,11 +2762,11 @@
"key": "cluster-membership.backends.total"
},
{
- "description": "Total number of queries running on executor group: $0",
+ "description": "Total number of queries admitted on this coordinator running on executor group: $0",
"contexts": [
"IMPALAD"
],
- "label": "Total number of queries running on executor group: $0",
+ "label": "Total number of queries admitted on this coordinator running on executor group: $0",
"units": "UNIT",
"kind": "GAUGE",
"key": "admission-controller.executor-group.num-queries-executing.$0"
diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py
index 448a591..6a831bb 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -167,7 +167,7 @@
kwargs = {
"cluster_size": cluster_size,
"num_coordinators": num_coordinators,
- "expected_num_executors": cluster_size,
+ "expected_num_impalads": cluster_size,
"default_query_options": method.func_dict.get(DEFAULT_QUERY_OPTIONS),
"use_exclusive_coordinators": use_exclusive_coordinators
}
@@ -233,7 +233,7 @@
use_exclusive_coordinators=False,
add_executors=False,
log_level=1,
- expected_num_executors=DEFAULT_CLUSTER_SIZE,
+ expected_num_impalads=DEFAULT_CLUSTER_SIZE,
expected_subscribers=0,
default_query_options=None,
statestored_timeout_s=60,
@@ -285,12 +285,12 @@
# The number of statestore subscribers is
# cluster_size (# of impalad) + 1 (for catalogd).
if expected_subscribers == 0:
- expected_subscribers = expected_num_executors + 1
+ expected_subscribers = expected_num_impalads + 1
if "--enable_admission_service" in options:
expected_subscribers += 1
statestored.service.wait_for_live_subscribers(expected_subscribers,
timeout=statestored_timeout_s)
for impalad in cls.cluster.impalads:
- impalad.service.wait_for_num_known_live_backends(expected_num_executors,
+ impalad.service.wait_for_num_known_live_backends(expected_num_impalads,
timeout=impalad_timeout_s)
diff --git a/tests/custom_cluster/test_catalog_wait.py b/tests/custom_cluster/test_catalog_wait.py
index 34b366a..11b90d1 100644
--- a/tests/custom_cluster/test_catalog_wait.py
+++ b/tests/custom_cluster/test_catalog_wait.py
@@ -49,7 +49,7 @@
# On startup, expect only two executors to be registered.
self._start_impala_cluster(["--catalog_init_delays=0,0,200000"],
- expected_num_executors=2,
+ expected_num_impalads=2,
expected_subscribers=4)
# Expect that impalad[2] is not ready.
diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py
index df21193..750ff0a 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -43,7 +43,7 @@
method.func_dict["cluster_size"] = 1
method.func_dict["num_exclusive_coordinators"] = 1
self.num_groups = 1
- self.num_executors = 1
+ self.num_impalads = 1
super(TestExecutorGroups, self).setup_method(method)
self.coordinator = self.cluster.impalads[0]
@@ -53,7 +53,7 @@
return "default-pool-%s" % name
def _add_executor_group(self, name_suffix, min_size, num_executors=0,
- admission_control_slots=0):
+ admission_control_slots=0, extra_args=None):
"""Adds an executor group to the cluster. 'min_size' specifies the minimum size for
the new group to be considered healthy. 'num_executors' specifies the number of
executors to start and defaults to 'min_size' but can be different from 'min_size' to
@@ -63,7 +63,7 @@
self.num_groups += 1
if num_executors == 0:
num_executors = min_size
- self.num_executors += num_executors
+ self.num_impalads += num_executors
name = self._group_name(name_suffix)
LOG.info("Adding %s executors to group %s with minimum size %s" %
(num_executors, name, min_size))
@@ -71,11 +71,29 @@
admission_control_slots]
if len(name_suffix) > 0:
cluster_args.append("--impalad_args=-executor_groups=%s:%s" % (name, min_size))
+ if extra_args:
+ cluster_args.append("--impalad_args=%s" % extra_args)
self._start_impala_cluster(options=cluster_args,
cluster_size=num_executors,
num_coordinators=0,
add_executors=True,
- expected_num_executors=self.num_executors)
+ expected_num_impalads=self.num_impalads)
+
+ def _restart_coordinators(self, num_coordinators, extra_args=None):
+    """Restarts the coordinator spawned in setup_method, and allows the caller to start
+    more than one coordinator by specifying 'num_coordinators'."""
+ LOG.info("Adding a coordinator")
+ cluster_args = ["--impalad_args=-executor_groups=coordinator"]
+ if extra_args:
+ cluster_args.append("--impalad_args=%s" % extra_args)
+ self._start_impala_cluster(options=cluster_args,
+ cluster_size=num_coordinators,
+ num_coordinators=num_coordinators,
+ add_executors=False,
+ expected_num_impalads=num_coordinators,
+ use_exclusive_coordinators=True)
+ self.coordinator = self.cluster.impalads[0]
+ self.num_impalads = 2
def _get_total_admitted_queries(self):
"""Returns the total number of queries that have been admitted to the default resource
@@ -513,3 +531,56 @@
self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 2,
timeout=20)
assert_hash_join()
+
+ @pytest.mark.execute_serially
+ def test_admission_control_with_multiple_coords(self):
+    """This test verifies that host-level metrics like the number of admission slots
+    used and the memory admitted are disseminated correctly across the cluster and
+    accounted for when making admission decisions. We run a query that takes up all of
+    a particular resource (slots or memory) and check that attempting to run a query on
+    the other coordinator results in queuing."""
+    # A long-running query that runs on every executor.
+ QUERY = "select * from functional_parquet.alltypes \
+ where month < 3 and id + random() < sleep(100);"
+    # default_pool_mem_limit is set to enable mem-based admission.
+ self._restart_coordinators(num_coordinators=2,
+ extra_args="-default_pool_mem_limit=100g")
+ # Create fresh clients
+ second_coord_client = self.create_client_for_nth_impalad(1)
+ self.create_impala_clients()
+ # Add an exec group with a 4gb mem_limit.
+ self._add_executor_group("group1", 2, admission_control_slots=2,
+ extra_args="-mem_limit=4g")
+ assert self._get_num_executor_groups(only_healthy=True) == 1
+ second_coord_client.set_configuration({'mt_dop': '2'})
+ handle_for_second = second_coord_client.execute_async(QUERY)
+ # Verify that the first coordinator knows about the query running on the second
+ self.coordinator.service.wait_for_metric_value(
+ "admission-controller.agg-num-running.default-pool", 1, timeout=30)
+ handle_for_first = self.execute_query_async(TEST_QUERY)
+ self.coordinator.service.wait_for_metric_value(
+ "admission-controller.local-num-queued.default-pool", 1, timeout=30)
+ profile = self.client.get_runtime_profile(handle_for_first)
+ assert "queue reason: Not enough admission control slots available on host" in \
+ profile, profile
+ self.close_query(handle_for_first)
+ second_coord_client.close_query(handle_for_second)
+ # Wait for first coordinator to get the admission update.
+ self.coordinator.service.wait_for_metric_value(
+ "admission-controller.agg-num-running.default-pool", 0, timeout=30)
+    # Now verify that mem-based admission also works as intended. The max of mem_reserved
+    # and mem_admitted is used for this. Since a mem_limit is set here, both will be
+    # identical, but this at least exercises that code path as a sanity check.
+ second_coord_client.clear_configuration()
+ second_coord_client.set_configuration({'mem_limit': '4g'})
+ handle_for_second = second_coord_client.execute_async(QUERY)
+ # Verify that the first coordinator knows about the query running on the second
+ self.coordinator.service.wait_for_metric_value(
+ "admission-controller.agg-num-running.default-pool", 1, timeout=30)
+ handle_for_first = self.execute_query_async(TEST_QUERY)
+ self.coordinator.service.wait_for_metric_value(
+ "admission-controller.local-num-queued.default-pool", 1, timeout=30)
+ profile = self.client.get_runtime_profile(handle_for_first)
+ assert "queue reason: Not enough memory available on host" in profile, profile
+ self.close_query(handle_for_first)
+ second_coord_client.close_query(handle_for_second)
diff --git a/tests/custom_cluster/test_runtime_profile.py b/tests/custom_cluster/test_runtime_profile.py
index 933ad7c..7be2166 100644
--- a/tests/custom_cluster/test_runtime_profile.py
+++ b/tests/custom_cluster/test_runtime_profile.py
@@ -44,5 +44,5 @@
cluster_size=1,
num_coordinators=0,
add_executors=True,
- expected_num_executors=4)
+ expected_num_impalads=4)
self.run_test_case('runtime-profile-aggregated', vector)
diff --git a/tests/custom_cluster/test_scratch_disk.py b/tests/custom_cluster/test_scratch_disk.py
index 4c0f6b3..83d7410 100644
--- a/tests/custom_cluster/test_scratch_disk.py
+++ b/tests/custom_cluster/test_scratch_disk.py
@@ -213,7 +213,7 @@
self._start_impala_cluster([
'--impalad_args=-logbuflevel=-1 -scratch_dirs={0}'.format(','.join(normal_dirs)),
'--impalad_args=--allow_multiple_scratch_dirs_per_device=true'], cluster_size=1,
- expected_num_executors=1)
+ expected_num_impalads=1)
self.assert_impalad_log_contains("INFO", "Using scratch directory ",
expected_count=len(normal_dirs))
vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
@@ -247,7 +247,7 @@
self._start_impala_cluster([
'--impalad_args=-logbuflevel=-1 -scratch_dirs={0}'.format(','.join(normal_dirs)),
'--impalad_args=--allow_multiple_scratch_dirs_per_device=true'], cluster_size=1,
- expected_num_executors=1)
+ expected_num_impalads=1)
self.assert_impalad_log_contains("INFO", "Using scratch directory ",
expected_count=len(normal_dirs))
vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
@@ -281,7 +281,7 @@
'--impalad_args=--allow_multiple_scratch_dirs_per_device=true',
'--impalad_args=--allow_spill_to_hdfs=true'],
cluster_size=1,
- expected_num_executors=1)
+ expected_num_impalads=1)
self.assert_impalad_log_contains("INFO", "Using scratch directory ",
expected_count=len(normal_dirs) - 1)
vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
@@ -313,7 +313,7 @@
'--impalad_args=--allow_multiple_scratch_dirs_per_device=true',
'--impalad_args=--allow_spill_to_hdfs=true'],
cluster_size=1,
- expected_num_executors=1)
+ expected_num_impalads=1)
self.assert_impalad_log_contains("INFO", "Using scratch directory ",
expected_count=len(normal_dirs) - 1)
vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
@@ -347,7 +347,7 @@
'--impalad_args=--allow_multiple_scratch_dirs_per_device=true',
'--impalad_args=--allow_spill_to_hdfs=true'],
cluster_size=1,
- expected_num_executors=1)
+ expected_num_impalads=1)
self.assert_impalad_log_contains("INFO", "Using scratch directory ",
expected_count=len(normal_dirs) - 1)
vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
@@ -380,7 +380,7 @@
'--impalad_args=--allow_spill_to_hdfs=true',
'--impalad_args=--remote_tmp_file_size=8M',
'--impalad_args=--remote_tmp_file_block_size=1m'],
- cluster_size=1, expected_num_executors=1)
+ cluster_size=1, expected_num_impalads=1)
self.assert_impalad_log_contains("INFO", "Using scratch directory ",
expected_count=len(normal_dirs) - 1)
vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
@@ -413,7 +413,7 @@
'--impalad_args=--allow_spill_to_hdfs=true',
'--impalad_args=--remote_tmp_file_size=8M',
'--impalad_args=--remote_tmp_file_block_size=1m'],
- cluster_size=num, num_coordinators=num, expected_num_executors=num)
+ cluster_size=num, num_coordinators=num, expected_num_impalads=num)
self.assert_impalad_log_contains("INFO", "Using scratch directory ",
expected_count=len(normal_dirs) - 1)
vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit