| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "scheduling/admission-controller.h" |
| |
| #include <boost/algorithm/string.hpp> |
| #include <boost/mem_fn.hpp> |
| #include <gutil/strings/stringpiece.h> |
| #include <gutil/strings/substitute.h> |
| |
| #include "common/logging.h" |
| #include "runtime/bufferpool/reservation-util.h" |
| #include "runtime/exec-env.h" |
| #include "runtime/mem-tracker.h" |
| #include "scheduling/cluster-membership-mgr.h" |
| #include "scheduling/executor-group.h" |
| #include "scheduling/query-schedule.h" |
| #include "scheduling/scheduler.h" |
| #include "service/impala-server.h" |
| #include "util/bit-util.h" |
| #include "util/debug-util.h" |
| #include "util/metrics.h" |
| #include "util/pretty-printer.h" |
| #include "util/runtime-profile-counters.h" |
| #include "util/time.h" |
| |
| #include "common/names.h" |
| |
| using namespace strings; |
| |
| DEFINE_int64(queue_wait_timeout_ms, 60 * 1000, "Maximum amount of time (in " |
| "milliseconds) that a request will wait to be admitted before timing out."); |
| |
| // The stale topic warning threshold is made configurable to allow suppressing the |
| // error if it turns out to be noisy on some deployments, or to allow lowering the |
| // threshold to help debug admission control issues. Hidden so that we have the |
| // option of making this a no-op later. |
| DEFINE_int64_hidden(admission_control_stale_topic_threshold_ms, 5 * 1000, |
| "Threshold above which the admission controller appends a warning to error " |
| "messages and query profiles noting that the admission control topic is stale " |
| "and that the admission decision may have been based on stale state data. The " |
| "default, 5 seconds, is chosen to minimise false positives while capturing most " |
| "cases where the Impala daemon is disconnected from the statestore or topic " |
| "updates are seriously delayed."); |
| |
| namespace impala { |
| |
| const int64_t AdmissionController::PoolStats::HISTOGRAM_NUM_OF_BINS = 128; |
| const int64_t AdmissionController::PoolStats::HISTOGRAM_BIN_SIZE = 1024L * 1024L * 1024L; |
| const double AdmissionController::PoolStats::EMA_MULTIPLIER = 0.2; |
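| // EMA_MULTIPLIER weights the newest sample in the exponential moving average of |
| // pool queue wait times (see PoolStats::UpdateWaitTime(), defined elsewhere). As an |
| // illustrative sketch, the standard EMA update this multiplier plugs into is: |
| //   ema = EMA_MULTIPLIER * sample + (1.0 - EMA_MULTIPLIER) * ema; |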
| |
| /// Convenience method. |
| string PrintBytes(int64_t value) { |
| return PrettyPrinter::Print(value, TUnit::BYTES); |
| } |
| |
| // Delimiter used for topic keys of the form "<pool_name><delimiter><backend_id>". |
| // "!" is used because the backend id contains a colon but should never contain "!". |
| // When parsing the topic key we need to find the last instance of the delimiter, in |
| // case the pool name contains it as well. |
| const char TOPIC_KEY_DELIMITER = '!'; |
| |
| // Delimiter used for the resource pool prefix of executor groups. In order to be used for |
| // queries in "resource-pool-A", an executor group name must start with |
| // "resource-pool-A-". |
| const char POOL_GROUP_DELIMITER = '-'; |
| |
| const string EXEC_GROUP_QUERY_LOAD_KEY_FORMAT = |
| "admission-controller.executor-group.num-queries-executing.$0"; |
| |
| // Define metric key format strings for metrics in PoolMetrics |
| // '$0' is replaced with the pool name by strings::Substitute |
| const string TOTAL_ADMITTED_METRIC_KEY_FORMAT = |
| "admission-controller.total-admitted.$0"; |
| const string TOTAL_QUEUED_METRIC_KEY_FORMAT = |
| "admission-controller.total-queued.$0"; |
| const string TOTAL_DEQUEUED_METRIC_KEY_FORMAT = |
| "admission-controller.total-dequeued.$0"; |
| const string TOTAL_REJECTED_METRIC_KEY_FORMAT = |
| "admission-controller.total-rejected.$0"; |
| const string TOTAL_TIMED_OUT_METRIC_KEY_FORMAT = |
| "admission-controller.total-timed-out.$0"; |
| const string TOTAL_RELEASED_METRIC_KEY_FORMAT = |
| "admission-controller.total-released.$0"; |
| const string TIME_IN_QUEUE_METRIC_KEY_FORMAT = |
| "admission-controller.time-in-queue-ms.$0"; |
| const string AGG_NUM_RUNNING_METRIC_KEY_FORMAT = |
| "admission-controller.agg-num-running.$0"; |
| const string AGG_NUM_QUEUED_METRIC_KEY_FORMAT = |
| "admission-controller.agg-num-queued.$0"; |
| const string AGG_MEM_RESERVED_METRIC_KEY_FORMAT = |
| "admission-controller.agg-mem-reserved.$0"; |
| const string LOCAL_MEM_ADMITTED_METRIC_KEY_FORMAT = |
| "admission-controller.local-mem-admitted.$0"; |
| const string LOCAL_NUM_ADMITTED_RUNNING_METRIC_KEY_FORMAT = |
| "admission-controller.local-num-admitted-running.$0"; |
| const string LOCAL_NUM_QUEUED_METRIC_KEY_FORMAT = |
| "admission-controller.local-num-queued.$0"; |
| const string LOCAL_BACKEND_MEM_USAGE_METRIC_KEY_FORMAT = |
| "admission-controller.local-backend-mem-usage.$0"; |
| const string LOCAL_BACKEND_MEM_RESERVED_METRIC_KEY_FORMAT = |
| "admission-controller.local-backend-mem-reserved.$0"; |
| const string POOL_MAX_MEM_RESOURCES_METRIC_KEY_FORMAT = |
| "admission-controller.pool-max-mem-resources.$0"; |
| const string POOL_MAX_REQUESTS_METRIC_KEY_FORMAT = |
| "admission-controller.pool-max-requests.$0"; |
| const string POOL_MAX_QUEUED_METRIC_KEY_FORMAT = |
| "admission-controller.pool-max-queued.$0"; |
| const string POOL_QUEUE_TIMEOUT_METRIC_KEY_FORMAT = |
| "admission-controller.pool-queue-timeout.$0"; |
| const string POOL_MAX_QUERY_MEM_LIMIT_METRIC_KEY_FORMAT = |
| "admission-controller.pool-max-query-mem-limit.$0"; |
| const string POOL_MIN_QUERY_MEM_LIMIT_METRIC_KEY_FORMAT = |
| "admission-controller.pool-min-query-mem-limit.$0"; |
| const string POOL_CLAMP_MEM_LIMIT_QUERY_OPTION_METRIC_KEY_FORMAT = |
| "admission-controller.pool-clamp-mem-limit-query-option.$0"; |
| const string POOL_MAX_RUNNING_QUERIES_MULTIPLE_METRIC_KEY_FORMAT = |
| "admission-controller.pool-max-running-queries-multiple.$0"; |
| const string POOL_MAX_QUEUED_QUERIES_MULTIPLE_METRIC_KEY_FORMAT = |
| "admission-controller.pool-max-queued-queries-multiple.$0"; |
| const string POOL_MAX_MEMORY_MULTIPLE_METRIC_KEY_FORMAT = |
| "admission-controller.pool-max-memory-multiple.$0"; |
| const string POOL_MAX_RUNNING_QUERIES_DERIVED_METRIC_KEY_FORMAT = |
| "admission-controller.pool-max-running-queries-derived.$0"; |
| const string POOL_MAX_QUEUED_QUERIES_DERIVED_METRIC_KEY_FORMAT = |
| "admission-controller.pool-max-queued-queries-derived.$0"; |
| const string POOL_MAX_MEMORY_DERIVED_METRIC_KEY_FORMAT = |
| "admission-controller.pool-max-memory-derived.$0"; |
| |
| // Profile query events |
| const string QUERY_EVENT_SUBMIT_FOR_ADMISSION = "Submit for admission"; |
| const string QUERY_EVENT_QUEUED = "Queued"; |
| const string QUERY_EVENT_COMPLETED_ADMISSION = "Completed admission"; |
| |
| // Profile info strings |
| const string AdmissionController::PROFILE_INFO_KEY_ADMISSION_RESULT = "Admission result"; |
| const string AdmissionController::PROFILE_INFO_VAL_ADMIT_IMMEDIATELY = |
| "Admitted immediately"; |
| const string AdmissionController::PROFILE_INFO_VAL_QUEUED = "Queued"; |
| const string AdmissionController::PROFILE_INFO_VAL_CANCELLED_IN_QUEUE = |
| "Cancelled (queued)"; |
| const string AdmissionController::PROFILE_INFO_VAL_ADMIT_QUEUED = "Admitted (queued)"; |
| const string AdmissionController::PROFILE_INFO_VAL_REJECTED = "Rejected"; |
| const string AdmissionController::PROFILE_INFO_VAL_TIME_OUT = "Timed out (queued)"; |
| const string AdmissionController::PROFILE_INFO_KEY_INITIAL_QUEUE_REASON = |
| "Initial admission queue reason"; |
| const string AdmissionController::PROFILE_INFO_VAL_INITIAL_QUEUE_REASON = |
| "waited $0 ms, reason: $1"; |
| const string AdmissionController::PROFILE_INFO_KEY_LAST_QUEUED_REASON = |
| "Latest admission queue reason"; |
| const string AdmissionController::PROFILE_INFO_KEY_ADMITTED_MEM = |
| "Cluster Memory Admitted"; |
| const string AdmissionController::PROFILE_INFO_KEY_EXECUTOR_GROUP = "Executor Group"; |
| const string AdmissionController::PROFILE_INFO_KEY_STALENESS_WARNING = |
| "Admission control state staleness"; |
| const string AdmissionController::PROFILE_TIME_SINCE_LAST_UPDATE_COUNTER_NAME = |
| "AdmissionControlTimeSinceLastUpdate"; |
| |
| // Error status string details |
| const string REASON_INVALID_POOL_CONFIG_MIN_LIMIT_MAX_MEM_FIXED = |
| "Invalid pool config: the min_query_mem_limit $0 is greater than the " |
| "max_mem_resources $1 (configured statically)"; |
| const string REASON_INVALID_POOL_CONFIG_MIN_LIMIT_MAX_MEM_MULTIPLE = |
| "The min_query_mem_limit $0 is greater than the current max_mem_resources $1 ($2); " |
| "queries will not be admitted until more executors are available."; |
| const string REASON_INVALID_POOL_CONFIG_MIN_LIMIT_MAX_LIMIT = |
| "Invalid pool config: the min_query_mem_limit is greater than the " |
| "max_query_mem_limit ($0 > $1)"; |
| const string REASON_MEM_LIMIT_TOO_LOW_FOR_RESERVATION = |
| "minimum memory reservation is greater than memory available to the query for buffer " |
| "reservations. Memory reservation needed given the current plan: $0. Adjust either " |
| "the mem_limit or the pool config (max-query-mem-limit, min-query-mem-limit) for the " |
| "query to allow the query memory limit to be at least $1. Note that changing the " |
| "mem_limit may also change the plan. See the query profile for more information " |
| "about the per-node memory requirements."; |
| const string REASON_BUFFER_LIMIT_TOO_LOW_FOR_RESERVATION = |
| "minimum memory reservation on backend '$0' is greater than memory available to the " |
| "query for buffer reservations. Increase the buffer_pool_limit to $1. See the query " |
| "profile for more information about the per-node memory requirements."; |
| const string REASON_NOT_ENOUGH_SLOTS_ON_BACKEND = |
| "number of admission control slots needed ($0) on backend '$1' is greater than total " |
| "slots available $2. Reduce mt_dop to less than $2 to ensure that the query can " |
| "execute."; |
| const string REASON_MIN_RESERVATION_OVER_POOL_MEM = |
| "minimum memory reservation needed is greater than pool max mem resources. Pool " |
| "max mem resources: $0 ($1). Cluster-wide memory reservation needed: $2. Increase " |
| "the pool max mem resources. See the query profile for more information about the " |
| "per-node memory requirements."; |
| const string REASON_DISABLED_MAX_MEM_RESOURCES = |
| "disabled by pool max mem resources set to 0"; |
| const string REASON_DISABLED_REQUESTS_LIMIT = "disabled by requests limit set to 0"; |
| // $0 is the queue size limit, $1 is the description of how the limit was derived, |
| // $2 is the number of queued queries, $3 is the staleness detail. |
| const string REASON_QUEUE_FULL = "queue full, limit=$0 ($1), num_queued=$2.$3"; |
| const string REASON_REQ_OVER_POOL_MEM = |
| "request memory needed $0 is greater than pool max mem resources $1 ($2).\n\n" |
| "Use the MEM_LIMIT query option to indicate how much memory is required per node. " |
| "The total memory needed is the per-node MEM_LIMIT times the number of nodes " |
| "executing the query. See the Admission Control documentation for more information."; |
| const string REASON_REQ_OVER_NODE_MEM = |
| "request memory needed $0 is greater than memory available for admission $1 " |
| "of $2.\n\nUse the MEM_LIMIT query option to indicate how much memory is required " |
| "per node."; |
| const string REASON_THREAD_RESERVATION_LIMIT_EXCEEDED = |
| "thread reservation on backend '$0' is greater than the THREAD_RESERVATION_LIMIT " |
| "query option value: $1 > $2."; |
| const string REASON_THREAD_RESERVATION_AGG_LIMIT_EXCEEDED = |
| "sum of thread reservations across all $0 backends is greater than the " |
| "THREAD_RESERVATION_AGGREGATE_LIMIT query option value: $1 > $2."; |
| // $0 is the error message returned by the scheduler. |
| const string REASON_SCHEDULER_ERROR = "Error during scheduling: $0"; |
| const string REASON_LOCAL_BACKEND_NOT_STARTED = "Local backend has not started up yet."; |
| const string REASON_NO_EXECUTOR_GROUPS = "Waiting for executors to start. Only DDL " |
| "queries can currently run."; |
| |
| // Queue decision details |
| // $0 = num running queries, $1 = num queries limit, $2 = num queries limit explanation, |
| // $3 = staleness detail |
| const string QUEUED_NUM_RUNNING = |
| "number of running queries $0 is at or over limit $1 ($2)$3."; |
| // $0 = queue size, $1 = staleness detail |
| const string QUEUED_QUEUE_NOT_EMPTY = "queue is not empty (size $0); queued queries are " |
| "executed first.$1"; |
| // $0 = pool name, $1 = pool max memory, $2 = pool max memory explanation, |
| // $3 = pool mem needed, $4 = pool mem available, $5 = staleness detail |
| const string POOL_MEM_NOT_AVAILABLE = |
| "Not enough aggregate memory available in pool $0 " |
| "with max mem resources $1 ($2). Needed $3 but only $4 was available.$5"; |
| // $0 = host name, $1 = host mem needed, $2 = host mem available, $3 = host admit |
| // mem limit, $4 = staleness detail |
| const string HOST_MEM_NOT_AVAILABLE = "Not enough memory available on host $0. " |
| "Needed $1 but only $2 out of $3 was available.$4"; |
| |
| // $0 = host name, $1 = slots needed, $2 = slots in use, $3 = total slots |
| const string HOST_SLOT_NOT_AVAILABLE = "Not enough admission control slots available on " |
| "host $0. Needed $1 slots but $2/$3 are already " |
| "in use."; |
| |
| // Parses the pool name and backend_id from the topic key if it is valid. |
| // Returns true if the topic key is valid and pool_name and backend_id are set. |
| static inline bool ParsePoolTopicKey(const string& topic_key, string* pool_name, |
| string* backend_id) { |
| // Topic keys look something like: poolname!hostname:22000 |
| // The '!' delimiter should always be present, the pool name must be at least 1 |
| // character, and the network address must be at least 3 characters (a ':' plus at |
| // least one character each for the hostname and port). Thus the topic_key must be |
| // at least 5 characters (1 + 1 + 3). |
| const int MIN_TOPIC_KEY_SIZE = 5; |
| if (topic_key.length() < MIN_TOPIC_KEY_SIZE) { |
| VLOG_QUERY << "Invalid topic key for pool: " << topic_key; |
| return false; |
| } |
| |
| size_t pos = topic_key.find_last_of(TOPIC_KEY_DELIMITER); |
| if (pos == string::npos || pos >= topic_key.size() - 1) { |
| VLOG_QUERY << "Invalid topic key for pool: " << topic_key; |
| return false; |
| } |
| *pool_name = topic_key.substr(0, pos); |
| *backend_id = topic_key.substr(pos + 1); |
| return true; |
| } |
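| // Example (hypothetical values): ParsePoolTopicKey("root.queueA!host1:22000", &pool, |
| // &id) returns true and sets pool="root.queueA", id="host1:22000". Because the last |
| // delimiter wins, a pool name containing '!' (e.g. "root.a!b!host1:22000") still |
| // parses correctly. |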
| |
| // Return a debug string for the pool stats. |
| static string DebugPoolStats(const TPoolStats& stats) { |
| stringstream ss; |
| ss << "num_admitted_running=" << stats.num_admitted_running << ", "; |
| ss << "num_queued=" << stats.num_queued << ", "; |
| ss << "backend_mem_reserved=" << PrintBytes(stats.backend_mem_reserved); |
| return ss.str(); |
| } |
| |
| string AdmissionController::PoolStats::DebugString() const { |
| stringstream ss; |
| ss << "agg_num_running=" << agg_num_running_ << ", "; |
| ss << "agg_num_queued=" << agg_num_queued_ << ", "; |
| ss << "agg_mem_reserved=" << PrintBytes(agg_mem_reserved_) << ", "; |
| ss << " local_host(local_mem_admitted=" << PrintBytes(local_mem_admitted_) << ", "; |
| ss << DebugPoolStats(local_stats_) << ")"; |
| return ss.str(); |
| } |
| |
| // TODO: do we need host_id_ to come from host_addr or can it just take the same id |
| // the Scheduler has (coming from the StatestoreSubscriber)? |
| AdmissionController::AdmissionController(ClusterMembershipMgr* cluster_membership_mgr, |
| StatestoreSubscriber* subscriber, RequestPoolService* request_pool_service, |
| MetricGroup* metrics, const TNetworkAddress& host_addr) |
| : cluster_membership_mgr_(cluster_membership_mgr), |
| subscriber_(subscriber), |
| request_pool_service_(request_pool_service), |
| metrics_group_(metrics->GetOrCreateChildGroup("admission-controller")), |
| host_id_(TNetworkAddressToString(host_addr)), |
| thrift_serializer_(false), |
| done_(false) { |
| cluster_membership_mgr_->RegisterUpdateCallbackFn( |
| [this](ClusterMembershipMgr::SnapshotPtr snapshot) { |
| this->UpdateExecGroupMetricMap(snapshot); |
| }); |
| } |
| |
| AdmissionController::~AdmissionController() { |
| // If the dequeue thread is not running (e.g. if Init() fails), then there is |
| // nothing to do. |
| if (dequeue_thread_ == nullptr) return; |
| |
| // The AdmissionController should live for the lifetime of the impalad, but |
| // for unit tests we need to ensure that no thread is waiting on the |
| // condition variable. This notifies the dequeue thread to stop and waits |
| // for it to finish. |
| { |
| // Lock to ensure the dequeue thread will see the update to done_ |
| lock_guard<mutex> l(admission_ctrl_lock_); |
| done_ = true; |
| pending_dequeue_ = true; |
| dequeue_cv_.NotifyOne(); |
| } |
| dequeue_thread_->Join(); |
| } |
| |
| Status AdmissionController::Init() { |
| RETURN_IF_ERROR(Thread::Create("scheduling", "admission-thread", |
| &AdmissionController::DequeueLoop, this, &dequeue_thread_)); |
| auto cb = [this]( |
| const StatestoreSubscriber::TopicDeltaMap& state, |
| vector<TTopicDelta>* topic_updates) { UpdatePoolStats(state, topic_updates); }; |
| Status status = subscriber_->AddTopic(Statestore::IMPALA_REQUEST_QUEUE_TOPIC, |
| /* is_transient=*/true, /* populate_min_subscriber_topic_version=*/false, |
| /* filter_prefix=*/"", cb); |
| if (!status.ok()) { |
| status.AddDetail("AdmissionController failed to register request queue topic"); |
| } |
| return status; |
| } |
| |
| void AdmissionController::PoolStats::AdmitQueryAndMemory(const QuerySchedule& schedule) { |
| int64_t cluster_mem_admitted = schedule.GetClusterMemoryToAdmit(); |
| DCHECK_GT(cluster_mem_admitted, 0); |
| local_mem_admitted_ += cluster_mem_admitted; |
| metrics_.local_mem_admitted->Increment(cluster_mem_admitted); |
| |
| agg_num_running_ += 1; |
| metrics_.agg_num_running->Increment(1L); |
| |
| local_stats_.num_admitted_running += 1; |
| metrics_.local_num_admitted_running->Increment(1L); |
| |
| metrics_.total_admitted->Increment(1L); |
| } |
| |
| void AdmissionController::PoolStats::ReleaseQuery(int64_t peak_mem_consumption) { |
| // Update stats tracking the number of running and admitted queries. |
| agg_num_running_ -= 1; |
| metrics_.agg_num_running->Increment(-1L); |
| |
| local_stats_.num_admitted_running -= 1; |
| metrics_.local_num_admitted_running->Increment(-1L); |
| |
| metrics_.total_released->Increment(1L); |
| DCHECK_GE(local_stats_.num_admitted_running, 0); |
| DCHECK_GE(agg_num_running_, 0); |
| |
| // Update the 'peak_mem_histogram' based on the given peak memory consumption of the |
| // query. |
| int64_t histogram_bucket = |
| BitUtil::RoundUp(peak_mem_consumption, HISTOGRAM_BIN_SIZE) / HISTOGRAM_BIN_SIZE; |
| histogram_bucket = std::max(std::min(histogram_bucket, HISTOGRAM_NUM_OF_BINS), 1L) - 1; |
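| // Worked example with the 1 GiB bin size defined above: a 2.5 GiB peak rounds up |
| // to 3 GiB (bucket 3), which is clamped to [1, 128] and shifted to index 2, i.e. |
| // the bucket covering peaks in the range (2 GiB, 3 GiB]. |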
| ++peak_mem_histogram_[histogram_bucket]; |
| } |
| |
| void AdmissionController::PoolStats::ReleaseMem(int64_t mem_to_release) { |
| // Update stats tracking memory admitted. |
| DCHECK_GT(mem_to_release, 0); |
| local_mem_admitted_ -= mem_to_release; |
| DCHECK_GE(local_mem_admitted_, 0); |
| metrics_.local_mem_admitted->Increment(-mem_to_release); |
| } |
| |
| void AdmissionController::PoolStats::Queue() { |
| agg_num_queued_ += 1; |
| metrics_.agg_num_queued->Increment(1L); |
| |
| local_stats_.num_queued += 1; |
| metrics_.local_num_queued->Increment(1L); |
| |
| metrics_.total_queued->Increment(1L); |
| } |
| |
| void AdmissionController::PoolStats::Dequeue(bool timed_out) { |
| agg_num_queued_ -= 1; |
| metrics_.agg_num_queued->Increment(-1L); |
| |
| local_stats_.num_queued -= 1; |
| metrics_.local_num_queued->Increment(-1L); |
| |
| DCHECK_GE(agg_num_queued_, 0); |
| DCHECK_GE(local_stats_.num_queued, 0); |
| if (timed_out) { |
| metrics_.total_timed_out->Increment(1L); |
| } else { |
| metrics_.total_dequeued->Increment(1L); |
| } |
| } |
| |
| void AdmissionController::UpdateStatsOnReleaseForBackends( |
| const QuerySchedule& schedule, const vector<TNetworkAddress>& host_addrs) { |
| int64_t total_mem_to_release = 0; |
| for (const auto& host_addr : host_addrs) { |
| auto backend_exec_params = schedule.per_backend_exec_params().find(host_addr); |
| if (backend_exec_params == schedule.per_backend_exec_params().end()) { |
| string err_msg = |
| strings::Substitute("Error: Cannot find exec params of host $0 for query $1.", |
| PrintThrift(host_addr), PrintId(schedule.query_id())); |
| DCHECK(false) << err_msg; |
| LOG(ERROR) << err_msg; |
| continue; |
| } |
| int64_t mem_to_release = GetMemToAdmit(schedule, backend_exec_params->second); |
| UpdateHostStats( |
| host_addr, -mem_to_release, -1, -backend_exec_params->second.slots_to_use); |
| total_mem_to_release += mem_to_release; |
| } |
| PoolStats* pool_stats = GetPoolStats(schedule); |
| pool_stats->ReleaseMem(total_mem_to_release); |
| pools_for_updates_.insert(schedule.request_pool()); |
| } |
| |
| void AdmissionController::UpdateStatsOnAdmission(const QuerySchedule& schedule) { |
| for (const auto& entry : schedule.per_backend_exec_params()) { |
| const TNetworkAddress& host_addr = entry.first; |
| int64_t mem_to_admit = GetMemToAdmit(schedule, entry.second); |
| UpdateHostStats(host_addr, mem_to_admit, 1, entry.second.slots_to_use); |
| } |
| PoolStats* pool_stats = GetPoolStats(schedule); |
| pool_stats->AdmitQueryAndMemory(schedule); |
| pools_for_updates_.insert(schedule.request_pool()); |
| } |
| |
| void AdmissionController::UpdateHostStats( |
| const TNetworkAddress& host_addr, int64_t mem_to_admit, int num_queries_to_admit, |
| int num_slots_to_admit) { |
| const string host = TNetworkAddressToString(host_addr); |
| VLOG_ROW << "Update admitted mem reserved for host=" << host |
| << " prev=" << PrintBytes(host_stats_[host].mem_admitted) |
| << " new=" << PrintBytes(host_stats_[host].mem_admitted + mem_to_admit); |
| host_stats_[host].mem_admitted += mem_to_admit; |
| DCHECK_GE(host_stats_[host].mem_admitted, 0); |
| VLOG_ROW << "Update admitted queries for host=" << host |
| << " prev=" << host_stats_[host].num_admitted |
| << " new=" << host_stats_[host].num_admitted + num_queries_to_admit; |
| host_stats_[host].num_admitted += num_queries_to_admit; |
| DCHECK_GE(host_stats_[host].num_admitted, 0); |
| VLOG_ROW << "Update slots in use for host=" << host |
| << " prev=" << host_stats_[host].slots_in_use |
| << " new=" << host_stats_[host].slots_in_use + num_slots_to_admit; |
| host_stats_[host].slots_in_use += num_slots_to_admit; |
| DCHECK_GE(host_stats_[host].slots_in_use, 0); |
| } |
| |
| // Helper method used by CanAccommodateMaxInitialReservation(). Returns true if the |
| // given 'mem_limit' can accommodate 'buffer_reservation'. If not, returns false and |
| // sets 'mem_unavailable_reason' to the details of the memory shortage. |
| static bool CanMemLimitAccommodateReservation( |
| int64_t mem_limit, int64_t buffer_reservation, string* mem_unavailable_reason) { |
| if (mem_limit <= 0) return true; // No mem limit. |
| const int64_t max_reservation = |
| ReservationUtil::GetReservationLimitFromMemLimit(mem_limit); |
| if (buffer_reservation <= max_reservation) return true; |
| const int64_t required_mem_limit = |
| ReservationUtil::GetMinMemLimitFromReservation(buffer_reservation); |
| *mem_unavailable_reason = Substitute(REASON_MEM_LIMIT_TOO_LOW_FOR_RESERVATION, |
| PrintBytes(buffer_reservation), PrintBytes(required_mem_limit)); |
| return false; |
| } |
| |
| bool AdmissionController::CanAccommodateMaxInitialReservation( |
| const QuerySchedule& schedule, const TPoolConfig& pool_cfg, |
| string* mem_unavailable_reason) { |
| const int64_t executor_mem_limit = schedule.per_backend_mem_limit(); |
| const int64_t executor_min_reservation = schedule.largest_min_reservation(); |
| const int64_t coord_mem_limit = schedule.coord_backend_mem_limit(); |
| const int64_t coord_min_reservation = schedule.coord_min_reservation(); |
| return CanMemLimitAccommodateReservation( |
| executor_mem_limit, executor_min_reservation, mem_unavailable_reason) |
| && CanMemLimitAccommodateReservation( |
| coord_mem_limit, coord_min_reservation, mem_unavailable_reason); |
| } |
| |
| bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule, |
| const TPoolConfig& pool_cfg, int64_t cluster_size, string* mem_unavailable_reason) { |
| const string& pool_name = schedule.request_pool(); |
| const int64_t pool_max_mem = GetMaxMemForPool(pool_cfg, cluster_size); |
| // If the pool doesn't have memory resources configured, this check always passes. |
| if (pool_max_mem < 0) return true; |
| |
| // Otherwise, two conditions must be met: |
| // 1) The memory estimated to be reserved by all queries in this pool *plus* the total |
| // memory needed for this query must be within the max pool memory resources |
| // specified. |
| // 2) Each individual backend must have enough mem available within its process limit |
| // to execute the query. |
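| // For example (illustrative numbers): with pool_max_mem = 100GB, an aggregate |
| // EffectiveMemReserved() of 80GB, and a query needing 30GB cluster-wide, condition |
| // 1 fails because 80GB + 30GB > 100GB, so the query cannot be admitted yet. |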
| |
| // Case 1: |
| PoolStats* stats = GetPoolStats(schedule); |
| int64_t cluster_mem_to_admit = schedule.GetClusterMemoryToAdmit(); |
| VLOG_RPC << "Checking agg mem in pool=" << pool_name << " : " << stats->DebugString() |
| << " executor_group=" << schedule.executor_group() |
| << " cluster_mem_needed=" << PrintBytes(cluster_mem_to_admit) |
| << " pool_max_mem=" << PrintBytes(pool_max_mem) << " (" |
| << GetMaxMemForPoolDescription(pool_cfg, cluster_size) << ")"; |
| if (stats->EffectiveMemReserved() + cluster_mem_to_admit > pool_max_mem) { |
| *mem_unavailable_reason = Substitute(POOL_MEM_NOT_AVAILABLE, pool_name, |
| PrintBytes(pool_max_mem), GetMaxMemForPoolDescription(pool_cfg, cluster_size), |
| PrintBytes(cluster_mem_to_admit), |
| PrintBytes(max(pool_max_mem - stats->EffectiveMemReserved(), 0L)), |
| GetStalenessDetailLocked(" ")); |
| return false; |
| } |
| |
| // Case 2: |
| for (const auto& entry : schedule.per_backend_exec_params()) { |
| const TNetworkAddress& host = entry.first; |
| const string host_id = TNetworkAddressToString(host); |
| int64_t admit_mem_limit = entry.second.be_desc.admit_mem_limit; |
| const HostStats& host_stats = host_stats_[host_id]; |
| int64_t mem_reserved = host_stats.mem_reserved; |
| int64_t mem_admitted = host_stats.mem_admitted; |
| int64_t mem_to_admit = GetMemToAdmit(schedule, entry.second); |
| VLOG_ROW << "Checking memory on host=" << host_id |
| << " mem_reserved=" << PrintBytes(mem_reserved) |
| << " mem_admitted=" << PrintBytes(mem_admitted) |
| << " needs=" << PrintBytes(mem_to_admit) |
| << " admit_mem_limit=" << PrintBytes(admit_mem_limit); |
| int64_t effective_host_mem_reserved = std::max(mem_reserved, mem_admitted); |
| if (effective_host_mem_reserved + mem_to_admit > admit_mem_limit) { |
| *mem_unavailable_reason = |
| Substitute(HOST_MEM_NOT_AVAILABLE, host_id, PrintBytes(mem_to_admit), |
| PrintBytes(max(admit_mem_limit - effective_host_mem_reserved, 0L)), |
| PrintBytes(admit_mem_limit), GetStalenessDetailLocked(" ")); |
| return false; |
| } |
| } |
| const TQueryOptions& query_opts = schedule.query_options(); |
| if (!query_opts.__isset.buffer_pool_limit || query_opts.buffer_pool_limit <= 0) { |
| // Check if a change in pool_cfg.max_query_mem_limit (while the query was queued) |
| // resulted in a decrease in the computed per_host_mem_limit such that it can no |
| // longer accommodate the largest min_reservation. |
| return CanAccommodateMaxInitialReservation( |
| schedule, pool_cfg, mem_unavailable_reason); |
| } |
| return true; |
| } |
| |
| bool AdmissionController::HasAvailableSlots(const QuerySchedule& schedule, |
| const TPoolConfig& pool_cfg, string* unavailable_reason) { |
| for (const auto& entry : schedule.per_backend_exec_params()) { |
| const TNetworkAddress& host = entry.first; |
| const string host_id = TNetworkAddressToString(host); |
| int64_t admission_slots = entry.second.be_desc.admission_slots; |
| int64_t slots_in_use = host_stats_[host_id].slots_in_use; |
| VLOG_ROW << "Checking available slot on host=" << host_id |
| << " slots_in_use=" << slots_in_use |
| << " needs=" << slots_in_use + entry.second.slots_to_use |
| << " executor admission_slots=" << admission_slots; |
| if (slots_in_use + entry.second.slots_to_use > admission_slots) { |
| *unavailable_reason = Substitute(HOST_SLOT_NOT_AVAILABLE, host_id, |
| entry.second.slots_to_use, slots_in_use, admission_slots); |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| bool AdmissionController::CanAdmitRequest(const QuerySchedule& schedule, |
| const TPoolConfig& pool_cfg, int64_t cluster_size, bool admit_from_queue, |
| string* not_admitted_reason) { |
| // Can't admit if: |
| // (a) There are already queued requests (and this is not admitting from the queue). |
| // (b) The resource pool is already at the maximum number of requests. |
| // (c) One of the executors in 'schedule' is already at its maximum number of requests |
| // (when not using the default executor group). |
| // (d) There are not enough memory resources available for the query. |
| |
| const int64_t max_requests = GetMaxRequestsForPool(pool_cfg, cluster_size); |
| PoolStats* pool_stats = GetPoolStats(schedule); |
| bool default_group = |
| schedule.executor_group() == ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME; |
| if (!admit_from_queue && pool_stats->local_stats().num_queued > 0) { |
| *not_admitted_reason = Substitute(QUEUED_QUEUE_NOT_EMPTY, |
| pool_stats->local_stats().num_queued, GetStalenessDetailLocked(" ")); |
| return false; |
| } |
| if (max_requests >= 0 && pool_stats->agg_num_running() >= max_requests) { |
| // All executor groups are limited by the aggregate number of queries running in the |
| // pool. |
| *not_admitted_reason = Substitute(QUEUED_NUM_RUNNING, pool_stats->agg_num_running(), |
| max_requests, GetMaxRequestsForPoolDescription(pool_cfg, cluster_size), |
| GetStalenessDetailLocked(" ")); |
| return false; |
| } |
| if (!default_group && !HasAvailableSlots(schedule, pool_cfg, not_admitted_reason)) { |
| // All non-default executor groups are also limited by the number of running queries |
| // per executor. |
| // TODO(IMPALA-8757): Extend slot based admission to default executor group |
| return false; |
| } |
| if (!HasAvailableMemResources(schedule, pool_cfg, cluster_size, not_admitted_reason)) { |
| return false; |
| } |
| return true; |
| } |
| |
| bool AdmissionController::RejectForCluster(const string& pool_name, |
| const TPoolConfig& pool_cfg, bool admit_from_queue, int64_t cluster_size, |
| string* rejection_reason) { |
| DCHECK(rejection_reason != nullptr && rejection_reason->empty()); |
| |
| // Checks related to pool max_requests: |
| if (GetMaxRequestsForPool(pool_cfg, cluster_size) == 0) { |
| *rejection_reason = REASON_DISABLED_REQUESTS_LIMIT; |
| return true; |
| } |
| |
| // Checks related to pool max_mem_resources: |
| int64_t max_mem = GetMaxMemForPool(pool_cfg, cluster_size); |
| if (max_mem == 0) { |
| *rejection_reason = REASON_DISABLED_MAX_MEM_RESOURCES; |
| return true; |
| } |
| |
| if (max_mem > 0 && pool_cfg.min_query_mem_limit > max_mem) { |
| if (PoolHasFixedMemoryLimit(pool_cfg)) { |
| *rejection_reason = Substitute(REASON_INVALID_POOL_CONFIG_MIN_LIMIT_MAX_MEM_FIXED, |
| pool_cfg.min_query_mem_limit, max_mem); |
| } else { |
| *rejection_reason = |
| Substitute(REASON_INVALID_POOL_CONFIG_MIN_LIMIT_MAX_MEM_MULTIPLE, |
| pool_cfg.min_query_mem_limit, max_mem, |
| GetMaxMemForPoolDescription(pool_cfg, cluster_size)); |
| } |
| return true; |
| } |
| |
| if (pool_cfg.max_query_mem_limit > 0 |
| && pool_cfg.min_query_mem_limit > pool_cfg.max_query_mem_limit) { |
| *rejection_reason = Substitute(REASON_INVALID_POOL_CONFIG_MIN_LIMIT_MAX_LIMIT, |
| pool_cfg.min_query_mem_limit, pool_cfg.max_query_mem_limit); |
| return true; |
| } |
| |
| PoolStats* stats = GetPoolStats(pool_name); |
| int64_t max_queued = GetMaxQueuedForPool(pool_cfg, cluster_size); |
| if (!admit_from_queue && stats->agg_num_queued() >= max_queued) { |
| *rejection_reason = Substitute(REASON_QUEUE_FULL, max_queued, |
| GetMaxQueuedForPoolDescription(pool_cfg, cluster_size), stats->agg_num_queued(), |
| GetStalenessDetailLocked(" ")); |
| return true; |
| } |
| |
| return false; |
| } |
| |
| bool AdmissionController::RejectForSchedule(const QuerySchedule& schedule, |
| const TPoolConfig& pool_cfg, int64_t cluster_size, int64_t group_size, |
| string* rejection_reason) { |
| DCHECK(rejection_reason != nullptr && rejection_reason->empty()); |
| bool default_group = |
| schedule.executor_group() == ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME; |
| |
| // Compute the max over all backends of min_mem_reservation_bytes and |
| // thread_reservation, the cluster-wide totals of both, the min admit_mem_limit |
| // over all executors, and the admit_mem_limit of the coordinator. |
| pair<const TNetworkAddress*, int64_t> largest_min_mem_reservation(nullptr, -1); |
| int64_t cluster_min_mem_reservation_bytes = 0; |
| pair<const TNetworkAddress*, int64_t> max_thread_reservation(nullptr, 0); |
| pair<const TNetworkAddress*, int64_t> min_executor_admit_mem_limit( |
| nullptr, std::numeric_limits<int64_t>::max()); |
| pair<const TNetworkAddress*, int64_t> coord_admit_mem_limit( |
| nullptr, std::numeric_limits<int64_t>::max()); |
| int64_t cluster_thread_reservation = 0; |
| for (const auto& e : schedule.per_backend_exec_params()) { |
| const BackendExecParams& bp = e.second; |
| // TODO(IMPALA-8757): Extend slot based admission to default executor group |
| if (!default_group && bp.slots_to_use > bp.be_desc.admission_slots) { |
| *rejection_reason = Substitute(REASON_NOT_ENOUGH_SLOTS_ON_BACKEND, bp.slots_to_use, |
| TNetworkAddressToString(bp.be_desc.address), bp.be_desc.admission_slots); |
| return true; |
| } |
| |
| cluster_min_mem_reservation_bytes += bp.min_mem_reservation_bytes; |
| if (bp.min_mem_reservation_bytes > largest_min_mem_reservation.second) { |
| largest_min_mem_reservation = make_pair(&e.first, bp.min_mem_reservation_bytes); |
| } |
| cluster_thread_reservation += bp.thread_reservation; |
| if (bp.thread_reservation > max_thread_reservation.second) { |
| max_thread_reservation = make_pair(&e.first, bp.thread_reservation); |
| } |
| if (bp.is_coord_backend) { |
| coord_admit_mem_limit.first = &e.first; |
| coord_admit_mem_limit.second = bp.be_desc.admit_mem_limit; |
| } else if (bp.be_desc.admit_mem_limit < min_executor_admit_mem_limit.second) { |
| min_executor_admit_mem_limit.first = &e.first; |
| min_executor_admit_mem_limit.second = bp.be_desc.admit_mem_limit; |
| } |
| } |
| |
| // Checks related to the min buffer reservation against configured query memory limits: |
| const TQueryOptions& query_opts = schedule.query_options(); |
| if (query_opts.__isset.buffer_pool_limit && query_opts.buffer_pool_limit > 0) { |
| if (largest_min_mem_reservation.second > query_opts.buffer_pool_limit) { |
| *rejection_reason = Substitute(REASON_BUFFER_LIMIT_TOO_LOW_FOR_RESERVATION, |
| TNetworkAddressToString(*largest_min_mem_reservation.first), |
| PrintBytes(largest_min_mem_reservation.second)); |
| return true; |
| } |
| } else if (!CanAccommodateMaxInitialReservation(schedule, pool_cfg, rejection_reason)) { |
| // If buffer_pool_limit is not explicitly set, it's calculated from mem_limit. |
| return true; |
| } |
| |
| // Check thread reservation limits. |
| if (query_opts.__isset.thread_reservation_limit |
| && query_opts.thread_reservation_limit > 0 |
| && max_thread_reservation.second > query_opts.thread_reservation_limit) { |
| *rejection_reason = Substitute(REASON_THREAD_RESERVATION_LIMIT_EXCEEDED, |
| TNetworkAddressToString(*max_thread_reservation.first), |
| max_thread_reservation.second, query_opts.thread_reservation_limit); |
| return true; |
| } |
| if (query_opts.__isset.thread_reservation_aggregate_limit |
| && query_opts.thread_reservation_aggregate_limit > 0 |
| && cluster_thread_reservation > query_opts.thread_reservation_aggregate_limit) { |
| *rejection_reason = Substitute(REASON_THREAD_RESERVATION_AGG_LIMIT_EXCEEDED, |
| schedule.per_backend_exec_params().size(), cluster_thread_reservation, |
| query_opts.thread_reservation_aggregate_limit); |
| return true; |
| } |
| |
| // Checks related to pool max_mem_resources: |
| // We perform these checks here against the group_size to prevent queuing up queries |
| // that would never be able to reserve the required memory on an executor group. |
| int64_t max_mem = GetMaxMemForPool(pool_cfg, group_size); |
| if (max_mem == 0) { |
| *rejection_reason = REASON_DISABLED_MAX_MEM_RESOURCES; |
| return true; |
| } |
| if (max_mem > 0) { |
| if (cluster_min_mem_reservation_bytes > max_mem) { |
| *rejection_reason = Substitute(REASON_MIN_RESERVATION_OVER_POOL_MEM, |
| PrintBytes(max_mem), GetMaxMemForPoolDescription(pool_cfg, group_size), |
| PrintBytes(cluster_min_mem_reservation_bytes)); |
| return true; |
| } |
| int64_t cluster_mem_to_admit = schedule.GetClusterMemoryToAdmit(); |
| if (cluster_mem_to_admit > max_mem) { |
| *rejection_reason = |
| Substitute(REASON_REQ_OVER_POOL_MEM, PrintBytes(cluster_mem_to_admit), |
| PrintBytes(max_mem), GetMaxMemForPoolDescription(pool_cfg, group_size)); |
| return true; |
| } |
| int64_t executor_mem_to_admit = schedule.per_backend_mem_to_admit(); |
| VLOG_ROW << "Checking executor mem with executor_mem_to_admit = " |
| << executor_mem_to_admit |
| << " and min_admit_mem_limit.second = " |
| << min_executor_admit_mem_limit.second; |
| if (executor_mem_to_admit > min_executor_admit_mem_limit.second) { |
| *rejection_reason = |
| Substitute(REASON_REQ_OVER_NODE_MEM, PrintBytes(executor_mem_to_admit), |
| PrintBytes(min_executor_admit_mem_limit.second), |
| TNetworkAddressToString(*min_executor_admit_mem_limit.first)); |
| return true; |
| } |
| int64_t coord_mem_to_admit = schedule.coord_backend_mem_to_admit(); |
| VLOG_ROW << "Checking coordinator mem with coord_mem_to_admit = " |
| << coord_mem_to_admit |
| << " and coord_admit_mem_limit.second = " << coord_admit_mem_limit.second; |
| if (coord_mem_to_admit > coord_admit_mem_limit.second) { |
| *rejection_reason = Substitute(REASON_REQ_OVER_NODE_MEM, |
| PrintBytes(coord_mem_to_admit), PrintBytes(coord_admit_mem_limit.second), |
| TNetworkAddressToString(*coord_admit_mem_limit.first)); |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| void AdmissionController::PoolStats::UpdateConfigMetrics( |
| const TPoolConfig& pool_cfg, int64_t cluster_size) { |
| metrics_.pool_max_mem_resources->SetValue(pool_cfg.max_mem_resources); |
| metrics_.pool_max_requests->SetValue(pool_cfg.max_requests); |
| metrics_.pool_max_queued->SetValue(pool_cfg.max_queued); |
| metrics_.pool_queue_timeout->SetValue(GetQueueTimeoutForPoolMs(pool_cfg)); |
| metrics_.max_query_mem_limit->SetValue(pool_cfg.max_query_mem_limit); |
| metrics_.min_query_mem_limit->SetValue(pool_cfg.min_query_mem_limit); |
| metrics_.clamp_mem_limit_query_option->SetValue(pool_cfg.clamp_mem_limit_query_option); |
| metrics_.max_running_queries_multiple->SetValue(pool_cfg.max_running_queries_multiple); |
| metrics_.max_queued_queries_multiple->SetValue(pool_cfg.max_queued_queries_multiple); |
| metrics_.max_memory_multiple->SetValue(pool_cfg.max_memory_multiple); |
| } |
| |
| void AdmissionController::PoolStats::UpdateDerivedMetrics( |
| const TPoolConfig& pool_cfg, int64_t cluster_size) { |
| metrics_.max_running_queries_derived->SetValue( |
| GetMaxRequestsForPool(pool_cfg, cluster_size)); |
| metrics_.max_queued_queries_derived->SetValue( |
| GetMaxQueuedForPool(pool_cfg, cluster_size)); |
| metrics_.max_memory_derived->SetValue(GetMaxMemForPool(pool_cfg, cluster_size)); |
| } |
| |
| Status AdmissionController::SubmitForAdmission(const AdmissionRequest& request, |
| Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* admit_outcome, |
| std::unique_ptr<QuerySchedule>* schedule_result) { |
| DCHECK(schedule_result->get() == nullptr); |
| |
| ClusterMembershipMgr::SnapshotPtr membership_snapshot = |
| cluster_membership_mgr_->GetSnapshot(); |
| DCHECK(membership_snapshot.get() != nullptr); |
| string blacklist_str = membership_snapshot->executor_blacklist.BlacklistToString(); |
| if (!blacklist_str.empty()) { |
| request.summary_profile->AddInfoString("Blacklisted Executors", blacklist_str); |
| } |
| |
| // Note the queue_node will not exist in the queue when this method returns. |
| QueueNode queue_node(request, admit_outcome, request.summary_profile); |
| |
| // Re-resolve the pool name to propagate any resolution errors now that this request is |
| // known to require a valid pool. All executor groups / schedules will use the same pool |
| // name. |
| string pool_name; |
| TPoolConfig pool_cfg; |
| RETURN_IF_ERROR( |
| ResolvePoolAndGetConfig(request.request.query_ctx, &pool_name, &pool_cfg)); |
| request.summary_profile->AddInfoString("Request Pool", pool_name); |
| |
| const int64_t cluster_size = GetClusterSize(*membership_snapshot); |
| // We track this outside of the queue node so that it is still available after the query |
| // has been dequeued. |
| string initial_queue_reason; |
| ScopedEvent completedEvent(request.query_events, QUERY_EVENT_COMPLETED_ADMISSION); |
| { |
| // Take lock to ensure the Dequeue thread does not modify the request queue. |
| lock_guard<mutex> lock(admission_ctrl_lock_); |
| request.query_events->MarkEvent(QUERY_EVENT_SUBMIT_FOR_ADMISSION); |
| |
| pool_config_map_[pool_name] = pool_cfg; |
| PoolStats* stats = GetPoolStats(pool_name); |
| stats->UpdateConfigMetrics(pool_cfg, cluster_size); |
| stats->UpdateDerivedMetrics(pool_cfg, cluster_size); |
| |
| bool must_reject = !FindGroupToAdmitOrReject(cluster_size, membership_snapshot, |
| pool_cfg, /* admit_from_queue=*/false, stats, &queue_node); |
| if (must_reject) { |
| AdmissionOutcome outcome = admit_outcome->Set(AdmissionOutcome::REJECTED); |
| if (outcome != AdmissionOutcome::REJECTED) { |
| DCHECK_ENUM_EQ(outcome, AdmissionOutcome::CANCELLED); |
| VLOG_QUERY << "Ready to be " << PROFILE_INFO_VAL_REJECTED |
| << " but already cancelled, query id=" << PrintId(request.query_id); |
| return Status::CANCELLED; |
| } |
| request.summary_profile->AddInfoString( |
| PROFILE_INFO_KEY_ADMISSION_RESULT, PROFILE_INFO_VAL_REJECTED); |
| stats->metrics()->total_rejected->Increment(1); |
| const ErrorMsg& rejected_msg = ErrorMsg( |
| TErrorCode::ADMISSION_REJECTED, pool_name, queue_node.not_admitted_reason); |
| VLOG_QUERY << rejected_msg.msg(); |
| return Status::Expected(rejected_msg); |
| } |
| |
| if (queue_node.admitted_schedule.get() != nullptr) { |
| const string& group_name = queue_node.admitted_schedule->executor_group(); |
| VLOG(3) << "Can admit to group " << group_name << " (or cancelled)"; |
| DCHECK_EQ(stats->local_stats().num_queued, 0); |
| *schedule_result = std::move(queue_node.admitted_schedule); |
| AdmissionOutcome outcome = admit_outcome->Set(AdmissionOutcome::ADMITTED); |
| if (outcome != AdmissionOutcome::ADMITTED) { |
| DCHECK_ENUM_EQ(outcome, AdmissionOutcome::CANCELLED); |
| VLOG_QUERY << "Ready to be " << PROFILE_INFO_VAL_ADMIT_IMMEDIATELY |
| << " but already cancelled, query id=" << PrintId(request.query_id); |
| return Status::CANCELLED; |
| } |
| VLOG_QUERY << "Admitting query id=" << PrintId(request.query_id); |
| AdmitQuery(schedule_result->get(), false); |
| stats->UpdateWaitTime(0); |
| VLOG_RPC << "Final: " << stats->DebugString(); |
| return Status::OK(); |
| } |
| |
| // We cannot immediately admit but do not need to reject, so queue the request |
| RequestQueue* queue = &request_queue_map_[pool_name]; |
| VLOG_QUERY << "Queuing, query id=" << PrintId(request.query_id) |
| << " reason: " << queue_node.not_admitted_reason; |
| initial_queue_reason = queue_node.not_admitted_reason; |
| stats->Queue(); |
| queue->Enqueue(&queue_node); |
| } |
| |
| // Update the profile info before waiting. These properties will be updated with |
| // their final state after being dequeued. |
| request.summary_profile->AddInfoString( |
| PROFILE_INFO_KEY_ADMISSION_RESULT, PROFILE_INFO_VAL_QUEUED); |
| request.summary_profile->AddInfoString( |
| PROFILE_INFO_KEY_INITIAL_QUEUE_REASON, initial_queue_reason); |
| request.summary_profile->AddInfoString( |
| PROFILE_INFO_KEY_LAST_QUEUED_REASON, queue_node.not_admitted_reason); |
| request.query_events->MarkEvent(QUERY_EVENT_QUEUED); |
| |
| int64_t queue_wait_timeout_ms = GetQueueTimeoutForPoolMs(pool_cfg); |
| int64_t wait_start_ms = MonotonicMillis(); |
| |
| // Block in Get() up to the time out, waiting for the promise to be set when the query |
| // is admitted or cancelled. |
| bool timed_out; |
| admit_outcome->Get(queue_wait_timeout_ms, &timed_out); |
| int64_t wait_time_ms = MonotonicMillis() - wait_start_ms; |
| request.summary_profile->AddInfoString(PROFILE_INFO_KEY_INITIAL_QUEUE_REASON, |
| Substitute( |
| PROFILE_INFO_VAL_INITIAL_QUEUE_REASON, wait_time_ms, initial_queue_reason)); |
| |
| // Disallow the FAIL action here. It would leave the queue in an inconsistent state. |
| DebugActionNoFail(request.query_options, "AC_AFTER_ADMISSION_OUTCOME"); |
| |
| { |
| lock_guard<mutex> lock(admission_ctrl_lock_); |
| // If the query has not been admitted or cancelled by this point, it is considered |
| // to have timed out. |
| AdmissionOutcome outcome = admit_outcome->Set(AdmissionOutcome::TIMED_OUT); |
| RequestQueue* queue = &request_queue_map_[pool_name]; |
| pools_for_updates_.insert(pool_name); |
| PoolStats* pool_stats = GetPoolStats(pool_name); |
| pool_stats->UpdateWaitTime(wait_time_ms); |
| if (outcome == AdmissionOutcome::REJECTED) { |
| if (queue->Remove(&queue_node)) pool_stats->Dequeue(true); |
| request.summary_profile->AddInfoString( |
| PROFILE_INFO_KEY_ADMISSION_RESULT, PROFILE_INFO_VAL_REJECTED); |
| const ErrorMsg& rejected_msg = ErrorMsg( |
| TErrorCode::ADMISSION_REJECTED, pool_name, queue_node.not_admitted_reason); |
| VLOG_QUERY << rejected_msg.msg(); |
| return Status::Expected(rejected_msg); |
| } else if (outcome == AdmissionOutcome::TIMED_OUT) { |
| bool removed = queue->Remove(&queue_node); |
| DCHECK(removed); |
| pool_stats->Dequeue(true); |
| request.summary_profile->AddInfoString( |
| PROFILE_INFO_KEY_ADMISSION_RESULT, PROFILE_INFO_VAL_TIME_OUT); |
| const ErrorMsg& rejected_msg = ErrorMsg(TErrorCode::ADMISSION_TIMED_OUT, |
| queue_wait_timeout_ms, pool_name, queue_node.not_admitted_reason); |
| VLOG_QUERY << rejected_msg.msg(); |
| return Status::Expected(rejected_msg); |
| } else if (outcome == AdmissionOutcome::CANCELLED) { |
| if (queue->Remove(&queue_node)) pool_stats->Dequeue(false); |
| request.summary_profile->AddInfoString( |
| PROFILE_INFO_KEY_ADMISSION_RESULT, PROFILE_INFO_VAL_CANCELLED_IN_QUEUE); |
| VLOG_QUERY << PROFILE_INFO_VAL_CANCELLED_IN_QUEUE |
| << ", query id=" << PrintId(request.query_id); |
| return Status::CANCELLED; |
| } |
| // The dequeue thread updates the stats (to avoid a race condition) so we do |
| // not change them here. |
| DCHECK_ENUM_EQ(outcome, AdmissionOutcome::ADMITTED); |
| DCHECK(queue_node.admitted_schedule.get() != nullptr); |
| *schedule_result = std::move(queue_node.admitted_schedule); |
| DCHECK(!queue->Contains(&queue_node)); |
| VLOG_QUERY << "Admitted queued query id=" << PrintId(request.query_id); |
| VLOG_RPC << "Final: " << pool_stats->DebugString(); |
| return Status::OK(); |
| } |
| } |
| |
| void AdmissionController::ReleaseQuery( |
| const QuerySchedule& schedule, int64_t peak_mem_consumption) { |
| const string& pool_name = schedule.request_pool(); |
| { |
| lock_guard<mutex> lock(admission_ctrl_lock_); |
| DCHECK_EQ(num_released_backends_.at(schedule.query_id()), 0); |
| num_released_backends_.erase(num_released_backends_.find(schedule.query_id())); |
| PoolStats* stats = GetPoolStats(schedule); |
| stats->ReleaseQuery(peak_mem_consumption); |
| // No need to update the Host Stats as they should have been updated in |
| // ReleaseQueryBackends. |
| pools_for_updates_.insert(pool_name); |
| UpdateExecGroupMetric(schedule.executor_group(), -1); |
| VLOG_RPC << "Released query id=" << PrintId(schedule.query_id()) << " " |
| << stats->DebugString(); |
| pending_dequeue_ = true; |
| } |
| dequeue_cv_.NotifyOne(); |
| } |
| |
| void AdmissionController::ReleaseQueryBackends( |
| const QuerySchedule& schedule, const vector<TNetworkAddress>& host_addrs) { |
| { |
| lock_guard<mutex> lock(admission_ctrl_lock_); |
| UpdateStatsOnReleaseForBackends(schedule, host_addrs); |
| |
| // Update num_released_backends_. |
| auto released_backends = num_released_backends_.find(schedule.query_id()); |
| if (released_backends != num_released_backends_.end()) { |
| released_backends->second -= host_addrs.size(); |
| } else { |
| string err_msg = Substitute("Unable to find num released backends for query $0", |
| PrintId(schedule.query_id())); |
| DCHECK(false) << err_msg; |
| LOG(ERROR) << err_msg; |
| } |
| |
| if (VLOG_IS_ON(2)) { |
| stringstream ss; |
| ss << "Released query backend(s) "; |
| for (const auto& host_addr : host_addrs) ss << PrintThrift(host_addr) << " "; |
| ss << "for query id=" << PrintId(schedule.query_id()) << " " |
| << GetPoolStats(schedule)->DebugString(); |
| VLOG(2) << ss.str(); |
| } |
| pending_dequeue_ = true; |
| } |
| dequeue_cv_.NotifyOne(); |
| } |
| |
| Status AdmissionController::ResolvePoolAndGetConfig( |
| const TQueryCtx& query_ctx, string* pool_name, TPoolConfig* pool_config) { |
| RETURN_IF_ERROR(request_pool_service_->ResolveRequestPool(query_ctx, pool_name)); |
| DCHECK_EQ(query_ctx.request_pool, *pool_name); |
| return request_pool_service_->GetPoolConfig(*pool_name, pool_config); |
| } |
| |
| // Statestore subscriber callback for IMPALA_REQUEST_QUEUE_TOPIC. |
| void AdmissionController::UpdatePoolStats( |
| const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas, |
| vector<TTopicDelta>* subscriber_topic_updates) { |
| { |
| lock_guard<mutex> lock(admission_ctrl_lock_); |
| AddPoolUpdates(subscriber_topic_updates); |
| |
| StatestoreSubscriber::TopicDeltaMap::const_iterator topic = |
| incoming_topic_deltas.find(Statestore::IMPALA_REQUEST_QUEUE_TOPIC); |
| if (topic != incoming_topic_deltas.end()) { |
| const TTopicDelta& delta = topic->second; |
| // Delta and non-delta updates are handled the same way, except that for a full |
| // update we first clear the backend TPoolStats. We then update the global map and |
| // re-compute the pool stats for any pools that changed. |
| if (!delta.is_delta) { |
| VLOG_ROW << "Full impala-request-queue stats update"; |
| for (auto& entry : pool_stats_) entry.second.ClearRemoteStats(); |
| } |
| HandleTopicUpdates(delta.topic_entries); |
| } |
| UpdateClusterAggregates(); |
| last_topic_update_time_ms_ = MonotonicMillis(); |
| pending_dequeue_ = true; |
| } |
| dequeue_cv_.NotifyOne(); // Dequeue and admit queries on the dequeue thread |
| } |
| |
| void AdmissionController::PoolStats::UpdateRemoteStats(const string& host_id, |
| TPoolStats* host_stats) { |
| DCHECK_NE(host_id, parent_->host_id_); // Shouldn't be updating for local host. |
| RemoteStatsMap::iterator it = remote_stats_.find(host_id); |
| if (VLOG_ROW_IS_ON) { |
| stringstream ss; |
| ss << "Stats update for pool=" << name_ << " backend=" << host_id; |
| if (host_stats == nullptr) ss << " topic deletion"; |
| if (it != remote_stats_.end()) ss << " previous: " << DebugPoolStats(it->second); |
| if (host_stats != nullptr) ss << " new: " << DebugPoolStats(*host_stats); |
| VLOG_ROW << ss.str(); |
| } |
| if (host_stats == nullptr) { |
| if (it != remote_stats_.end()) { |
| remote_stats_.erase(it); |
| } else { |
| VLOG_QUERY << "Attempted to remove non-existent remote stats for host=" << host_id; |
| } |
| } else { |
| remote_stats_[host_id] = *host_stats; |
| } |
| } |
| |
| void AdmissionController::HandleTopicUpdates(const vector<TTopicItem>& topic_updates) { |
| for (const TTopicItem& item: topic_updates) { |
| string pool_name; |
| string topic_backend_id; |
| if (!ParsePoolTopicKey(item.key, &pool_name, &topic_backend_id)) continue; |
| // The topic entry from this subscriber is handled specially; the stats coming |
| // from the statestore are likely already outdated. |
| if (topic_backend_id == host_id_) continue; |
| if (item.deleted) { |
| GetPoolStats(pool_name)->UpdateRemoteStats(topic_backend_id, nullptr); |
| continue; |
| } |
| TPoolStats remote_update; |
| uint32_t len = item.value.size(); |
| Status status = DeserializeThriftMsg(reinterpret_cast<const uint8_t*>( |
| item.value.data()), &len, false, &remote_update); |
| if (!status.ok()) { |
| VLOG_QUERY << "Error deserializing pool update with key: " << item.key; |
| continue; |
| } |
| GetPoolStats(pool_name)->UpdateRemoteStats(topic_backend_id, &remote_update); |
| } |
| } |
| |
| void AdmissionController::PoolStats::UpdateAggregates(HostMemMap* host_mem_reserved) { |
| const string& coord_id = parent_->host_id_; |
| int64_t num_running = 0; |
| int64_t num_queued = 0; |
| int64_t mem_reserved = 0; |
| for (const PoolStats::RemoteStatsMap::value_type& remote_entry : remote_stats_) { |
| const string& host = remote_entry.first; |
| // Skip an update from this subscriber as the information may be outdated. |
| // The stats from this coordinator will be added below. |
| if (host == coord_id) continue; |
| const TPoolStats& remote_pool_stats = remote_entry.second; |
| DCHECK_GE(remote_pool_stats.num_admitted_running, 0); |
| DCHECK_GE(remote_pool_stats.num_queued, 0); |
| DCHECK_GE(remote_pool_stats.backend_mem_reserved, 0); |
| num_running += remote_pool_stats.num_admitted_running; |
| num_queued += remote_pool_stats.num_queued; |
| |
| // Update the per-pool and per-host aggregates with the mem reserved by this host in |
| // this pool. |
| mem_reserved += remote_pool_stats.backend_mem_reserved; |
| (*host_mem_reserved)[host] += remote_pool_stats.backend_mem_reserved; |
| // TODO(IMPALA-8762): For multiple coordinators, need to track the number of running |
| // queries per executor, i.e. every admission controller needs to send the full map to |
| // everyone else. |
| } |
| num_running += local_stats_.num_admitted_running; |
| num_queued += local_stats_.num_queued; |
| mem_reserved += local_stats_.backend_mem_reserved; |
| (*host_mem_reserved)[coord_id] += local_stats_.backend_mem_reserved; |
| |
| DCHECK_GE(num_running, 0); |
| DCHECK_GE(num_queued, 0); |
| DCHECK_GE(mem_reserved, 0); |
| DCHECK_GE(num_running, local_stats_.num_admitted_running); |
| DCHECK_GE(num_queued, local_stats_.num_queued); |
| |
| if (agg_num_running_ == num_running && agg_num_queued_ == num_queued |
| && agg_mem_reserved_ == mem_reserved) { |
| DCHECK_EQ(num_running, metrics_.agg_num_running->GetValue()); |
| DCHECK_EQ(num_queued, metrics_.agg_num_queued->GetValue()); |
| DCHECK_EQ(mem_reserved, metrics_.agg_mem_reserved->GetValue()); |
| return; |
| } |
| VLOG_ROW << "Recomputed agg stats, previous: " << DebugString(); |
| agg_num_running_ = num_running; |
| agg_num_queued_ = num_queued; |
| agg_mem_reserved_ = mem_reserved; |
| metrics_.agg_num_running->SetValue(num_running); |
| metrics_.agg_num_queued->SetValue(num_queued); |
| metrics_.agg_mem_reserved->SetValue(mem_reserved); |
| VLOG_ROW << "Updated: " << DebugString(); |
| } |
| |
| void AdmissionController::UpdateClusterAggregates() { |
| // Recompute mem_reserved for all hosts. |
| PoolStats::HostMemMap updated_mem_reserved; |
| for (auto& entry : pool_stats_) entry.second.UpdateAggregates(&updated_mem_reserved); |
| |
| stringstream ss; |
| ss << "Updated mem reserved for hosts:"; |
| int i = 0; |
| for (const auto& e : updated_mem_reserved) { |
| int64_t old_mem_reserved = host_stats_[e.first].mem_reserved; |
| if (old_mem_reserved == e.second) continue; |
| host_stats_[e.first].mem_reserved = e.second; |
| if (VLOG_ROW_IS_ON) { |
| ss << endl << e.first << ": " << PrintBytes(old_mem_reserved); |
| ss << " -> " << PrintBytes(e.second); |
| ++i; |
| } |
| } |
| if (i > 0) VLOG_ROW << ss.str(); |
| } |
| |
| Status AdmissionController::ComputeGroupSchedules( |
| ClusterMembershipMgr::SnapshotPtr membership_snapshot, QueueNode* queue_node) { |
| int64_t previous_membership_version = 0; |
| if (queue_node->membership_snapshot.get() != nullptr) { |
| previous_membership_version = queue_node->membership_snapshot->version; |
| } |
| int64_t current_membership_version = membership_snapshot->version; |
| |
| DCHECK_GT(current_membership_version, 0); |
| DCHECK_GE(current_membership_version, previous_membership_version); |
| if (current_membership_version <= previous_membership_version) { |
| VLOG(3) << "No rescheduling necessary, previous membership version: " |
| << previous_membership_version |
| << ", current membership version: " << current_membership_version; |
| return Status::OK(); |
| } |
| const AdmissionRequest& request = queue_node->admission_request; |
| VLOG(3) << "Scheduling query " << PrintId(request.query_id) |
| << " with membership version " << current_membership_version; |
| |
| queue_node->membership_snapshot = membership_snapshot; |
| std::vector<GroupSchedule>* output_schedules = &queue_node->group_schedules; |
| output_schedules->clear(); |
| |
| const string& pool_name = request.request.query_ctx.request_pool; |
| |
| // If the first statestore update arrives before the local backend has finished starting |
| // up, we might not have a local backend descriptor yet. We return no schedules, which |
| // will result in the query being queued. |
| if (membership_snapshot->local_be_desc == nullptr) { |
| queue_node->not_admitted_reason = REASON_LOCAL_BACKEND_NOT_STARTED; |
| LOG(WARNING) << queue_node->not_admitted_reason; |
| return Status::OK(); |
| } |
| const TBackendDescriptor& local_be_desc = *membership_snapshot->local_be_desc; |
| |
| vector<const ExecutorGroup*> executor_groups; |
| GetExecutorGroupsForPool( |
| membership_snapshot->executor_groups, pool_name, &executor_groups); |
| |
| if (executor_groups.empty()) { |
| queue_node->not_admitted_reason = REASON_NO_EXECUTOR_GROUPS; |
| LOG(WARNING) << queue_node->not_admitted_reason; |
| return Status::OK(); |
| } |
| |
| // We loop over the executor groups in a deterministic order. This means we will fill up |
| // each executor group before considering an unused one. In particular, we will not try |
| // to balance queries across executor groups equally. |
| // TODO(IMPALA-8731): balance queries across executor groups more evenly |
| for (const ExecutorGroup* executor_group : executor_groups) { |
| DCHECK(executor_group->IsHealthy()); |
| DCHECK_GT(executor_group->NumExecutors(), 0); |
| unique_ptr<QuerySchedule> group_schedule = |
| make_unique<QuerySchedule>(request.query_id, request.request, |
| request.query_options, request.summary_profile, request.query_events); |
| const string& group_name = executor_group->name(); |
| VLOG(3) << "Scheduling for executor group: " << group_name << " with " |
| << executor_group->NumExecutors() << " executors"; |
| const Scheduler::ExecutorConfig group_config = {*executor_group, local_be_desc}; |
| RETURN_IF_ERROR(ExecEnv::GetInstance()->scheduler()->Schedule( |
| group_config, group_schedule.get())); |
| DCHECK(!group_schedule->executor_group().empty()); |
| output_schedules->emplace_back(std::move(group_schedule), *executor_group); |
| } |
| DCHECK(!output_schedules->empty()); |
| return Status::OK(); |
| } |
| |
| bool AdmissionController::FindGroupToAdmitOrReject(int64_t cluster_size, |
| ClusterMembershipMgr::SnapshotPtr membership_snapshot, const TPoolConfig& pool_config, |
| bool admit_from_queue, PoolStats* pool_stats, QueueNode* queue_node) { |
| // Check for rejection based on current cluster size |
| const string& pool_name = pool_stats->name(); |
| string rejection_reason; |
| if (RejectForCluster( |
| pool_name, pool_config, admit_from_queue, cluster_size, &rejection_reason)) { |
| DCHECK(!rejection_reason.empty()); |
| queue_node->not_admitted_reason = rejection_reason; |
| return false; |
| } |
| |
| // Compute schedules |
| Status ret = ComputeGroupSchedules(membership_snapshot, queue_node); |
| if (!ret.ok()) { |
| DCHECK(queue_node->not_admitted_reason.empty()); |
| queue_node->not_admitted_reason = Substitute(REASON_SCHEDULER_ERROR, ret.GetDetail()); |
| return false; |
| } |
| if (queue_node->group_schedules.empty()) { |
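| // No schedules could be computed (e.g. the local backend descriptor has not |
| // arrived yet, or the pool has no executor groups). 'not_admitted_reason' was set |
| // above; returning true without an admitted schedule keeps the query queued. |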
| DCHECK(!queue_node->not_admitted_reason.empty()); |
| return true; |
| } |
| |
| for (GroupSchedule& group_schedule : queue_node->group_schedules) { |
| const ExecutorGroup& executor_group = group_schedule.executor_group; |
| DCHECK_GT(executor_group.NumExecutors(), 0); |
| QuerySchedule* schedule = group_schedule.schedule.get(); |
| schedule->UpdateMemoryRequirements(pool_config); |
| |
| const string& group_name = executor_group.name(); |
| int64_t group_size = executor_group.NumExecutors(); |
| VLOG(3) << "Trying to admit query to pool " << pool_name << " in executor group " |
| << group_name << " (" << group_size << " executors)"; |
| |
| const int64_t max_queued = GetMaxQueuedForPool(pool_config, cluster_size); |
| const int64_t max_mem = GetMaxMemForPool(pool_config, cluster_size); |
| const int64_t max_requests = GetMaxRequestsForPool(pool_config, cluster_size); |
| VLOG_QUERY << "Trying to admit id=" << PrintId(schedule->query_id()) |
| << " in pool_name=" << pool_name << " executor_group_name=" << group_name |
| << " per_host_mem_estimate=" |
| << PrintBytes(schedule->GetPerExecutorMemoryEstimate()) |
| << " dedicated_coord_mem_estimate=" |
| << PrintBytes(schedule->GetDedicatedCoordMemoryEstimate()) |
| << " max_requests=" << max_requests << " (" |
| << GetMaxRequestsForPoolDescription(pool_config, cluster_size) << ")" |
| << " max_queued=" << max_queued << " (" |
| << GetMaxQueuedForPoolDescription(pool_config, cluster_size) << ")" |
| << " max_mem=" << PrintBytes(max_mem) << " (" |
| << GetMaxMemForPoolDescription(pool_config, cluster_size) << ")"; |
| VLOG_QUERY << "Stats: " << pool_stats->DebugString(); |
| |
| // Query is rejected if the rejection check fails on *any* group. |
| if (RejectForSchedule( |
| *schedule, pool_config, cluster_size, group_size, &rejection_reason)) { |
| DCHECK(!rejection_reason.empty()); |
| queue_node->not_admitted_reason = rejection_reason; |
| return false; |
| } |
| |
| if (CanAdmitRequest(*schedule, pool_config, cluster_size, admit_from_queue, |
| &queue_node->not_admitted_reason)) { |
| queue_node->admitted_schedule = std::move(group_schedule.schedule); |
| return true; |
| } else { |
| VLOG_RPC << "Cannot admit query " << queue_node->admission_request.query_id |
| << " to group " << group_name << ": " << queue_node->not_admitted_reason; |
| } |
| } |
| return true; |
| } |
| |
| void AdmissionController::PoolStats::UpdateMemTrackerStats() { |
| // May be NULL if no queries have ever executed in this pool on this node but another |
| // node sent stats for this pool. |
| MemTracker* tracker = |
| ExecEnv::GetInstance()->pool_mem_trackers()->GetRequestPoolMemTracker(name_, false); |
| |
| const int64_t current_reserved = |
| tracker == nullptr ? static_cast<int64_t>(0) : tracker->GetPoolMemReserved(); |
| if (current_reserved != local_stats_.backend_mem_reserved) { |
| parent_->pools_for_updates_.insert(name_); |
| local_stats_.backend_mem_reserved = current_reserved; |
| metrics_.local_backend_mem_reserved->SetValue(current_reserved); |
| } |
| |
| const int64_t current_usage = |
| tracker == nullptr ? static_cast<int64_t>(0) : tracker->consumption(); |
| metrics_.local_backend_mem_usage->SetValue(current_usage); |
| } |
| |
| void AdmissionController::AddPoolUpdates(vector<TTopicDelta>* topic_updates) { |
| // local_stats_ are updated eagerly except for backend_mem_reserved (which isn't used |
| // for local admission control decisions). Update that now before sending local_stats_. |
| for (auto& entry : pool_stats_) { |
| entry.second.UpdateMemTrackerStats(); |
| } |
| if (pools_for_updates_.empty()) return; |
| topic_updates->push_back(TTopicDelta()); |
| TTopicDelta& topic_delta = topic_updates->back(); |
| topic_delta.topic_name = Statestore::IMPALA_REQUEST_QUEUE_TOPIC; |
| for (const string& pool_name: pools_for_updates_) { |
| DCHECK(pool_stats_.find(pool_name) != pool_stats_.end()); |
| PoolStats* stats = GetPoolStats(pool_name); |
| VLOG_ROW << "Sending topic update " << stats->DebugString(); |
| topic_delta.topic_entries.push_back(TTopicItem()); |
| TTopicItem& topic_item = topic_delta.topic_entries.back(); |
| topic_item.key = MakePoolTopicKey(pool_name, host_id_); |
| Status status = thrift_serializer_.SerializeToString(&stats->local_stats(), |
| &topic_item.value); |
| if (!status.ok()) { |
| LOG(WARNING) << "Failed to serialize query pool stats: " << status.GetDetail(); |
| // Drop only the failed entry: popping the whole TTopicDelta here would leave |
| // 'topic_delta' dangling while the loop continues to use it. |
| topic_delta.topic_entries.pop_back(); |
| } |
| } |
| pools_for_updates_.clear(); |
| } |
| |
| void AdmissionController::DequeueLoop() { |
| unique_lock<mutex> lock(admission_ctrl_lock_); |
| while (true) { |
| if (done_) break; |
| while (!pending_dequeue_) { |
| dequeue_cv_.Wait(lock); |
| } |
| pending_dequeue_ = false; |
| ClusterMembershipMgr::SnapshotPtr membership_snapshot = |
| cluster_membership_mgr_->GetSnapshot(); |
| |
| // If a query was queued while the cluster is still starting up but the |
| // client-facing services have already started to accept connections, the whole |
| // membership can still be empty. |
| if (membership_snapshot->executor_groups.empty()) continue; |
| const int64_t cluster_size = GetClusterSize(*membership_snapshot); |
| |
| for (const PoolConfigMap::value_type& entry: pool_config_map_) { |
| const string& pool_name = entry.first; |
| const TPoolConfig& pool_config = entry.second; |
| PoolStats* stats = GetPoolStats(pool_name, /* dcheck_exists=*/true); |
| stats->UpdateDerivedMetrics(pool_config, cluster_size); |
| |
| if (stats->local_stats().num_queued == 0) continue; // Nothing to dequeue |
| DCHECK_GE(stats->agg_num_queued(), stats->local_stats().num_queued); |
| |
| RequestQueue& queue = request_queue_map_[pool_name]; |
| int64_t max_to_dequeue = GetMaxToDequeue(queue, stats, pool_config, cluster_size); |
| VLOG_RPC << "Dequeue thread will try to admit " << max_to_dequeue << " requests" |
| << ", pool=" << pool_name |
| << ", num_queued=" << stats->local_stats().num_queued |
| << " cluster_size=" << cluster_size; |
| if (max_to_dequeue == 0) continue; // to next pool. |
| |
| while (max_to_dequeue > 0 && !queue.empty()) { |
| QueueNode* queue_node = queue.head(); |
| DCHECK(queue_node != nullptr); |
| // Check whether the query was cancelled while queued, then try to find a group |
| // that can admit it. |
| bool is_cancelled = queue_node->admit_outcome->IsSet() |
| && queue_node->admit_outcome->Get() == AdmissionOutcome::CANCELLED; |
| |
| bool is_rejected = !is_cancelled |
| && !FindGroupToAdmitOrReject(cluster_size, membership_snapshot, pool_config, |
| /* admit_from_queue=*/true, stats, queue_node); |
| |
| if (!is_cancelled && !is_rejected |
| && queue_node->admitted_schedule.get() == nullptr) { |
| // If no group was found, stop trying to dequeue. |
| // TODO(IMPALA-2968): Requests further in the queue may be blocked |
| // unnecessarily. Consider a better policy once we have better test scenarios. |
| LogDequeueFailed(queue_node, queue_node->not_admitted_reason); |
| break; |
| } |
| |
| // At this point we know that the query must be taken off the queue |
| queue.Dequeue(); |
| --max_to_dequeue; |
| VLOG(3) << "Dequeueing from stats for pool " << pool_name; |
| stats->Dequeue(false); |
| |
| if (is_rejected) { |
| AdmissionOutcome outcome = |
| queue_node->admit_outcome->Set(AdmissionOutcome::REJECTED); |
| if (outcome == AdmissionOutcome::REJECTED) { |
| stats->metrics()->total_rejected->Increment(1); |
| continue; // next query |
| } else { |
| DCHECK_ENUM_EQ(outcome, AdmissionOutcome::CANCELLED); |
| is_cancelled = true; |
| } |
| } |
| DCHECK(is_cancelled || queue_node->admitted_schedule != nullptr); |
| |
| const TUniqueId& query_id = queue_node->admission_request.query_id; |
| if (!is_cancelled) { |
| VLOG_QUERY << "Admitting from queue: query=" << PrintId(query_id); |
| AdmissionOutcome outcome = |
| queue_node->admit_outcome->Set(AdmissionOutcome::ADMITTED); |
| if (outcome != AdmissionOutcome::ADMITTED) { |
| DCHECK_ENUM_EQ(outcome, AdmissionOutcome::CANCELLED); |
| is_cancelled = true; |
| } |
| } |
| |
| if (is_cancelled) { |
| VLOG_QUERY << "Dequeued cancelled query=" << PrintId(query_id); |
| continue; // next query |
| } |
| |
| DCHECK(queue_node->admit_outcome->IsSet()); |
| DCHECK_ENUM_EQ(queue_node->admit_outcome->Get(), AdmissionOutcome::ADMITTED); |
| DCHECK(!is_cancelled); |
| DCHECK(!is_rejected); |
| DCHECK(queue_node->admitted_schedule != nullptr); |
| AdmitQuery(queue_node->admitted_schedule.get(), true); |
| } |
| pools_for_updates_.insert(pool_name); |
| } |
| } |
| } |
| |
| int64_t AdmissionController::GetQueueTimeoutForPoolMs(const TPoolConfig& pool_config) { |
| int64_t queue_wait_timeout_ms = pool_config.__isset.queue_timeout_ms ? |
| pool_config.queue_timeout_ms : |
| FLAGS_queue_wait_timeout_ms; |
| return max<int64_t>(0, queue_wait_timeout_ms); |
| } |
| |
| int64_t AdmissionController::GetMaxToDequeue(RequestQueue& queue, PoolStats* stats, |
| const TPoolConfig& pool_config, int64_t cluster_size) { |
| if (PoolLimitsRunningQueriesCount(pool_config)) { |
| const int64_t max_requests = GetMaxRequestsForPool(pool_config, cluster_size); |
| const int64_t total_available = max_requests - stats->agg_num_running(); |
| if (total_available <= 0) { |
| // The pool limits the number of running queries and that limit has already been |
| // reached (or exceeded, which can happen with over-admission), so nothing can be |
| // dequeued for this pool. |
| if (!queue.empty()) { |
| LogDequeueFailed(queue.head(), |
| Substitute(QUEUED_NUM_RUNNING, stats->agg_num_running(), max_requests, |
| GetMaxRequestsForPoolDescription(pool_config, cluster_size), |
| GetStalenessDetailLocked(" "))); |
| } |
| return 0; |
| } |
| |
| // Use the ratio of locally queued requests to aggregate queued requests so that |
| // each impalad dequeues a proportional share of total_available. Note that this |
| // offers no fairness between impalads. |
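| // For example (hypothetical numbers): with 2 of 8 queued queries local to this |
| // impalad and total_available == 4, the ratio is 0.25 and this impalad will try |
| // to dequeue min(2, max(1, 1)) == 1 query. |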
| double queue_size_ratio = static_cast<double>(stats->local_stats().num_queued) |
| / static_cast<double>(max<int64_t>(1, stats->agg_num_queued())); |
| DCHECK_LE(queue_size_ratio, 1.0); |
| return min(stats->local_stats().num_queued, |
| max<int64_t>(1, queue_size_ratio * total_available)); |
| } else { |
| return stats->local_stats().num_queued; // No limit on num running requests |
| } |
| } |
| |
| void AdmissionController::LogDequeueFailed(QueueNode* node, |
| const string& not_admitted_reason) { |
| VLOG_QUERY << "Could not dequeue query id=" << PrintId(node->admission_request.query_id) |
| << " reason: " << not_admitted_reason; |
| node->admission_request.summary_profile->AddInfoString( |
| PROFILE_INFO_KEY_LAST_QUEUED_REASON, not_admitted_reason); |
| } |
| |
| AdmissionController::PoolStats* AdmissionController::GetPoolStats( |
| const QuerySchedule& schedule) { |
| DCHECK(!schedule.request_pool().empty()); |
| return GetPoolStats(schedule.request_pool()); |
| } |
| |
| AdmissionController::PoolStats* AdmissionController::GetPoolStats( |
| const string& pool_name, bool dcheck_exists) { |
| DCHECK(!pool_name.empty()); |
| auto it = pool_stats_.find(pool_name); |
| DCHECK(!dcheck_exists || it != pool_stats_.end()); |
| if (it == pool_stats_.end()) { |
| bool inserted; |
| std::tie(it, inserted) = pool_stats_.emplace(pool_name, PoolStats(this, pool_name)); |
| DCHECK(inserted); |
| } |
| DCHECK(it != pool_stats_.end()); |
| return &it->second; |
| } |
| |
| void AdmissionController::AdmitQuery(QuerySchedule* schedule, bool was_queued) { |
| VLOG_RPC << "For Query " << PrintId(schedule->query_id()) |
| << " per_backend_mem_limit set to: " |
| << PrintBytes(schedule->per_backend_mem_limit()) |
| << " per_backend_mem_to_admit set to: " |
| << PrintBytes(schedule->per_backend_mem_to_admit()) |
| << " coord_backend_mem_limit set to: " |
| << PrintBytes(schedule->coord_backend_mem_limit()) |
| << " coord_backend_mem_to_admit set to: " |
| << PrintBytes(schedule->coord_backend_mem_to_admit()); |
| // Update memory and number of queries. |
| UpdateStatsOnAdmission(*schedule); |
| UpdateExecGroupMetric(schedule->executor_group(), 1); |
| // Update summary profile. |
| const string& admission_result = |
| was_queued ? PROFILE_INFO_VAL_ADMIT_QUEUED : PROFILE_INFO_VAL_ADMIT_IMMEDIATELY; |
| schedule->summary_profile()->AddInfoString( |
| PROFILE_INFO_KEY_ADMISSION_RESULT, admission_result); |
| schedule->summary_profile()->AddInfoString( |
| PROFILE_INFO_KEY_ADMITTED_MEM, PrintBytes(schedule->GetClusterMemoryToAdmit())); |
| schedule->summary_profile()->AddInfoString( |
| PROFILE_INFO_KEY_EXECUTOR_GROUP, schedule->executor_group()); |
| // We may have admitted based on stale information. Include a warning in the |
| // profile if this may be the case. |
| int64_t time_since_update_ms; |
| string staleness_detail = GetStalenessDetailLocked("", &time_since_update_ms); |
| // IMPALA-8235: convert to TIME_NS because of issues with tools consuming TIME_MS. |
| COUNTER_SET(ADD_COUNTER(schedule->summary_profile(), |
| PROFILE_TIME_SINCE_LAST_UPDATE_COUNTER_NAME, TUnit::TIME_NS), |
| static_cast<int64_t>(time_since_update_ms * NANOS_PER_MICRO * MICROS_PER_MILLI)); |
| if (!staleness_detail.empty()) { |
| schedule->summary_profile()->AddInfoString( |
| PROFILE_INFO_KEY_STALENESS_WARNING, staleness_detail); |
| } |
| DCHECK( |
| num_released_backends_.find(schedule->query_id()) == num_released_backends_.end()); |
| num_released_backends_[schedule->query_id()] = |
| schedule->per_backend_exec_params().size(); |
| } |
| |
| string AdmissionController::GetStalenessDetail(const string& prefix, |
| int64_t* ms_since_last_update) { |
| lock_guard<mutex> lock(admission_ctrl_lock_); |
| return GetStalenessDetailLocked(prefix, ms_since_last_update); |
| } |
| |
| string AdmissionController::GetStalenessDetailLocked(const string& prefix, |
| int64_t* ms_since_last_update) { |
| int64_t ms_since_update = MonotonicMillis() - last_topic_update_time_ms_; |
| if (ms_since_last_update != nullptr) *ms_since_last_update = ms_since_update; |
| if (last_topic_update_time_ms_ == 0) { |
| return Substitute("$0Warning: admission control information from statestore " |
| "is stale: no update has been received.", prefix); |
| } else if (ms_since_update >= FLAGS_admission_control_stale_topic_threshold_ms) { |
| return Substitute("$0Warning: admission control information from statestore " |
| "is stale: $1 since last update was received.", |
| prefix, PrettyPrinter::Print(ms_since_update, TUnit::TIME_MS)); |
| } |
| return ""; |
| } |
| |
| void AdmissionController::PoolToJsonLocked(const string& pool_name, |
| rapidjson::Value* resource_pools, rapidjson::Document* document) { |
| auto it = pool_stats_.find(pool_name); |
| if (it == pool_stats_.end()) return; |
| PoolStats* stats = &it->second; |
| RequestQueue& queue = request_queue_map_[pool_name]; |
| // Get the pool stats |
| using namespace rapidjson; |
| Value pool_info_json(kObjectType); |
| stats->ToJson(&pool_info_json, document); |
| |
| // Get the queued queries |
| Value queued_queries(kArrayType); |
| queue.Iterate([&queued_queries, document](QueueNode* node) { |
| if (node->group_schedules.empty()) { |
| Value query_info(kObjectType); |
| query_info.AddMember("query_id", "N/A", document->GetAllocator()); |
| query_info.AddMember("mem_limit", 0, document->GetAllocator()); |
| query_info.AddMember("mem_limit_to_admit", 0, document->GetAllocator()); |
| query_info.AddMember("num_backends", 0, document->GetAllocator()); |
| queued_queries.PushBack(query_info, document->GetAllocator()); |
| return true; |
| } |
| QuerySchedule* schedule = node->group_schedules.begin()->schedule.get(); |
| Value query_info(kObjectType); |
| Value query_id(PrintId(schedule->query_id()).c_str(), document->GetAllocator()); |
| query_info.AddMember("query_id", query_id, document->GetAllocator()); |
| query_info.AddMember( |
| "mem_limit", schedule->per_backend_mem_limit(), document->GetAllocator()); |
| query_info.AddMember("mem_limit_to_admit", schedule->per_backend_mem_to_admit(), |
| document->GetAllocator()); |
| query_info.AddMember("coord_mem_limit", schedule->coord_backend_mem_limit(), |
| document->GetAllocator()); |
| query_info.AddMember("coord_mem_to_admit", |
| schedule->coord_backend_mem_to_admit(), document->GetAllocator()); |
| query_info.AddMember("num_backends", schedule->per_backend_exec_params().size(), |
| document->GetAllocator()); |
| queued_queries.PushBack(query_info, document->GetAllocator()); |
| return true; |
| }); |
| pool_info_json.AddMember("queued_queries", queued_queries, document->GetAllocator()); |
| |
| // Get the queued reason for the query at the head of the queue. |
| if (!queue.empty()) { |
| Value head_queued_reason( |
| queue.head() |
| ->profile->GetInfoString(PROFILE_INFO_KEY_LAST_QUEUED_REASON) |
| ->c_str(), |
| document->GetAllocator()); |
| pool_info_json.AddMember( |
| "head_queued_reason", head_queued_reason, document->GetAllocator()); |
| } |
| |
| resource_pools->PushBack(pool_info_json, document->GetAllocator()); |
| } |
| |
| void AdmissionController::PoolToJson(const string& pool_name, |
| rapidjson::Value* resource_pools, rapidjson::Document* document) { |
| lock_guard<mutex> lock(admission_ctrl_lock_); |
| PoolToJsonLocked(pool_name, resource_pools, document); |
| } |
| |
| void AdmissionController::AllPoolsToJson( |
| rapidjson::Value* resource_pools, rapidjson::Document* document) { |
| lock_guard<mutex> lock(admission_ctrl_lock_); |
| for (const PoolConfigMap::value_type& entry : pool_config_map_) { |
| const string& pool_name = entry.first; |
| PoolToJsonLocked(pool_name, resource_pools, document); |
| } |
| } |
| |
| void AdmissionController::PoolStats::UpdateWaitTime(int64_t wait_time_ms) { |
| metrics()->time_in_queue_ms->Increment(wait_time_ms); |
| if (wait_time_ms_ema_ == 0) { |
| wait_time_ms_ema_ = wait_time_ms; |
| return; |
| } |
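| // Exponential moving average: ema = ema * (1 - a) + sample * a, where a is |
| // EMA_MULTIPLIER (0.2). For example, a previous average of 100ms and a new wait |
| // of 200ms yield 100 * 0.8 + 200 * 0.2 == 120ms. |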
| wait_time_ms_ema_ = |
| wait_time_ms_ema_ * (1 - EMA_MULTIPLIER) + wait_time_ms * EMA_MULTIPLIER; |
| } |
| |
| void AdmissionController::PoolStats::ToJson( |
| rapidjson::Value* pool, rapidjson::Document* document) const { |
| using namespace rapidjson; |
| Value pool_name(name_.c_str(), document->GetAllocator()); |
| pool->AddMember("pool_name", pool_name, document->GetAllocator()); |
| pool->AddMember( |
| "agg_num_running", metrics_.agg_num_running->GetValue(), document->GetAllocator()); |
| pool->AddMember( |
| "agg_num_queued", metrics_.agg_num_queued->GetValue(), document->GetAllocator()); |
| pool->AddMember("agg_mem_reserved", metrics_.agg_mem_reserved->GetValue(), |
| document->GetAllocator()); |
| pool->AddMember("local_mem_admitted", metrics_.local_mem_admitted->GetValue(), |
| document->GetAllocator()); |
| pool->AddMember( |
| "total_admitted", metrics_.total_admitted->GetValue(), document->GetAllocator()); |
| pool->AddMember( |
| "total_rejected", metrics_.total_rejected->GetValue(), document->GetAllocator()); |
| pool->AddMember( |
| "total_timed_out", metrics_.total_timed_out->GetValue(), document->GetAllocator()); |
| pool->AddMember("pool_max_mem_resources", metrics_.pool_max_mem_resources->GetValue(), |
| document->GetAllocator()); |
| pool->AddMember("pool_max_requests", metrics_.pool_max_requests->GetValue(), |
| document->GetAllocator()); |
| pool->AddMember( |
| "pool_max_queued", metrics_.pool_max_queued->GetValue(), document->GetAllocator()); |
| pool->AddMember("pool_queue_timeout", metrics_.pool_queue_timeout->GetValue(), |
| document->GetAllocator()); |
| pool->AddMember("max_query_mem_limit", metrics_.max_query_mem_limit->GetValue(), |
| document->GetAllocator()); |
| pool->AddMember("min_query_mem_limit", metrics_.min_query_mem_limit->GetValue(), |
| document->GetAllocator()); |
| pool->AddMember("clamp_mem_limit_query_option", |
| metrics_.clamp_mem_limit_query_option->GetValue(), document->GetAllocator()); |
| pool->AddMember("max_running_queries_multiple", |
| metrics_.max_running_queries_multiple->GetValue(), document->GetAllocator()); |
| pool->AddMember("max_queued_queries_multiple", |
| metrics_.max_queued_queries_multiple->GetValue(), document->GetAllocator()); |
| pool->AddMember("max_memory_multiple", metrics_.max_memory_multiple->GetValue(), |
| document->GetAllocator()); |
| pool->AddMember("max_running_queries_derived", |
| metrics_.max_running_queries_derived->GetValue(), document->GetAllocator()); |
| pool->AddMember("max_queued_queries_derived", |
| metrics_.max_queued_queries_derived->GetValue(), document->GetAllocator()); |
| pool->AddMember("max_memory_derived", metrics_.max_memory_derived->GetValue(), |
| document->GetAllocator()); |
| pool->AddMember("wait_time_ms_ema", wait_time_ms_ema_, document->GetAllocator()); |
| Value histogram(kArrayType); |
| for (int bucket = 0; bucket < peak_mem_histogram_.size(); bucket++) { |
| Value histogram_elem(kArrayType); |
| histogram_elem.PushBack(bucket, document->GetAllocator()); |
| histogram_elem.PushBack(peak_mem_histogram_[bucket], document->GetAllocator()); |
| histogram.PushBack(histogram_elem, document->GetAllocator()); |
| } |
| pool->AddMember("peak_mem_usage_histogram", histogram, document->GetAllocator()); |
| } |
| |
| void AdmissionController::ResetPoolInformationalStats(const string& pool_name) { |
| lock_guard<mutex> lock(admission_ctrl_lock_); |
| auto it = pool_stats_.find(pool_name); |
| if (it == pool_stats_.end()) return; |
| it->second.ResetInformationalStats(); |
| } |
| |
| void AdmissionController::ResetAllPoolInformationalStats() { |
| lock_guard<mutex> lock(admission_ctrl_lock_); |
| for (auto& it: pool_stats_) it.second.ResetInformationalStats(); |
| } |
| |
| void AdmissionController::PoolStats::ResetInformationalStats() { |
| std::fill(peak_mem_histogram_.begin(), peak_mem_histogram_.end(), 0); |
| wait_time_ms_ema_ = 0.0; |
| // Reset only metrics keeping track of totals since last reset. |
| metrics()->total_admitted->SetValue(0); |
| metrics()->total_rejected->SetValue(0); |
| metrics()->total_queued->SetValue(0); |
| metrics()->total_dequeued->SetValue(0); |
| metrics()->total_timed_out->SetValue(0); |
| metrics()->total_released->SetValue(0); |
| metrics()->time_in_queue_ms->SetValue(0); |
| } |
| |
| void AdmissionController::PoolStats::InitMetrics() { |
| metrics_.total_admitted = parent_->metrics_group_->AddCounter( |
| TOTAL_ADMITTED_METRIC_KEY_FORMAT, 0, name_); |
| metrics_.total_queued = parent_->metrics_group_->AddCounter( |
| TOTAL_QUEUED_METRIC_KEY_FORMAT, 0, name_); |
| metrics_.total_dequeued = parent_->metrics_group_->AddCounter( |
| TOTAL_DEQUEUED_METRIC_KEY_FORMAT, 0, name_); |
| metrics_.total_rejected = parent_->metrics_group_->AddCounter( |
| TOTAL_REJECTED_METRIC_KEY_FORMAT, 0, name_); |
| metrics_.total_timed_out = parent_->metrics_group_->AddCounter( |
| TOTAL_TIMED_OUT_METRIC_KEY_FORMAT, 0, name_); |
| metrics_.total_released = parent_->metrics_group_->AddCounter( |
| TOTAL_RELEASED_METRIC_KEY_FORMAT, 0, name_); |
| metrics_.time_in_queue_ms = parent_->metrics_group_->AddCounter( |
| TIME_IN_QUEUE_METRIC_KEY_FORMAT, 0, name_); |
| |
| metrics_.agg_num_running = parent_->metrics_group_->AddGauge( |
| AGG_NUM_RUNNING_METRIC_KEY_FORMAT, 0, name_); |
| metrics_.agg_num_queued = parent_->metrics_group_->AddGauge( |
| AGG_NUM_QUEUED_METRIC_KEY_FORMAT, 0, name_); |
| metrics_.agg_mem_reserved = parent_->metrics_group_->AddGauge( |
| AGG_MEM_RESERVED_METRIC_KEY_FORMAT, 0, name_); |
| |
| metrics_.local_mem_admitted = parent_->metrics_group_->AddGauge( |
| LOCAL_MEM_ADMITTED_METRIC_KEY_FORMAT, 0, name_); |
| metrics_.local_num_admitted_running = parent_->metrics_group_->AddGauge( |
| LOCAL_NUM_ADMITTED_RUNNING_METRIC_KEY_FORMAT, 0, name_); |
| metrics_.local_num_queued = parent_->metrics_group_->AddGauge( |
| LOCAL_NUM_QUEUED_METRIC_KEY_FORMAT, 0, name_); |
| metrics_.local_backend_mem_usage = parent_->metrics_group_->AddGauge( |
| LOCAL_BACKEND_MEM_USAGE_METRIC_KEY_FORMAT, 0, name_); |
| metrics_.local_backend_mem_reserved = parent_->metrics_group_->AddGauge( |
| LOCAL_BACKEND_MEM_RESERVED_METRIC_KEY_FORMAT, 0, name_); |
| |
| metrics_.pool_max_mem_resources = parent_->metrics_group_->AddGauge( |
| POOL_MAX_MEM_RESOURCES_METRIC_KEY_FORMAT, 0, name_); |
| metrics_.pool_max_requests = parent_->metrics_group_->AddGauge( |
| POOL_MAX_REQUESTS_METRIC_KEY_FORMAT, 0, name_); |
| metrics_.pool_max_queued = parent_->metrics_group_->AddGauge( |
| POOL_MAX_QUEUED_METRIC_KEY_FORMAT, 0, name_); |
| metrics_.pool_queue_timeout = parent_->metrics_group_->AddGauge( |
| POOL_QUEUE_TIMEOUT_METRIC_KEY_FORMAT, 0, name_); |
| metrics_.max_query_mem_limit = parent_->metrics_group_->AddGauge( |
| POOL_MAX_QUERY_MEM_LIMIT_METRIC_KEY_FORMAT, 0, name_); |
| metrics_.min_query_mem_limit = parent_->metrics_group_->AddGauge( |
| POOL_MIN_QUERY_MEM_LIMIT_METRIC_KEY_FORMAT, 0, name_); |
| metrics_.clamp_mem_limit_query_option = parent_->metrics_group_->AddProperty<bool>( |
| POOL_CLAMP_MEM_LIMIT_QUERY_OPTION_METRIC_KEY_FORMAT, false, name_); |
| metrics_.max_running_queries_multiple = parent_->metrics_group_->AddDoubleGauge( |
| POOL_MAX_RUNNING_QUERIES_MULTIPLE_METRIC_KEY_FORMAT, 0, name_); |
| metrics_.max_queued_queries_multiple = parent_->metrics_group_->AddDoubleGauge( |
| POOL_MAX_QUEUED_QUERIES_MULTIPLE_METRIC_KEY_FORMAT, 0, name_); |
| metrics_.max_memory_multiple = parent_->metrics_group_->AddGauge( |
| POOL_MAX_MEMORY_MULTIPLE_METRIC_KEY_FORMAT, 0, name_); |
| metrics_.max_running_queries_derived = parent_->metrics_group_->AddGauge( |
| POOL_MAX_RUNNING_QUERIES_DERIVED_METRIC_KEY_FORMAT, 0, name_); |
| metrics_.max_queued_queries_derived = parent_->metrics_group_->AddGauge( |
| POOL_MAX_QUEUED_QUERIES_DERIVED_METRIC_KEY_FORMAT, 0, name_); |
| metrics_.max_memory_derived = parent_->metrics_group_->AddGauge( |
| POOL_MAX_MEMORY_DERIVED_METRIC_KEY_FORMAT, 0, name_); |
| } |
| |
| void AdmissionController::PopulatePerHostMemReservedAndAdmitted( |
| PerHostStats* per_host_stats) { |
| lock_guard<mutex> l(admission_ctrl_lock_); |
| *per_host_stats = host_stats_; |
| } |
| |
| string AdmissionController::MakePoolTopicKey( |
| const string& pool_name, const string& backend_id) { |
| // The backend_id must not contain the delimiter so that the topic key can be |
| // parsed by finding the last instance of the delimiter (the pool name may contain |
| // the delimiter itself). |
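| // E.g. (hypothetical values) pool "root.default" and backend id "host1:22000" |
| // produce the topic key "root.default!host1:22000". |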
| DCHECK_EQ(backend_id.find(TOPIC_KEY_DELIMITER), string::npos); |
| return Substitute("$0$1$2", pool_name, TOPIC_KEY_DELIMITER, backend_id); |
| } |
| |
| void AdmissionController::GetExecutorGroupsForPool( |
| const ClusterMembershipMgr::ExecutorGroups& all_groups, const string& pool_name, |
| vector<const ExecutorGroup*>* matching_groups) { |
| string prefix(pool_name + POOL_GROUP_DELIMITER); |
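| // E.g. a pool named "root.pool1" matches every executor group whose name starts |
| // with "root.pool1" followed by POOL_GROUP_DELIMITER. |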
| // We search for matching groups before the health check so that we don't fall back to |
| // the default group in case there are matching but unhealthy groups. |
| for (const auto& it : all_groups) { |
| StringPiece name(it.first); |
| if (name.starts_with(prefix)) matching_groups->push_back(&it.second); |
| } |
| if (matching_groups->empty()) { |
| auto default_it = all_groups.find(ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME); |
| if (default_it == all_groups.end()) return; |
| VLOG(3) << "Checking default executor group for pool " << pool_name; |
| matching_groups->push_back(&default_it->second); |
| } |
| // Filter out unhealthy groups. |
| auto erase_from = std::remove_if(matching_groups->begin(), matching_groups->end(), |
| [](const ExecutorGroup* g) { return !g->IsHealthy(); }); |
| matching_groups->erase(erase_from, matching_groups->end()); |
| // Sort executor groups by name. |
| auto cmp = [](const ExecutorGroup* a, const ExecutorGroup* b) { |
| return a->name() < b->name(); |
| }; |
| sort(matching_groups->begin(), matching_groups->end(), cmp); |
| } |
| |
| int64_t AdmissionController::GetClusterSize( |
| const ClusterMembershipMgr::Snapshot& membership_snapshot) { |
| int64_t sum = 0; |
| for (const auto& it : membership_snapshot.executor_groups) { |
| sum += it.second.NumExecutors(); |
| } |
| return sum; |
| } |
| |
| int64_t AdmissionController::GetExecutorGroupSize( |
| const ClusterMembershipMgr::Snapshot& membership_snapshot, |
| const string& group_name) { |
| auto it = membership_snapshot.executor_groups.find(group_name); |
| DCHECK(it != membership_snapshot.executor_groups.end()) |
| << "Could not find group " << group_name; |
| if (it == membership_snapshot.executor_groups.end()) return 0; |
| return it->second.NumExecutors(); |
| } |
| |
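| // E.g. (hypothetical numbers) with max_memory_multiple == 1GB and a cluster of 10 |
| // executors, the pool-wide limit is 10GB; otherwise the statically configured |
| // max_mem_resources applies. |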
| int64_t AdmissionController::GetMaxMemForPool( |
| const TPoolConfig& pool_config, int64_t cluster_size) { |
| if (pool_config.max_memory_multiple > 0) { |
| return pool_config.max_memory_multiple * cluster_size; |
| } |
| return pool_config.max_mem_resources; |
| } |
| |
| string AdmissionController::GetMaxMemForPoolDescription( |
| const TPoolConfig& pool_config, int64_t cluster_size) { |
| if (pool_config.max_memory_multiple > 0) { |
| return Substitute("calculated as $0 backends each with $1", cluster_size, |
| PrintBytes(pool_config.max_memory_multiple)); |
| } |
| return "configured statically"; |
| } |
| |
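| // E.g. (hypothetical numbers) with max_running_queries_multiple == 0.5 and a |
| // cluster of 5 executors, the limit is ceil(0.5 * 5) == 3 running queries; |
| // otherwise the statically configured max_requests applies. |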
| int64_t AdmissionController::GetMaxRequestsForPool( |
| const TPoolConfig& pool_config, int64_t cluster_size) { |
| if (pool_config.max_running_queries_multiple > 0) { |
| return ceil(pool_config.max_running_queries_multiple * cluster_size); |
| } |
| return pool_config.max_requests; |
| } |
| |
| string AdmissionController::GetMaxRequestsForPoolDescription( |
| const TPoolConfig& pool_config, int64_t cluster_size) { |
| if (pool_config.max_running_queries_multiple > 0) { |
| return Substitute("calculated as $0 backends each with $1 queries", cluster_size, |
| pool_config.max_running_queries_multiple); |
| } |
| return "configured statically"; |
| } |
| |
| int64_t AdmissionController::GetMaxQueuedForPool( |
| const TPoolConfig& pool_config, int64_t cluster_size) { |
| if (pool_config.max_queued_queries_multiple > 0) { |
| return ceil(pool_config.max_queued_queries_multiple * cluster_size); |
| } |
| return pool_config.max_queued; |
| } |
| |
| string AdmissionController::GetMaxQueuedForPoolDescription( |
| const TPoolConfig& pool_config, int64_t cluster_size) { |
| if (pool_config.max_queued_queries_multiple > 0) { |
| return Substitute("calculated as $0 backends each with $1 queries", cluster_size, |
| pool_config.max_queued_queries_multiple); |
| } |
| return "configured statically"; |
| } |
| |
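| // A pool is disabled when either its query count or its memory is limited to zero |
| // and no per-backend multiple overrides that limit, e.g. max_requests == 0 with |
| // max_running_queries_multiple == 0. |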
| bool AdmissionController::PoolDisabled(const TPoolConfig& pool_config) { |
| return ((pool_config.max_requests == 0 && pool_config.max_running_queries_multiple == 0) |
| || (pool_config.max_mem_resources == 0 && pool_config.max_memory_multiple == 0)); |
| } |
| |
| bool AdmissionController::PoolLimitsRunningQueriesCount(const TPoolConfig& pool_config) { |
| return pool_config.max_requests > 0 || pool_config.max_running_queries_multiple > 0; |
| } |
| |
| bool AdmissionController::PoolHasFixedMemoryLimit(const TPoolConfig& pool_config) { |
| return pool_config.max_mem_resources > 0 && pool_config.max_memory_multiple <= 0; |
| } |
| |
| int64_t AdmissionController::GetMemToAdmit( |
| const QuerySchedule& schedule, const BackendExecParams& backend_exec_params) { |
| return backend_exec_params.is_coord_backend ? schedule.coord_backend_mem_to_admit() : |
| schedule.per_backend_mem_to_admit(); |
| } |
| |
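| // Two-phase diff against the membership snapshot: metrics for executor groups |
| // that disappeared are removed first, then gauges are registered for newly |
| // observed groups, initialised from any queries still tracked in host_stats_. |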
| void AdmissionController::UpdateExecGroupMetricMap( |
| ClusterMembershipMgr::SnapshotPtr snapshot) { |
| std::unordered_set<string> grp_names; |
| for (const auto& group : snapshot->executor_groups) { |
| if (group.second.NumHosts() > 0) grp_names.insert(group.first); |
| } |
| lock_guard<mutex> l(admission_ctrl_lock_); |
| auto it = exec_group_query_load_map_.begin(); |
| while (it != exec_group_query_load_map_.end()) { |
| // Erase groups that still exist from the set so that only new ones are left. |
| if (grp_names.erase(it->first) == 0) { |
| // The tracked group is absent from the current snapshot, so it no longer |
| // exists; remove its metric. |
| string group_name = it->first; |
| it = exec_group_query_load_map_.erase(it); |
| metrics_group_->RemoveMetric(EXEC_GROUP_QUERY_LOAD_KEY_FORMAT, group_name); |
| } else { |
| ++it; |
| } |
| } |
| // Now only the new groups are remaining in the set, add a metric for them. |
| for (const string& new_grp : grp_names) { |
| // There might be lingering queries from when this group was active. |
| int64_t currently_running = 0; |
| auto new_grp_it = snapshot->executor_groups.find(new_grp); |
| DCHECK(new_grp_it != snapshot->executor_groups.end()); |
| ExecutorGroup group = new_grp_it->second; |
| for (const TBackendDescriptor& be_desc : group.GetAllExecutorDescriptors()) { |
| const string& host = TNetworkAddressToString(be_desc.address); |
| auto stats = host_stats_.find(host); |
| if (stats != host_stats_.end()) { |
| currently_running = std::max(currently_running, stats->second.num_admitted); |
| } |
| } |
| exec_group_query_load_map_[new_grp] = metrics_group_->AddGauge( |
| EXEC_GROUP_QUERY_LOAD_KEY_FORMAT, currently_running, new_grp); |
| } |
| } |
| |
| void AdmissionController::UpdateExecGroupMetric( |
| const string& grp_name, int64_t delta) { |
| auto entry = exec_group_query_load_map_.find(grp_name); |
| if (entry != exec_group_query_load_map_.end()) entry->second->Increment(delta); |
| } |
| |
| } // namespace impala |