blob: 798abb4ab0eded264e558c5c6b2635f859e3b32c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "scheduling/admission-controller.h"
#include <boost/algorithm/string.hpp>
#include <boost/mem_fn.hpp>
#include <gutil/strings/stringpiece.h>
#include <gutil/strings/substitute.h>
#include "common/logging.h"
#include "runtime/bufferpool/reservation-util.h"
#include "runtime/exec-env.h"
#include "runtime/mem-tracker.h"
#include "scheduling/cluster-membership-mgr.h"
#include "scheduling/executor-group.h"
#include "scheduling/query-schedule.h"
#include "scheduling/scheduler.h"
#include "service/impala-server.h"
#include "util/bit-util.h"
#include "util/debug-util.h"
#include "util/metrics.h"
#include "util/pretty-printer.h"
#include "util/runtime-profile-counters.h"
#include "util/time.h"
#include "common/names.h"
using namespace strings;
// Upper bound, in milliseconds, on how long a request may wait for admission before
// it times out (see the flag's help text). Defaults to 60 seconds.
DEFINE_int64(queue_wait_timeout_ms, 60 * 1000, "Maximum amount of time (in "
"milliseconds) that a request will wait to be admitted before timing out.");
// The stale topic warning threshold is made configurable to allow suppressing the
// error if it turns out to be noisy on some deployments or allow lowering the
// threshold to help debug admission control issues. Hidden so that we have the
// option of making this a no-op later.
DEFINE_int64_hidden(admission_control_stale_topic_threshold_ms, 5 * 1000,
"Threshold above which the admission controller will append warnings to "
"error messages and profiles warning that the admission control topic is "
"stale so that the admission control decision may have been based on stale "
"state data. The default, 5 seconds, is chosen to minimise false positives but "
"capture most cases where the Impala daemon is disconnected from the statestore "
"or topic updates are seriously delayed.");
namespace impala {
// Number of bins in the peak memory histogram. PoolStats::ReleaseQuery() clamps the
// bucket index it computes into [0, HISTOGRAM_NUM_OF_BINS - 1].
const int64_t AdmissionController::PoolStats::HISTOGRAM_NUM_OF_BINS = 128;
// Width of a single histogram bin in bytes (1024^3, i.e. 1 GiB).
const int64_t AdmissionController::PoolStats::HISTOGRAM_BIN_SIZE = 1024L * 1024L * 1024L;
// NOTE(review): presumably the smoothing factor of an exponential moving average; its
// use is not visible in this part of the file - confirm against the header.
const double AdmissionController::PoolStats::EMA_MULTIPLIER = 0.2;
/// Convenience wrapper: renders 'value' through PrettyPrinter using the BYTES unit.
string PrintBytes(int64_t value) {
  string formatted = PrettyPrinter::Print(value, TUnit::BYTES);
  return formatted;
}
// Delimiter used for topic keys of the form "<pool_name><delimiter><backend_id>".
// "!" is used because the backend id contains a colon, but it should not contain "!".
// When parsing the topic key we need to be careful to find the last instance in
// case the pool name contains it as well.
const char TOPIC_KEY_DELIMITER = '!';
// Delimiter used for the resource pool prefix of executor groups. In order to be used for
// queries in "resource-pool-A", an executor group name must start with
// "resource-pool-A-".
const char POOL_GROUP_DELIMITER = '-';
// Metric key format for the number of queries executing on an executor group.
// NOTE(review): '$0' is presumably replaced with the executor group name; the call
// site is not visible in this part of the file - confirm.
const string EXEC_GROUP_QUERY_LOAD_KEY_FORMAT =
"admission-controller.executor-group.num-queries-executing.$0";
// Define metric key format strings for metrics in PoolMetrics
// '$0' is replaced with the pool name by strings::Substitute
// Counters of admission outcomes ("total-*").
const string TOTAL_ADMITTED_METRIC_KEY_FORMAT =
"admission-controller.total-admitted.$0";
const string TOTAL_QUEUED_METRIC_KEY_FORMAT =
"admission-controller.total-queued.$0";
const string TOTAL_DEQUEUED_METRIC_KEY_FORMAT =
"admission-controller.total-dequeued.$0";
const string TOTAL_REJECTED_METRIC_KEY_FORMAT =
"admission-controller.total-rejected.$0";
const string TOTAL_TIMED_OUT_METRIC_KEY_FORMAT =
"admission-controller.total-timed-out.$0";
const string TOTAL_RELEASED_METRIC_KEY_FORMAT =
"admission-controller.total-released.$0";
const string TIME_IN_QUEUE_METRIC_KEY_FORMAT =
"admission-controller.time-in-queue-ms.$0";
// Aggregate ("agg-*") pool statistics.
const string AGG_NUM_RUNNING_METRIC_KEY_FORMAT =
"admission-controller.agg-num-running.$0";
const string AGG_NUM_QUEUED_METRIC_KEY_FORMAT =
"admission-controller.agg-num-queued.$0";
const string AGG_MEM_RESERVED_METRIC_KEY_FORMAT =
"admission-controller.agg-mem-reserved.$0";
// Statistics local to this host ("local-*").
const string LOCAL_MEM_ADMITTED_METRIC_KEY_FORMAT =
"admission-controller.local-mem-admitted.$0";
const string LOCAL_NUM_ADMITTED_RUNNING_METRIC_KEY_FORMAT =
"admission-controller.local-num-admitted-running.$0";
const string LOCAL_NUM_QUEUED_METRIC_KEY_FORMAT =
"admission-controller.local-num-queued.$0";
const string LOCAL_BACKEND_MEM_USAGE_METRIC_KEY_FORMAT =
"admission-controller.local-backend-mem-usage.$0";
const string LOCAL_BACKEND_MEM_RESERVED_METRIC_KEY_FORMAT =
"admission-controller.local-backend-mem-reserved.$0";
// Pool configuration values ("pool-*").
const string POOL_MAX_MEM_RESOURCES_METRIC_KEY_FORMAT =
"admission-controller.pool-max-mem-resources.$0";
const string POOL_MAX_REQUESTS_METRIC_KEY_FORMAT =
"admission-controller.pool-max-requests.$0";
const string POOL_MAX_QUEUED_METRIC_KEY_FORMAT =
"admission-controller.pool-max-queued.$0";
const string POOL_QUEUE_TIMEOUT_METRIC_KEY_FORMAT =
"admission-controller.pool-queue-timeout.$0";
const string POOL_MAX_QUERY_MEM_LIMIT_METRIC_KEY_FORMAT =
"admission-controller.pool-max-query-mem-limit.$0";
const string POOL_MIN_QUERY_MEM_LIMIT_METRIC_KEY_FORMAT =
"admission-controller.pool-min-query-mem-limit.$0";
const string POOL_CLAMP_MEM_LIMIT_QUERY_OPTION_METRIC_KEY_FORMAT =
"admission-controller.pool-clamp-mem-limit-query-option.$0";
const string POOL_MAX_RUNNING_QUERIES_MULTIPLE_METRIC_KEY_FORMAT =
"admission-controller.pool-max-running-queries-multiple.$0";
const string POOL_MAX_QUEUED_QUERIES_MULTIPLE_METRIC_KEY_FORMAT =
"admission-controller.pool-max-queued-queries-multiple.$0";
const string POOL_MAX_MEMORY_MULTIPLE_METRIC_KEY_FORMAT =
"admission-controller.pool-max-memory-multiple.$0";
const string POOL_MAX_RUNNING_QUERIES_DERIVED_METRIC_KEY_FORMAT =
"admission-controller.pool-max-running-queries-derived.$0";
// NOTE(review): unlike the other POOL_* keys, the next two key strings have no "pool-"
// prefix. They are published metric names, so they are deliberately left unchanged.
const string POOL_MAX_QUEUED_QUERIES_DERIVED_METRIC_KEY_FORMAT =
"admission-controller.max-queued-queries-derived.$0";
const string POOL_MAX_MEMORY_DERIVED_METRIC_KEY_FORMAT =
"admission-controller.max-memory-derived.$0";
// Profile query events
const string QUERY_EVENT_SUBMIT_FOR_ADMISSION = "Submit for admission";
const string QUERY_EVENT_QUEUED = "Queued";
const string QUERY_EVENT_COMPLETED_ADMISSION = "Completed admission";
// Profile info strings
// Key under which the admission outcome is recorded, followed by its possible values.
const string AdmissionController::PROFILE_INFO_KEY_ADMISSION_RESULT = "Admission result";
const string AdmissionController::PROFILE_INFO_VAL_ADMIT_IMMEDIATELY =
"Admitted immediately";
const string AdmissionController::PROFILE_INFO_VAL_QUEUED = "Queued";
const string AdmissionController::PROFILE_INFO_VAL_CANCELLED_IN_QUEUE =
"Cancelled (queued)";
const string AdmissionController::PROFILE_INFO_VAL_ADMIT_QUEUED = "Admitted (queued)";
const string AdmissionController::PROFILE_INFO_VAL_REJECTED = "Rejected";
const string AdmissionController::PROFILE_INFO_VAL_TIME_OUT = "Timed out (queued)";
const string AdmissionController::PROFILE_INFO_KEY_INITIAL_QUEUE_REASON =
"Initial admission queue reason";
// Format string with two placeholders; per its text, $0 = time waited in ms and
// $1 = the reason the query was queued.
const string AdmissionController::PROFILE_INFO_VAL_INITIAL_QUEUE_REASON =
"waited $0 ms, reason: $1";
const string AdmissionController::PROFILE_INFO_KEY_LAST_QUEUED_REASON =
"Latest admission queue reason";
const string AdmissionController::PROFILE_INFO_KEY_ADMITTED_MEM =
"Cluster Memory Admitted";
const string AdmissionController::PROFILE_INFO_KEY_EXECUTOR_GROUP = "Executor Group";
const string AdmissionController::PROFILE_INFO_KEY_STALENESS_WARNING =
"Admission control state staleness";
const string AdmissionController::PROFILE_TIME_SINCE_LAST_UPDATE_COUNTER_NAME =
"AdmissionControlTimeSinceLastUpdate";
// Error status string details
// The '$N' tokens in these strings are positional placeholders filled in by
// strings::Substitute at the call sites.
const string REASON_INVALID_POOL_CONFIG_MIN_LIMIT_MAX_MEM_FIXED =
"Invalid pool config: the min_query_mem_limit $0 is greater than the "
"max_mem_resources $1 (configured statically)";
const string REASON_INVALID_POOL_CONFIG_MIN_LIMIT_MAX_MEM_MULTIPLE =
"The min_query_mem_limit $0 is greater than the current max_mem_resources $1 ($2); "
"queries will not be admitted until more executors are available.";
const string REASON_INVALID_POOL_CONFIG_MIN_LIMIT_MAX_LIMIT =
"Invalid pool config: the min_query_mem_limit is greater than the "
"max_query_mem_limit ($0 > $1)";
const string REASON_MEM_LIMIT_TOO_LOW_FOR_RESERVATION =
"minimum memory reservation is greater than memory available to the query for buffer "
"reservations. Memory reservation needed given the current plan: $0. Adjust either "
"the mem_limit or the pool config (max-query-mem-limit, min-query-mem-limit) for the "
"query to allow the query memory limit to be at least $1. Note that changing the "
"mem_limit may also change the plan. See the query profile for more information "
"about the per-node memory requirements.";
const string REASON_BUFFER_LIMIT_TOO_LOW_FOR_RESERVATION =
"minimum memory reservation on backend '$0' is greater than memory available to the "
"query for buffer reservations. Increase the buffer_pool_limit to $1. See the query "
"profile for more information about the per-node memory requirements.";
const string REASON_NOT_ENOUGH_SLOTS_ON_BACKEND =
"number of admission control slots needed ($0) on backend '$1' is greater than total "
"slots available $2. Reduce mt_dop to less than $2 to ensure that the query can "
"execute.";
const string REASON_MIN_RESERVATION_OVER_POOL_MEM =
"minimum memory reservation needed is greater than pool max mem resources. Pool "
"max mem resources: $0 ($1). Cluster-wide memory reservation needed: $2. Increase "
"the pool max mem resources. See the query profile for more information about the "
"per-node memory requirements.";
const string REASON_DISABLED_MAX_MEM_RESOURCES =
"disabled by pool max mem resources set to 0";
const string REASON_DISABLED_REQUESTS_LIMIT = "disabled by requests limit set to 0";
// $0 = queue limit, $1 = description of how the queue limit was calculated,
// $2 = number of queued requests, $3 = staleness detail.
const string REASON_QUEUE_FULL = "queue full, limit=$0 ($1), num_queued=$2.$3";
const string REASON_REQ_OVER_POOL_MEM =
"request memory needed $0 is greater than pool max mem resources $1 ($2).\n\n"
"Use the MEM_LIMIT query option to indicate how much memory is required per node. "
"The total memory needed is the per-node MEM_LIMIT times the number of nodes "
"executing the query. See the Admission Control documentation for more information.";
const string REASON_REQ_OVER_NODE_MEM =
"request memory needed $0 is greater than memory available for admission $1 "
"of $2.\n\nUse the MEM_LIMIT query option to indicate how much memory is required "
"per node.";
const string REASON_THREAD_RESERVATION_LIMIT_EXCEEDED =
"thread reservation on backend '$0' is greater than the THREAD_RESERVATION_LIMIT "
"query option value: $1 > $2.";
const string REASON_THREAD_RESERVATION_AGG_LIMIT_EXCEEDED =
"sum of thread reservations across all $0 backends is greater than the "
"THREAD_RESERVATION_AGGREGATE_LIMIT query option value: $1 > $2.";
// $0 is the error message returned by the scheduler.
const string REASON_SCHEDULER_ERROR = "Error during scheduling: $0";
const string REASON_LOCAL_BACKEND_NOT_STARTED = "Local backend has not started up yet.";
const string REASON_NO_EXECUTOR_GROUPS = "Waiting for executors to start. Only DDL "
"queries can currently run.";
// Queue decision details
// $0 = num running queries, $1 = num queries limit, $2 = num queries limit explanation,
// $3 = staleness detail
const string QUEUED_NUM_RUNNING =
"number of running queries $0 is at or over limit $1 ($2)$3.";
// $0 = queue size, $1 = staleness detail
const string QUEUED_QUEUE_NOT_EMPTY = "queue is not empty (size $0); queued queries are "
"executed first.$1";
// $0 = pool name, $1 = pool max memory, $2 = pool max memory explanation,
// $3 = pool mem needed, $4 = pool mem available, $5 = staleness detail
const string POOL_MEM_NOT_AVAILABLE =
"Not enough aggregate memory available in pool $0 "
"with max mem resources $1 ($2). Needed $3 but only $4 was available.$5";
// $0 = host name, $1 = host mem needed, $2 = host mem available,
// $3 = host admit mem limit, $4 = staleness detail
const string HOST_MEM_NOT_AVAILABLE = "Not enough memory available on host $0. "
"Needed $1 but only $2 out of $3 was available.$4";
// $0 = host name, $1 = num slots needed, $2 = num slots in use,
// $3 = total admission slots on the host
const string HOST_SLOT_NOT_AVAILABLE = "Not enough admission control slots available on "
"host $0. Needed $1 slots but $2/$3 are already "
"in use.";
// Parses the pool name and backend_id from the topic key if it is valid.
// Returns true if the topic key is valid and pool_name and backend_id are set.
static inline bool ParsePoolTopicKey(const string& topic_key, string* pool_name,
string* backend_id) {
// Topic keys will look something like: poolname!hostname:22000
// The '!' delimiter should always be present, the pool name must be
// at least 1 character, and network address must be at least 3 characters (including
// ':' and if the hostname and port are each only 1 character). Then the topic_key must
// be at least 5 characters (1 + 1 + 3).
const int MIN_TOPIC_KEY_SIZE = 5;
if (topic_key.length() < MIN_TOPIC_KEY_SIZE) {
VLOG_QUERY << "Invalid topic key for pool: " << topic_key;
return false;
}
size_t pos = topic_key.find_last_of(TOPIC_KEY_DELIMITER);
if (pos == string::npos || pos >= topic_key.size() - 1) {
VLOG_QUERY << "Invalid topic key for pool: " << topic_key;
return false;
}
*pool_name = topic_key.substr(0, pos);
*backend_id = topic_key.substr(pos + 1);
return true;
}
// Builds a one-line, comma-separated debug description of 'stats' covering the number
// of admitted running queries, the number of queued queries and the backend memory
// reserved (pretty-printed as bytes).
static string DebugPoolStats(const TPoolStats& stats) {
  stringstream out;
  out << "num_admitted_running=" << stats.num_admitted_running << ", "
      << "num_queued=" << stats.num_queued << ", "
      << "backend_mem_reserved=" << PrintBytes(stats.backend_mem_reserved);
  return out.str();
}
// Returns a debug description of this pool: the aggregate counters first, followed by
// the local host's admitted memory and local stats inside "local_host(...)".
string AdmissionController::PoolStats::DebugString() const {
  stringstream out;
  out << "agg_num_running=" << agg_num_running_ << ", "
      << "agg_num_queued=" << agg_num_queued_ << ", "
      << "agg_mem_reserved=" << PrintBytes(agg_mem_reserved_) << ", "
      << " local_host(local_mem_admitted=" << PrintBytes(local_mem_admitted_) << ", "
      << DebugPoolStats(local_stats_) << ")";
  return out.str();
}
// TODO: do we need host_id_ to come from host_addr or can it just take the same id
// the Scheduler has (coming from the StatestoreSubscriber)?
//
// Stores the collaborators, creates (or reuses) the "admission-controller" child metric
// group and registers a cluster membership callback that forwards every new snapshot to
// UpdateExecGroupMetricMap(). No thread is started here; see Init().
AdmissionController::AdmissionController(ClusterMembershipMgr* cluster_membership_mgr,
    StatestoreSubscriber* subscriber, RequestPoolService* request_pool_service,
    MetricGroup* metrics, const TNetworkAddress& host_addr)
  : cluster_membership_mgr_(cluster_membership_mgr),
    subscriber_(subscriber),
    request_pool_service_(request_pool_service),
    metrics_group_(metrics->GetOrCreateChildGroup("admission-controller")),
    host_id_(TNetworkAddressToString(host_addr)),
    thrift_serializer_(false),
    done_(false) {
  auto on_membership_update = [this](ClusterMembershipMgr::SnapshotPtr new_snapshot) {
    UpdateExecGroupMetricMap(new_snapshot);
  };
  cluster_membership_mgr_->RegisterUpdateCallbackFn(on_membership_update);
}
// Stops and joins the dequeue thread if one was started.
AdmissionController::~AdmissionController() {
  // Without a dequeue thread (e.g. Init() failed) there is nothing to tear down.
  if (dequeue_thread_ != nullptr) {
    // The AdmissionController normally lives as long as the impalad. Unit tests,
    // however, destroy it, so make sure no thread is left waiting on the condition
    // variable: tell the dequeue thread to stop, then wait for it to exit.
    {
      // Holding the lock guarantees that the dequeue thread observes done_.
      lock_guard<mutex> l(admission_ctrl_lock_);
      done_ = true;
      pending_dequeue_ = true;
      dequeue_cv_.NotifyOne();
    }
    dequeue_thread_->Join();
  }
}
// Starts the dequeue thread ("admission-thread") and subscribes to the request queue
// statestore topic. Returns the error from thread creation, or the status of the topic
// registration (with an extra detail message appended on failure).
Status AdmissionController::Init() {
  RETURN_IF_ERROR(Thread::Create("scheduling", "admission-thread",
      &AdmissionController::DequeueLoop, this, &dequeue_thread_));
  // Every topic update is handed to UpdatePoolStats().
  auto on_topic_update = [this](const StatestoreSubscriber::TopicDeltaMap& incoming,
      vector<TTopicDelta>* outgoing) { UpdatePoolStats(incoming, outgoing); };
  Status status = subscriber_->AddTopic(Statestore::IMPALA_REQUEST_QUEUE_TOPIC,
      /* is_transient=*/true, /* populate_min_subscriber_topic_version=*/false,
      /* filter_prefix=*/"", on_topic_update);
  const bool registered = status.ok();
  if (!registered) {
    status.AddDetail("AdmissionController failed to register request queue topic");
  }
  return status;
}
// Records the admission of one query: adds the schedule's cluster memory to the locally
// admitted memory and bumps the aggregate/local running counters, keeping the matching
// metrics in step. The memory to admit is expected to be positive (DCHECK).
void AdmissionController::PoolStats::AdmitQueryAndMemory(const QuerySchedule& schedule) {
  const int64_t mem_to_admit = schedule.GetClusterMemoryToAdmit();
  DCHECK_GT(mem_to_admit, 0);

  // Memory accounting.
  local_mem_admitted_ += mem_to_admit;
  metrics_.local_mem_admitted->Increment(mem_to_admit);

  // Running query counters.
  ++agg_num_running_;
  metrics_.agg_num_running->Increment(1L);
  ++local_stats_.num_admitted_running;
  metrics_.local_num_admitted_running->Increment(1L);

  metrics_.total_admitted->Increment(1L);
}
// Records the release of one query: decrements the aggregate/local running counters
// (and metrics), counts the release, and adds 'peak_mem_consumption' (bytes) to the peak
// memory histogram.
void AdmissionController::PoolStats::ReleaseQuery(int64_t peak_mem_consumption) {
  // Update stats tracking the number of running and admitted queries.
  agg_num_running_ -= 1;
  metrics_.agg_num_running->Increment(-1L);
  local_stats_.num_admitted_running -= 1;
  metrics_.local_num_admitted_running->Increment(-1L);
  metrics_.total_released->Increment(1L);
  DCHECK_GE(local_stats_.num_admitted_running, 0);
  DCHECK_GE(agg_num_running_, 0);

  // Update the 'peak_mem_histogram' based on the given peak memory consumption of the
  // query. The consumption is first converted to a 1-based count of
  // HISTOGRAM_BIN_SIZE-sized bins, then clamped to [1, HISTOGRAM_NUM_OF_BINS] and
  // shifted to a 0-based index, so zero/negative values land in the first bin and
  // oversized values in the last one.
  const int64_t num_bins_needed =
      BitUtil::RoundUp(peak_mem_consumption, HISTOGRAM_BIN_SIZE) / HISTOGRAM_BIN_SIZE;
  // Explicit template arguments keep this compiling on platforms where int64_t is not
  // 'long' (a bare '1L' literal would break std::max's type deduction there).
  const int64_t histogram_bucket =
      std::max<int64_t>(std::min<int64_t>(num_bins_needed, HISTOGRAM_NUM_OF_BINS), 1) - 1;
  ++peak_mem_histogram_[histogram_bucket];
}
// Gives back 'mem_to_release' bytes of locally admitted memory and mirrors the change in
// the local_mem_admitted metric. The amount is expected to be positive and must not
// drive the admitted total below zero (both DCHECKed).
void AdmissionController::PoolStats::ReleaseMem(int64_t mem_to_release) {
  DCHECK_GT(mem_to_release, 0);
  const int64_t delta = -mem_to_release;
  local_mem_admitted_ += delta;
  DCHECK_GE(local_mem_admitted_, 0);
  metrics_.local_mem_admitted->Increment(delta);
}
// Records that one request entered the queue: bumps the aggregate and local queued
// counters, their metrics, and the total-queued counter.
void AdmissionController::PoolStats::Queue() {
  ++agg_num_queued_;
  metrics_.agg_num_queued->Increment(1L);

  ++local_stats_.num_queued;
  metrics_.local_num_queued->Increment(1L);

  metrics_.total_queued->Increment(1L);
}
// Records that one request left the queue. The queued counters and metrics are always
// decremented; 'timed_out' selects whether the departure is counted as a timeout or as
// a regular dequeue.
void AdmissionController::PoolStats::Dequeue(bool timed_out) {
  --agg_num_queued_;
  metrics_.agg_num_queued->Increment(-1L);
  --local_stats_.num_queued;
  metrics_.local_num_queued->Increment(-1L);
  DCHECK_GE(agg_num_queued_, 0);
  DCHECK_GE(local_stats_.num_queued, 0);

  if (!timed_out) {
    metrics_.total_dequeued->Increment(1L);
    return;
  }
  metrics_.total_timed_out->Increment(1L);
}
void AdmissionController::UpdateStatsOnReleaseForBackends(
const QuerySchedule& schedule, const vector<TNetworkAddress>& host_addrs) {
int64_t total_mem_to_release = 0;
for (auto host_addr : host_addrs) {
auto backend_exec_params = schedule.per_backend_exec_params().find(host_addr);
if (backend_exec_params == schedule.per_backend_exec_params().end()) {
string err_msg =
strings::Substitute("Error: Cannot find exec params of host $0 for query $1.",
PrintThrift(host_addr), PrintId(schedule.query_id()));
DCHECK(false) << err_msg;
LOG(ERROR) << err_msg;
continue;
}
int64_t mem_to_release = GetMemToAdmit(schedule, backend_exec_params->second);
UpdateHostStats(
host_addr, -mem_to_release, -1, -backend_exec_params->second.slots_to_use);
total_mem_to_release += mem_to_release;
}
PoolStats* pool_stats = GetPoolStats(schedule);
pool_stats->ReleaseMem(total_mem_to_release);
pools_for_updates_.insert(schedule.request_pool());
}
void AdmissionController::UpdateStatsOnAdmission(const QuerySchedule& schedule) {
for (const auto& entry : schedule.per_backend_exec_params()) {
const TNetworkAddress& host_addr = entry.first;
int64_t mem_to_admit = GetMemToAdmit(schedule, entry.second);
UpdateHostStats(host_addr, mem_to_admit, 1, entry.second.slots_to_use);
}
PoolStats* pool_stats = GetPoolStats(schedule);
pool_stats->AdmitQueryAndMemory(schedule);
pools_for_updates_.insert(schedule.request_pool());
}
void AdmissionController::UpdateHostStats(
const TNetworkAddress& host_addr, int64_t mem_to_admit, int num_queries_to_admit,
int num_slots_to_admit) {
const string host = TNetworkAddressToString(host_addr);
VLOG_ROW << "Update admitted mem reserved for host=" << host
<< " prev=" << PrintBytes(host_stats_[host].mem_admitted)
<< " new=" << PrintBytes(host_stats_[host].mem_admitted + mem_to_admit);
host_stats_[host].mem_admitted += mem_to_admit;
DCHECK_GE(host_stats_[host].mem_admitted, 0);
VLOG_ROW << "Update admitted queries for host=" << host
<< " prev=" << host_stats_[host].num_admitted
<< " new=" << host_stats_[host].num_admitted + num_queries_to_admit;
host_stats_[host].num_admitted += num_queries_to_admit;
DCHECK_GE(host_stats_[host].num_admitted, 0);
VLOG_ROW << "Update slots in use for host=" << host
<< " prev=" << host_stats_[host].slots_in_use
<< " new=" << host_stats_[host].slots_in_use + num_slots_to_admit;
host_stats_[host].slots_in_use += num_slots_to_admit;
DCHECK_GE(host_stats_[host].slots_in_use, 0);
}
// Helper for CanAccommodateMaxInitialReservation(). Returns true when 'mem_limit' leaves
// room for 'buffer_reservation'; a non-positive 'mem_limit' means "no limit" and always
// passes. Otherwise returns false and describes the shortage, including the minimum
// mem limit that would be required, in 'mem_unavailable_reason'.
static bool CanMemLimitAccommodateReservation(
    int64_t mem_limit, int64_t buffer_reservation, string* mem_unavailable_reason) {
  const bool has_mem_limit = mem_limit > 0;
  if (has_mem_limit) {
    const int64_t reservation_cap =
        ReservationUtil::GetReservationLimitFromMemLimit(mem_limit);
    if (buffer_reservation > reservation_cap) {
      const int64_t min_mem_limit_needed =
          ReservationUtil::GetMinMemLimitFromReservation(buffer_reservation);
      *mem_unavailable_reason = Substitute(REASON_MEM_LIMIT_TOO_LOW_FOR_RESERVATION,
          PrintBytes(buffer_reservation), PrintBytes(min_mem_limit_needed));
      return false;
    }
  }
  return true;
}
// Checks that the schedule's mem limits can hold its minimum reservations: first the
// per-backend (executor) limit against the largest executor reservation, then the
// coordinator limit against the coordinator reservation. Returns false with the reason
// in 'mem_unavailable_reason' on the first check that fails. 'pool_cfg' is not
// consulted here.
bool AdmissionController::CanAccommodateMaxInitialReservation(
    const QuerySchedule& schedule, const TPoolConfig& pool_cfg,
    string* mem_unavailable_reason) {
  const int64_t executor_mem_limit = schedule.per_backend_mem_limit();
  const int64_t executor_min_reservation = schedule.largest_min_reservation();
  const int64_t coord_mem_limit = schedule.coord_backend_mem_limit();
  const int64_t coord_min_reservation = schedule.coord_min_reservation();

  if (!CanMemLimitAccommodateReservation(
          executor_mem_limit, executor_min_reservation, mem_unavailable_reason)) {
    return false;
  }
  return CanMemLimitAccommodateReservation(
      coord_mem_limit, coord_min_reservation, mem_unavailable_reason);
}
bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule,
const TPoolConfig& pool_cfg, int64_t cluster_size, string* mem_unavailable_reason) {
const string& pool_name = schedule.request_pool();
const int64_t pool_max_mem = GetMaxMemForPool(pool_cfg, cluster_size);
// If the pool doesn't have memory resources configured, always true.
if (pool_max_mem < 0) return true;
// Otherwise, two conditions must be met:
// 1) The memory estimated to be reserved by all queries in this pool *plus* the total
// memory needed for this query must be within the max pool memory resources
// specified.
// 2) Each individual backend must have enough mem available within its process limit
// to execute the query.
// Case 1:
PoolStats* stats = GetPoolStats(schedule);
int64_t cluster_mem_to_admit = schedule.GetClusterMemoryToAdmit();
VLOG_RPC << "Checking agg mem in pool=" << pool_name << " : " << stats->DebugString()
<< " executor_group=" << schedule.executor_group()
<< " cluster_mem_needed=" << PrintBytes(cluster_mem_to_admit)
<< " pool_max_mem=" << PrintBytes(pool_max_mem) << " ("
<< GetMaxMemForPoolDescription(pool_cfg, cluster_size) << ")";
if (stats->EffectiveMemReserved() + cluster_mem_to_admit > pool_max_mem) {
*mem_unavailable_reason = Substitute(POOL_MEM_NOT_AVAILABLE, pool_name,
PrintBytes(pool_max_mem), GetMaxMemForPoolDescription(pool_cfg, cluster_size),
PrintBytes(cluster_mem_to_admit),
PrintBytes(max(pool_max_mem - stats->EffectiveMemReserved(), 0L)),
GetStalenessDetailLocked(" "));
return false;
}
// Case 2:
for (const auto& entry : schedule.per_backend_exec_params()) {
const TNetworkAddress& host = entry.first;
const string host_id = TNetworkAddressToString(host);
int64_t admit_mem_limit = entry.second.be_desc.admit_mem_limit;
const HostStats& host_stats = host_stats_[host_id];
int64_t mem_reserved = host_stats.mem_reserved;
int64_t mem_admitted = host_stats.mem_admitted;
int64_t mem_to_admit = GetMemToAdmit(schedule, entry.second);
VLOG_ROW << "Checking memory on host=" << host_id
<< " mem_reserved=" << PrintBytes(mem_reserved)
<< " mem_admitted=" << PrintBytes(mem_admitted)
<< " needs=" << PrintBytes(mem_to_admit)
<< " admit_mem_limit=" << PrintBytes(admit_mem_limit);
int64_t effective_host_mem_reserved = std::max(mem_reserved, mem_admitted);
if (effective_host_mem_reserved + mem_to_admit > admit_mem_limit) {
*mem_unavailable_reason =
Substitute(HOST_MEM_NOT_AVAILABLE, host_id, PrintBytes(mem_to_admit),
PrintBytes(max(admit_mem_limit - effective_host_mem_reserved, 0L)),
PrintBytes(admit_mem_limit), GetStalenessDetailLocked(" "));
return false;
}
}
const TQueryOptions& query_opts = schedule.query_options();
if (!query_opts.__isset.buffer_pool_limit || query_opts.buffer_pool_limit <= 0) {
// Check if a change in pool_cfg.max_query_mem_limit (while the query was queued)
// resulted in a decrease in the computed per_host_mem_limit such that it can no
// longer accommodate the largest min_reservation.
return CanAccommodateMaxInitialReservation(
schedule, pool_cfg, mem_unavailable_reason);
}
return true;
}
// Returns true if every backend in 'schedule' can fit the slots the query needs on top
// of the slots already in use without exceeding the backend's admission slots. On the
// first backend that cannot, writes the explanation to 'unavailable_reason' and returns
// false. 'pool_cfg' is not consulted here.
bool AdmissionController::HasAvailableSlots(const QuerySchedule& schedule,
    const TPoolConfig& pool_cfg, string* unavailable_reason) {
  for (const auto& backend : schedule.per_backend_exec_params()) {
    const string host_id = TNetworkAddressToString(backend.first);
    const auto& exec_params = backend.second;
    int64_t admission_slots = exec_params.be_desc.admission_slots;
    int64_t slots_in_use = host_stats_[host_id].slots_in_use;
    VLOG_ROW << "Checking available slot on host=" << host_id
             << " slots_in_use=" << slots_in_use
             << " needs=" << slots_in_use + exec_params.slots_to_use
             << " executor admission_slots=" << admission_slots;
    const bool fits = slots_in_use + exec_params.slots_to_use <= admission_slots;
    if (fits) continue;
    *unavailable_reason = Substitute(HOST_SLOT_NOT_AVAILABLE, host_id,
        exec_params.slots_to_use, slots_in_use, admission_slots);
    return false;
  }
  return true;
}
// Decides whether 'schedule' can be admitted right now. Returns false, with the
// explanation in 'not_admitted_reason', if any of the following holds (checked in this
// order):
//  (a) Requests are already queued locally and this call is not admitting from the
//      queue.
//  (b) The pool has a max requests limit and is at or over it.
//  (c) The schedule uses a non-default executor group and one of its executors does not
//      have enough free admission slots.
//  (d) There are not enough memory resources available for the query.
bool AdmissionController::CanAdmitRequest(const QuerySchedule& schedule,
    const TPoolConfig& pool_cfg, int64_t cluster_size, bool admit_from_queue,
    string* not_admitted_reason) {
  const int64_t max_requests = GetMaxRequestsForPool(pool_cfg, cluster_size);
  PoolStats* pool_stats = GetPoolStats(schedule);
  const bool uses_default_group =
      schedule.executor_group() == ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME;

  // (a) Queued requests go first.
  const bool queue_not_empty = pool_stats->local_stats().num_queued > 0;
  if (!admit_from_queue && queue_not_empty) {
    *not_admitted_reason = Substitute(QUEUED_QUEUE_NOT_EMPTY,
        pool_stats->local_stats().num_queued, GetStalenessDetailLocked(" "));
    return false;
  }

  // (b) All executor groups are limited by the aggregate number of queries running in
  // the pool.
  const bool has_request_limit = max_requests >= 0;
  if (has_request_limit && pool_stats->agg_num_running() >= max_requests) {
    *not_admitted_reason = Substitute(QUEUED_NUM_RUNNING, pool_stats->agg_num_running(),
        max_requests, GetMaxRequestsForPoolDescription(pool_cfg, cluster_size),
        GetStalenessDetailLocked(" "));
    return false;
  }

  // (c) All non-default executor groups are also limited by the number of running
  // queries per executor.
  // TODO(IMPALA-8757): Extend slot based admission to default executor group
  if (!uses_default_group) {
    if (!HasAvailableSlots(schedule, pool_cfg, not_admitted_reason)) return false;
  }

  // (d) Memory is the last gate.
  return HasAvailableMemResources(schedule, pool_cfg, cluster_size, not_admitted_reason);
}
bool AdmissionController::RejectForCluster(const string& pool_name,
const TPoolConfig& pool_cfg, bool admit_from_queue, int64_t cluster_size,
string* rejection_reason) {
DCHECK(rejection_reason != nullptr && rejection_reason->empty());
// Checks related to pool max_requests:
if (GetMaxRequestsForPool(pool_cfg, cluster_size) == 0) {
*rejection_reason = REASON_DISABLED_REQUESTS_LIMIT;
return true;
}
// Checks related to pool max_mem_resources:
int64_t max_mem = GetMaxMemForPool(pool_cfg, cluster_size);
if (max_mem == 0) {
*rejection_reason = REASON_DISABLED_MAX_MEM_RESOURCES;
return true;
}
if (max_mem > 0 && pool_cfg.min_query_mem_limit > max_mem) {
if (PoolHasFixedMemoryLimit(pool_cfg)) {
*rejection_reason = Substitute(REASON_INVALID_POOL_CONFIG_MIN_LIMIT_MAX_MEM_FIXED,
pool_cfg.min_query_mem_limit, max_mem);
} else {
*rejection_reason =
Substitute(REASON_INVALID_POOL_CONFIG_MIN_LIMIT_MAX_MEM_MULTIPLE,
pool_cfg.min_query_mem_limit, max_mem,
GetMaxMemForPoolDescription(pool_cfg, cluster_size));
}
return true;
}
if (pool_cfg.max_query_mem_limit > 0
&& pool_cfg.min_query_mem_limit > pool_cfg.max_query_mem_limit) {
*rejection_reason = Substitute(REASON_INVALID_POOL_CONFIG_MIN_LIMIT_MAX_LIMIT,
pool_cfg.min_query_mem_limit, pool_cfg.max_query_mem_limit);
return true;
}
PoolStats* stats = GetPoolStats(pool_name);
int64_t max_queued = GetMaxQueuedForPool(pool_cfg, cluster_size);
if (!admit_from_queue && stats->agg_num_queued() >= max_queued) {
*rejection_reason = Substitute(REASON_QUEUE_FULL, max_queued,
GetMaxQueuedForPoolDescription(pool_cfg, cluster_size), stats->agg_num_queued(),
GetStalenessDetailLocked(" "));
return true;
}
return false;
}
bool AdmissionController::RejectForSchedule(const QuerySchedule& schedule,
const TPoolConfig& pool_cfg, int64_t cluster_size, int64_t group_size,
string* rejection_reason) {
DCHECK(rejection_reason != nullptr && rejection_reason->empty());
bool default_group =
schedule.executor_group() == ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME;
// Compute the max (over all backends), the cluster totals (across all backends) for
// min_mem_reservation_bytes, thread_reservation, the min admit_mem_limit
// (over all executors) and the admit_mem_limit of the coordinator.
pair<const TNetworkAddress*, int64_t> largest_min_mem_reservation(nullptr, -1);
int64_t cluster_min_mem_reservation_bytes = 0;
pair<const TNetworkAddress*, int64_t> max_thread_reservation(nullptr, 0);
pair<const TNetworkAddress*, int64_t> min_executor_admit_mem_limit(
nullptr, std::numeric_limits<int64_t>::max());
pair<const TNetworkAddress*, int64_t> coord_admit_mem_limit(
nullptr, std::numeric_limits<int64_t>::max());
int64_t cluster_thread_reservation = 0;
for (const auto& e : schedule.per_backend_exec_params()) {
const BackendExecParams& bp = e.second;
// TODO(IMPALA-8757): Extend slot based admission to default executor group
if (!default_group && bp.slots_to_use > bp.be_desc.admission_slots) {
*rejection_reason = Substitute(REASON_NOT_ENOUGH_SLOTS_ON_BACKEND, bp.slots_to_use,
TNetworkAddressToString(bp.be_desc.address), bp.be_desc.admission_slots);
return true;
}
cluster_min_mem_reservation_bytes += bp.min_mem_reservation_bytes;
if (bp.min_mem_reservation_bytes > largest_min_mem_reservation.second) {
largest_min_mem_reservation = make_pair(&e.first, bp.min_mem_reservation_bytes);
}
cluster_thread_reservation += bp.thread_reservation;
if (bp.thread_reservation > max_thread_reservation.second) {
max_thread_reservation = make_pair(&e.first, bp.thread_reservation);
}
if (bp.is_coord_backend) {
coord_admit_mem_limit.first = &e.first;
coord_admit_mem_limit.second = bp.be_desc.admit_mem_limit;
} else if (bp.be_desc.admit_mem_limit < min_executor_admit_mem_limit.second) {
min_executor_admit_mem_limit.first = &e.first;
min_executor_admit_mem_limit.second = bp.be_desc.admit_mem_limit;
}
}
// Checks related to the min buffer reservation against configured query memory limits:
const TQueryOptions& query_opts = schedule.query_options();
if (query_opts.__isset.buffer_pool_limit && query_opts.buffer_pool_limit > 0) {
if (largest_min_mem_reservation.second > query_opts.buffer_pool_limit) {
*rejection_reason = Substitute(REASON_BUFFER_LIMIT_TOO_LOW_FOR_RESERVATION,
TNetworkAddressToString(*largest_min_mem_reservation.first),
PrintBytes(largest_min_mem_reservation.second));
return true;
}
} else if (!CanAccommodateMaxInitialReservation(schedule, pool_cfg, rejection_reason)) {
// If buffer_pool_limit is not explicitly set, it's calculated from mem_limit.
return true;
}
// Check thread reservation limits.
if (query_opts.__isset.thread_reservation_limit
&& query_opts.thread_reservation_limit > 0
&& max_thread_reservation.second > query_opts.thread_reservation_limit) {
*rejection_reason = Substitute(REASON_THREAD_RESERVATION_LIMIT_EXCEEDED,
TNetworkAddressToString(*max_thread_reservation.first),
max_thread_reservation.second, query_opts.thread_reservation_limit);
return true;
}
if (query_opts.__isset.thread_reservation_aggregate_limit
&& query_opts.thread_reservation_aggregate_limit > 0
&& cluster_thread_reservation > query_opts.thread_reservation_aggregate_limit) {
*rejection_reason = Substitute(REASON_THREAD_RESERVATION_AGG_LIMIT_EXCEEDED,
schedule.per_backend_exec_params().size(), cluster_thread_reservation,
query_opts.thread_reservation_aggregate_limit);
return true;
}
// Checks related to pool max_mem_resources:
// We perform these checks here against the group_size to prevent queuing up queries
// that would never be able to reserve the required memory on an executor group.
int64_t max_mem = GetMaxMemForPool(pool_cfg, group_size);
if (max_mem == 0) {
*rejection_reason = REASON_DISABLED_MAX_MEM_RESOURCES;
return true;
}
if (max_mem > 0) {
if (cluster_min_mem_reservation_bytes > max_mem) {
*rejection_reason = Substitute(REASON_MIN_RESERVATION_OVER_POOL_MEM,
PrintBytes(max_mem), GetMaxMemForPoolDescription(pool_cfg, group_size),
PrintBytes(cluster_min_mem_reservation_bytes));
return true;
}
int64_t cluster_mem_to_admit = schedule.GetClusterMemoryToAdmit();
if (cluster_mem_to_admit > max_mem) {
*rejection_reason =
Substitute(REASON_REQ_OVER_POOL_MEM, PrintBytes(cluster_mem_to_admit),
PrintBytes(max_mem), GetMaxMemForPoolDescription(pool_cfg, group_size));
return true;
}
int64_t executor_mem_to_admit = schedule.per_backend_mem_to_admit();
VLOG_ROW << "Checking executor mem with executor_mem_to_admit = "
<< executor_mem_to_admit
<< " and min_admit_mem_limit.second = "
<< min_executor_admit_mem_limit.second;
if (executor_mem_to_admit > min_executor_admit_mem_limit.second) {
*rejection_reason =
Substitute(REASON_REQ_OVER_NODE_MEM, PrintBytes(executor_mem_to_admit),
PrintBytes(min_executor_admit_mem_limit.second),
TNetworkAddressToString(*min_executor_admit_mem_limit.first));
return true;
}
int64_t coord_mem_to_admit = schedule.coord_backend_mem_to_admit();
VLOG_ROW << "Checking coordinator mem with coord_mem_to_admit = "
<< coord_mem_to_admit
<< " and coord_admit_mem_limit.second = " << coord_admit_mem_limit.second;
if (coord_mem_to_admit > coord_admit_mem_limit.second) {
*rejection_reason = Substitute(REASON_REQ_OVER_NODE_MEM,
PrintBytes(coord_mem_to_admit), PrintBytes(coord_admit_mem_limit.second),
TNetworkAddressToString(*coord_admit_mem_limit.first));
return true;
}
}
return false;
}
// Copies the configured values of 'pool_cfg' into this pool's configuration metrics.
// 'cluster_size' is not read by this function.
void AdmissionController::PoolStats::UpdateConfigMetrics(
    const TPoolConfig& pool_cfg, int64_t cluster_size) {
  auto& m = metrics_;

  // Fixed limits and the queue timeout.
  m.pool_max_mem_resources->SetValue(pool_cfg.max_mem_resources);
  m.pool_max_requests->SetValue(pool_cfg.max_requests);
  m.pool_max_queued->SetValue(pool_cfg.max_queued);
  m.pool_queue_timeout->SetValue(GetQueueTimeoutForPoolMs(pool_cfg));

  // Per-query memory limit settings.
  m.max_query_mem_limit->SetValue(pool_cfg.max_query_mem_limit);
  m.min_query_mem_limit->SetValue(pool_cfg.min_query_mem_limit);
  m.clamp_mem_limit_query_option->SetValue(pool_cfg.clamp_mem_limit_query_option);

  // The configured "multiple" settings.
  m.max_running_queries_multiple->SetValue(pool_cfg.max_running_queries_multiple);
  m.max_queued_queries_multiple->SetValue(pool_cfg.max_queued_queries_multiple);
  m.max_memory_multiple->SetValue(pool_cfg.max_memory_multiple);
}
// Recomputes the limits of this pool that are derived from 'pool_cfg' and
// 'cluster_size' and publishes them to the corresponding "derived" metrics.
void AdmissionController::PoolStats::UpdateDerivedMetrics(
    const TPoolConfig& pool_cfg, int64_t cluster_size) {
  const auto derived_max_running = GetMaxRequestsForPool(pool_cfg, cluster_size);
  metrics_.max_running_queries_derived->SetValue(derived_max_running);

  const auto derived_max_queued = GetMaxQueuedForPool(pool_cfg, cluster_size);
  metrics_.max_queued_queries_derived->SetValue(derived_max_queued);

  const auto derived_max_mem = GetMaxMemForPool(pool_cfg, cluster_size);
  metrics_.max_memory_derived->SetValue(derived_max_mem);
}
// Submits 'request' for admission and blocks until an outcome is known.
//
// The function runs in three phases:
//  1. Under admission_ctrl_lock_: resolve the pool, then either reject the request,
//     admit it immediately, or enqueue it in the pool's request queue.
//  2. Without the lock: block on 'admit_outcome' for up to the pool's queue timeout.
//  3. Under admission_ctrl_lock_ again: determine the final outcome and, where needed,
//     remove 'queue_node' from the queue and update the pool stats.
//
// Returns OK and moves the admitted schedule into '*schedule_result' on admission.
// Otherwise returns an ADMISSION_REJECTED or ADMISSION_TIMED_OUT error,
// Status::CANCELLED, or the error from resolving the pool.
// '*schedule_result' must be empty on entry.
Status AdmissionController::SubmitForAdmission(const AdmissionRequest& request,
Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* admit_outcome,
std::unique_ptr<QuerySchedule>* schedule_result) {
DCHECK(schedule_result->get() == nullptr);
ClusterMembershipMgr::SnapshotPtr membership_snapshot =
cluster_membership_mgr_->GetSnapshot();
DCHECK(membership_snapshot.get() != nullptr);
// Surface the currently blacklisted executors (if any) in the query profile.
string blacklist_str = membership_snapshot->executor_blacklist.BlacklistToString();
if (!blacklist_str.empty()) {
request.summary_profile->AddInfoString("Blacklisted Executors", blacklist_str);
}
// Note the queue_node will not exist in the queue when this method returns.
QueueNode queue_node(request, admit_outcome, request.summary_profile);
// Re-resolve the pool name to propagate any resolution errors now that this request is
// known to require a valid pool. All executor groups / schedules will use the same pool
// name.
string pool_name;
TPoolConfig pool_cfg;
RETURN_IF_ERROR(
ResolvePoolAndGetConfig(request.request.query_ctx, &pool_name, &pool_cfg));
request.summary_profile->AddInfoString("Request Pool", pool_name);
const int64_t cluster_size = GetClusterSize(*membership_snapshot);
// We track this outside of the queue node so that it is still available after the query
// has been dequeued.
string initial_queue_reason;
ScopedEvent completedEvent(request.query_events, QUERY_EVENT_COMPLETED_ADMISSION);
{
// Take lock to ensure the Dequeue thread does not modify the request queue.
lock_guard<mutex> lock(admission_ctrl_lock_);
request.query_events->MarkEvent(QUERY_EVENT_SUBMIT_FOR_ADMISSION);
// Remember the latest config for this pool and refresh the pool's metrics.
pool_config_map_[pool_name] = pool_cfg;
PoolStats* stats = GetPoolStats(pool_name);
stats->UpdateConfigMetrics(pool_cfg, cluster_size);
stats->UpdateDerivedMetrics(pool_cfg, cluster_size);
// FindGroupToAdmitOrReject() returns false when the request must be rejected.
bool must_reject = !FindGroupToAdmitOrReject(cluster_size, membership_snapshot,
pool_cfg, /* admit_from_queue=*/false, stats, &queue_node);
if (must_reject) {
// Set() returns the outcome held by the promise. If it is not the value we tried to
// set, the DCHECK below expects that the query was already cancelled.
AdmissionOutcome outcome = admit_outcome->Set(AdmissionOutcome::REJECTED);
if (outcome != AdmissionOutcome::REJECTED) {
DCHECK_ENUM_EQ(outcome, AdmissionOutcome::CANCELLED);
VLOG_QUERY << "Ready to be " << PROFILE_INFO_VAL_REJECTED
<< " but already cancelled, query id=" << PrintId(request.query_id);
return Status::CANCELLED;
}
request.summary_profile->AddInfoString(
PROFILE_INFO_KEY_ADMISSION_RESULT, PROFILE_INFO_VAL_REJECTED);
stats->metrics()->total_rejected->Increment(1);
const ErrorMsg& rejected_msg = ErrorMsg(
TErrorCode::ADMISSION_REJECTED, pool_name, queue_node.not_admitted_reason);
VLOG_QUERY << rejected_msg.msg();
return Status::Expected(rejected_msg);
}
// A non-null admitted_schedule means a group was found that can admit the query now.
if (queue_node.admitted_schedule.get() != nullptr) {
const string& group_name = queue_node.admitted_schedule->executor_group();
VLOG(3) << "Can admit to group " << group_name << " (or cancelled)";
DCHECK_EQ(stats->local_stats().num_queued, 0);
*schedule_result = std::move(queue_node.admitted_schedule);
AdmissionOutcome outcome = admit_outcome->Set(AdmissionOutcome::ADMITTED);
if (outcome != AdmissionOutcome::ADMITTED) {
DCHECK_ENUM_EQ(outcome, AdmissionOutcome::CANCELLED);
VLOG_QUERY << "Ready to be " << PROFILE_INFO_VAL_ADMIT_IMMEDIATELY
<< " but already cancelled, query id=" << PrintId(request.query_id);
return Status::CANCELLED;
}
VLOG_QUERY << "Admitting query id=" << PrintId(request.query_id);
AdmitQuery(schedule_result->get(), false);
stats->UpdateWaitTime(0);
VLOG_RPC << "Final: " << stats->DebugString();
return Status::OK();
}
// We cannot immediately admit but do not need to reject, so queue the request
RequestQueue* queue = &request_queue_map_[pool_name];
VLOG_QUERY << "Queuing, query id=" << PrintId(request.query_id)
<< " reason: " << queue_node.not_admitted_reason;
initial_queue_reason = queue_node.not_admitted_reason;
stats->Queue();
queue->Enqueue(&queue_node);
}
// Update the profile info before waiting. These properties will be updated with
// their final state after being dequeued.
request.summary_profile->AddInfoString(
PROFILE_INFO_KEY_ADMISSION_RESULT, PROFILE_INFO_VAL_QUEUED);
request.summary_profile->AddInfoString(
PROFILE_INFO_KEY_INITIAL_QUEUE_REASON, initial_queue_reason);
request.summary_profile->AddInfoString(
PROFILE_INFO_KEY_LAST_QUEUED_REASON, queue_node.not_admitted_reason);
request.query_events->MarkEvent(QUERY_EVENT_QUEUED);
int64_t queue_wait_timeout_ms = GetQueueTimeoutForPoolMs(pool_cfg);
int64_t wait_start_ms = MonotonicMillis();
// Block in Get() up to the time out, waiting for the promise to be set when the query
// is admitted or cancelled.
// 'timed_out' is only passed as an out-parameter and is not read afterwards; the final
// outcome is decided by the Set(TIMED_OUT) call under the lock below.
bool timed_out;
admit_outcome->Get(queue_wait_timeout_ms, &timed_out);
int64_t wait_time_ms = MonotonicMillis() - wait_start_ms;
// Record the initial queue reason again, now formatted together with the time spent
// waiting.
request.summary_profile->AddInfoString(PROFILE_INFO_KEY_INITIAL_QUEUE_REASON,
Substitute(
PROFILE_INFO_VAL_INITIAL_QUEUE_REASON, wait_time_ms, initial_queue_reason));
// Disallow the FAIL action here. It would leave the queue in an inconsistent state.
DebugActionNoFail(request.query_options, "AC_AFTER_ADMISSION_OUTCOME");
{
lock_guard<mutex> lock(admission_ctrl_lock_);
// If the query has not been admitted or cancelled up till now, it will be considered
// to be timed out.
// NOTE(review): the branches below rely on Set() returning an outcome that was stored
// earlier (REJECTED, CANCELLED or ADMITTED) instead of overwriting it - confirm
// against the Promise implementation.
AdmissionOutcome outcome = admit_outcome->Set(AdmissionOutcome::TIMED_OUT);
RequestQueue* queue = &request_queue_map_[pool_name];
pools_for_updates_.insert(pool_name);
PoolStats* pool_stats = GetPoolStats(pool_name);
pool_stats->UpdateWaitTime(wait_time_ms);
if (outcome == AdmissionOutcome::REJECTED) {
// The node may already have been taken off the queue; only update the stats if this
// call actually removed it.
if (queue->Remove(&queue_node)) pool_stats->Dequeue(true);
request.summary_profile->AddInfoString(
PROFILE_INFO_KEY_ADMISSION_RESULT, PROFILE_INFO_VAL_REJECTED);
const ErrorMsg& rejected_msg = ErrorMsg(
TErrorCode::ADMISSION_REJECTED, pool_name, queue_node.not_admitted_reason);
VLOG_QUERY << rejected_msg.msg();
return Status::Expected(rejected_msg);
} else if (outcome == AdmissionOutcome::TIMED_OUT) {
// We set the outcome ourselves, so the node is expected to still be queued.
bool removed = queue->Remove(&queue_node);
DCHECK(removed);
pool_stats->Dequeue(true);
request.summary_profile->AddInfoString(
PROFILE_INFO_KEY_ADMISSION_RESULT, PROFILE_INFO_VAL_TIME_OUT);
const ErrorMsg& rejected_msg = ErrorMsg(TErrorCode::ADMISSION_TIMED_OUT,
queue_wait_timeout_ms, pool_name, queue_node.not_admitted_reason);
VLOG_QUERY << rejected_msg.msg();
return Status::Expected(rejected_msg);
} else if (outcome == AdmissionOutcome::CANCELLED) {
if (queue->Remove(&queue_node)) pool_stats->Dequeue(false);
request.summary_profile->AddInfoString(
PROFILE_INFO_KEY_ADMISSION_RESULT, PROFILE_INFO_VAL_CANCELLED_IN_QUEUE);
VLOG_QUERY << PROFILE_INFO_VAL_CANCELLED_IN_QUEUE
<< ", query id=" << PrintId(request.query_id);
return Status::CANCELLED;
}
// The dequeue thread updates the stats (to avoid a race condition) so we do
// not change them here.
DCHECK_ENUM_EQ(outcome, AdmissionOutcome::ADMITTED);
DCHECK(queue_node.admitted_schedule.get() != nullptr);
*schedule_result = std::move(queue_node.admitted_schedule);
DCHECK(!queue->Contains(&queue_node));
VLOG_QUERY << "Admitted queued query id=" << PrintId(request.query_id);
VLOG_RPC << "Final: " << pool_stats->DebugString();
return Status::OK();
}
}
void AdmissionController::ReleaseQuery(
const QuerySchedule& schedule, int64_t peak_mem_consumption) {
const string& pool_name = schedule.request_pool();
{
lock_guard<mutex> lock(admission_ctrl_lock_);
DCHECK_EQ(num_released_backends_.at(schedule.query_id()), 0);
num_released_backends_.erase(num_released_backends_.find(schedule.query_id()));
PoolStats* stats = GetPoolStats(schedule);
stats->ReleaseQuery(peak_mem_consumption);
// No need to update the Host Stats as they should have been updated in
// ReleaseQueryBackends.
pools_for_updates_.insert(pool_name);
UpdateExecGroupMetric(schedule.executor_group(), -1);
VLOG_RPC << "Released query id=" << PrintId(schedule.query_id()) << " "
<< stats->DebugString();
pending_dequeue_ = true;
}
dequeue_cv_.NotifyOne();
}
// Releases the resources held by the query described by 'schedule' on the backends
// listed in 'host_addrs' and decrements the query's entry in num_released_backends_
// by the number of hosts. Finally marks a dequeue as pending and notifies the dequeue
// thread.
void AdmissionController::ReleaseQueryBackends(
    const QuerySchedule& schedule, const vector<TNetworkAddress>& host_addrs) {
  {
    lock_guard<mutex> lock(admission_ctrl_lock_);
    UpdateStatsOnReleaseForBackends(schedule, host_addrs);

    // Update num_released_backends_.
    auto released_backends = num_released_backends_.find(schedule.query_id());
    if (released_backends != num_released_backends_.end()) {
      released_backends->second -= host_addrs.size();
    } else {
      string err_msg = Substitute("Unable to find num released backends for query $0",
          PrintId(schedule.query_id()));
      DCHECK(false) << err_msg;
      LOG(ERROR) << err_msg;
    }

    if (VLOG_IS_ON(2)) {
      stringstream ss;
      ss << "Released query backend(s) ";
      // Iterate by const reference: the addresses are only printed, so there is no
      // need to copy each TNetworkAddress.
      for (const auto& host_addr : host_addrs) ss << PrintThrift(host_addr) << " ";
      ss << "for query id=" << PrintId(schedule.query_id()) << " "
         << GetPoolStats(schedule)->DebugString();
      VLOG(2) << ss.str();
    }
    pending_dequeue_ = true;
  }
  // Notify outside of the lock scope.
  dequeue_cv_.NotifyOne();
}
// Resolves the request pool for 'query_ctx' into '*pool_name' and, if that succeeds,
// fetches the pool's configuration into '*pool_config'. Returns the first error
// encountered.
Status AdmissionController::ResolvePoolAndGetConfig(
    const TQueryCtx& query_ctx, string* pool_name, TPoolConfig* pool_config) {
  const Status resolve_status =
      request_pool_service_->ResolveRequestPool(query_ctx, pool_name);
  if (!resolve_status.ok()) return resolve_status;

  // The resolved name is expected to match the pool already stored in the query ctx.
  DCHECK_EQ(query_ctx.request_pool, *pool_name);
  return request_pool_service_->GetPoolConfig(*pool_name, pool_config);
}
// Statestore subscriber callback for IMPALA_REQUEST_QUEUE_TOPIC. Appends this node's
// outgoing pool updates to 'subscriber_topic_updates', applies the incoming updates (if
// the topic is present in 'incoming_topic_deltas'), recomputes the aggregates and
// then notifies the dequeue thread.
void AdmissionController::UpdatePoolStats(
    const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
    vector<TTopicDelta>* subscriber_topic_updates) {
  {
    lock_guard<mutex> lock(admission_ctrl_lock_);
    AddPoolUpdates(subscriber_topic_updates);

    const auto queue_topic =
        incoming_topic_deltas.find(Statestore::IMPALA_REQUEST_QUEUE_TOPIC);
    const bool has_queue_topic = queue_topic != incoming_topic_deltas.end();
    if (has_queue_topic) {
      const TTopicDelta& incoming = queue_topic->second;
      // Delta and non-delta updates are handled the same way, except for a full update
      // we first clear the backend TPoolStats. We then update the global map
      // and then re-compute the pool stats for any pools that changed.
      const bool is_full_update = !incoming.is_delta;
      if (is_full_update) {
        VLOG_ROW << "Full impala-request-queue stats update";
        for (auto it = pool_stats_.begin(); it != pool_stats_.end(); ++it) {
          it->second.ClearRemoteStats();
        }
      }
      HandleTopicUpdates(incoming.topic_entries);
    }

    UpdateClusterAggregates();
    last_topic_update_time_ms_ = MonotonicMillis();
    pending_dequeue_ = true;
  }
  dequeue_cv_.NotifyOne(); // Dequeue and admit queries on the dequeue thread
}
void AdmissionController::PoolStats::UpdateRemoteStats(const string& host_id,
TPoolStats* host_stats) {
DCHECK_NE(host_id, parent_->host_id_); // Shouldn't be updating for local host.
RemoteStatsMap::iterator it = remote_stats_.find(host_id);
if (VLOG_ROW_IS_ON) {
stringstream ss;
ss << "Stats update for pool=" << name_ << " backend=" << host_id;
if (host_stats == nullptr) ss << " topic deletion";
if (it != remote_stats_.end()) ss << " previous: " << DebugPoolStats(it->second);
if (host_stats != nullptr) ss << " new: " << DebugPoolStats(*host_stats);
VLOG_ROW << ss.str();
}
if (host_stats == nullptr) {
if (it != remote_stats_.end()) {
remote_stats_.erase(it);
} else {
VLOG_QUERY << "Attempted to remove non-existent remote stats for host=" << host_id;
}
} else {
remote_stats_[host_id] = *host_stats;
}
}
// Applies each item of 'topic_updates' to the remote stats of the pool named in the
// item's key. Items with an unparsable key, items that originate from this host and
// items whose value cannot be deserialized are skipped.
void AdmissionController::HandleTopicUpdates(const vector<TTopicItem>& topic_updates) {
  for (const TTopicItem& update : topic_updates) {
    string pool;
    string backend_id;
    const bool key_parsed = ParsePoolTopicKey(update.key, &pool, &backend_id);
    // The topic entry from this subscriber is handled specially; the stats coming
    // from the statestore are likely already outdated.
    if (!key_parsed || backend_id == host_id_) continue;

    if (update.deleted) {
      GetPoolStats(pool)->UpdateRemoteStats(backend_id, nullptr);
      continue;
    }

    TPoolStats deserialized;
    uint32_t num_bytes = update.value.size();
    const uint8_t* raw_bytes = reinterpret_cast<const uint8_t*>(update.value.data());
    Status status = DeserializeThriftMsg(raw_bytes, &num_bytes, false, &deserialized);
    if (status.ok()) {
      GetPoolStats(pool)->UpdateRemoteStats(backend_id, &deserialized);
    } else {
      VLOG_QUERY << "Error deserializing pool update with key: " << update.key;
    }
  }
}
// Recomputes this pool's aggregate number of running queries, number of queued queries
// and reserved memory from the remote stats plus the local stats. Also adds the memory
// reserved by each host in this pool to '*host_mem_reserved'. The aggregate members
// and metrics are only written when at least one of the three totals changed.
void AdmissionController::PoolStats::UpdateAggregates(HostMemMap* host_mem_reserved) {
  const string& local_host = parent_->host_id_;
  int64_t total_running = 0;
  int64_t total_queued = 0;
  int64_t total_mem_reserved = 0;

  for (const PoolStats::RemoteStatsMap::value_type& remote : remote_stats_) {
    const string& remote_host = remote.first;
    // Skip an update from this subscriber as the information may be outdated.
    // The stats from this coordinator will be added below.
    if (remote_host == local_host) continue;

    const TPoolStats& stats = remote.second;
    DCHECK_GE(stats.num_admitted_running, 0);
    DCHECK_GE(stats.num_queued, 0);
    DCHECK_GE(stats.backend_mem_reserved, 0);

    total_running += stats.num_admitted_running;
    total_queued += stats.num_queued;
    // Update the per-pool and per-host aggregates with the mem reserved by this host in
    // this pool.
    total_mem_reserved += stats.backend_mem_reserved;
    (*host_mem_reserved)[remote_host] += stats.backend_mem_reserved;
    // TODO(IMPALA-8762): For multiple coordinators, need to track the number of running
    // queries per executor, i.e. every admission controller needs to send the full map to
    // everyone else.
  }

  // Add the contribution of this host.
  total_running += local_stats_.num_admitted_running;
  total_queued += local_stats_.num_queued;
  total_mem_reserved += local_stats_.backend_mem_reserved;
  (*host_mem_reserved)[local_host] += local_stats_.backend_mem_reserved;

  DCHECK_GE(total_running, 0);
  DCHECK_GE(total_queued, 0);
  DCHECK_GE(total_mem_reserved, 0);
  DCHECK_GE(total_running, local_stats_.num_admitted_running);
  DCHECK_GE(total_queued, local_stats_.num_queued);

  const bool unchanged = agg_num_running_ == total_running
      && agg_num_queued_ == total_queued && agg_mem_reserved_ == total_mem_reserved;
  if (unchanged) {
    // Nothing to write; the metrics are expected to already hold these values.
    DCHECK_EQ(total_running, metrics_.agg_num_running->GetValue());
    DCHECK_EQ(total_queued, metrics_.agg_num_queued->GetValue());
    DCHECK_EQ(total_mem_reserved, metrics_.agg_mem_reserved->GetValue());
    return;
  }

  VLOG_ROW << "Recomputed agg stats, previous: " << DebugString();
  agg_num_running_ = total_running;
  agg_num_queued_ = total_queued;
  agg_mem_reserved_ = total_mem_reserved;
  metrics_.agg_num_running->SetValue(total_running);
  metrics_.agg_num_queued->SetValue(total_queued);
  metrics_.agg_mem_reserved->SetValue(total_mem_reserved);
  VLOG_ROW << "Updated: " << DebugString();
}
void AdmissionController::UpdateClusterAggregates() {
// Recompute mem_reserved for all hosts.
PoolStats::HostMemMap updated_mem_reserved;
for (auto& entry : pool_stats_) entry.second.UpdateAggregates(&updated_mem_reserved);
stringstream ss;
ss << "Updated mem reserved for hosts:";
int i = 0;
for (const auto& e : updated_mem_reserved) {
int64_t old_mem_reserved = host_stats_[e.first].mem_reserved;
if (old_mem_reserved == e.second) continue;
host_stats_[e.first].mem_reserved = e.second;
if (VLOG_ROW_IS_ON) {
ss << endl << e.first << ": " << PrintBytes(old_mem_reserved);
ss << " -> " << PrintBytes(e.second);
++i;
}
}
if (i > 0) VLOG_ROW << ss.str();
}
// (Re)computes one schedule per executor group for the request held by 'queue_node',
// based on 'membership_snapshot'. Nothing is done when the snapshot's version is not
// newer than the one the node was last scheduled with. On return with OK,
// 'queue_node->group_schedules' may be empty, in which case
// 'queue_node->not_admitted_reason' has been set. Returns the scheduler's error if
// scheduling on any group fails.
Status AdmissionController::ComputeGroupSchedules(
    ClusterMembershipMgr::SnapshotPtr membership_snapshot, QueueNode* queue_node) {
  int64_t last_scheduled_version = 0;
  const bool was_scheduled_before = queue_node->membership_snapshot.get() != nullptr;
  if (was_scheduled_before) {
    last_scheduled_version = queue_node->membership_snapshot->version;
  }
  const int64_t snapshot_version = membership_snapshot->version;
  DCHECK_GT(snapshot_version, 0);
  DCHECK_GE(snapshot_version, last_scheduled_version);
  if (snapshot_version <= last_scheduled_version) {
    VLOG(3) << "No rescheduling necessary, previous membership version: "
            << last_scheduled_version
            << ", current membership version: " << snapshot_version;
    return Status::OK();
  }

  const AdmissionRequest& request = queue_node->admission_request;
  VLOG(3) << "Scheduling query " << PrintId(request.query_id)
          << " with membership version " << snapshot_version;

  // Remember the snapshot used and start from an empty set of schedules.
  queue_node->membership_snapshot = membership_snapshot;
  std::vector<GroupSchedule>& schedules = queue_node->group_schedules;
  schedules.clear();

  const string& pool_name = request.request.query_ctx.request_pool;

  // If the first statestore update arrives before the local backend has finished starting
  // up, we might not have a local backend descriptor yet. We return no schedules, which
  // will result in the query being queued.
  if (membership_snapshot->local_be_desc == nullptr) {
    queue_node->not_admitted_reason = REASON_LOCAL_BACKEND_NOT_STARTED;
    LOG(WARNING) << queue_node->not_admitted_reason;
    return Status::OK();
  }
  const TBackendDescriptor& local_be_desc = *membership_snapshot->local_be_desc;

  vector<const ExecutorGroup*> groups_for_pool;
  GetExecutorGroupsForPool(
      membership_snapshot->executor_groups, pool_name, &groups_for_pool);
  if (groups_for_pool.empty()) {
    queue_node->not_admitted_reason = REASON_NO_EXECUTOR_GROUPS;
    LOG(WARNING) << queue_node->not_admitted_reason;
    return Status::OK();
  }

  // We loop over the executor groups in a deterministic order. This means we will fill up
  // each executor group before considering an unused one. In particular, we will not try
  // to balance queries across executor groups equally.
  // TODO(IMPALA-8731): balance queries across executor groups more evenly
  for (const ExecutorGroup* group : groups_for_pool) {
    DCHECK(group->IsHealthy());
    DCHECK_GT(group->NumExecutors(), 0);
    unique_ptr<QuerySchedule> schedule_for_group =
        make_unique<QuerySchedule>(request.query_id, request.request,
            request.query_options, request.summary_profile, request.query_events);
    const string& group_name = group->name();
    VLOG(3) << "Scheduling for executor group: " << group_name << " with "
            << group->NumExecutors() << " executors";
    const Scheduler::ExecutorConfig group_config = {*group, local_be_desc};
    RETURN_IF_ERROR(ExecEnv::GetInstance()->scheduler()->Schedule(
        group_config, schedule_for_group.get()));
    DCHECK(!schedule_for_group->executor_group().empty());
    schedules.emplace_back(std::move(schedule_for_group), *group);
  }
  DCHECK(!schedules.empty());
  return Status::OK();
}
// Decides whether the request in 'queue_node' has to be rejected, can be admitted to
// one of its executor groups, or has to wait.
//
// Returns false if the request must be rejected; the reason is stored in
// 'queue_node->not_admitted_reason'.
// Returns true otherwise. In that case 'queue_node->admitted_schedule' is set to the
// schedule of the first group that can admit the request, or is left untouched if no
// group can admit it right now (the callers treat a null admitted_schedule as "queue or
// keep queued").
bool AdmissionController::FindGroupToAdmitOrReject(int64_t cluster_size,
ClusterMembershipMgr::SnapshotPtr membership_snapshot, const TPoolConfig& pool_config,
bool admit_from_queue, PoolStats* pool_stats, QueueNode* queue_node) {
// Check for rejection based on current cluster size
const string& pool_name = pool_stats->name();
string rejection_reason;
if (RejectForCluster(
pool_name, pool_config, admit_from_queue, cluster_size, &rejection_reason)) {
DCHECK(!rejection_reason.empty());
queue_node->not_admitted_reason = rejection_reason;
return false;
}
// Compute schedules
Status ret = ComputeGroupSchedules(membership_snapshot, queue_node);
if (!ret.ok()) {
// A scheduler error is reported as a rejection.
DCHECK(queue_node->not_admitted_reason.empty());
queue_node->not_admitted_reason = Substitute(REASON_SCHEDULER_ERROR, ret.GetDetail());
return false;
}
// No schedules could be computed (the reason has been recorded in the node), so there
// is nothing to admit yet but also no reason to reject.
if (queue_node->group_schedules.empty()) {
DCHECK(!queue_node->not_admitted_reason.empty());
return true;
}
// Try the groups in order; the first group that can admit the request wins.
for (GroupSchedule& group_schedule : queue_node->group_schedules) {
const ExecutorGroup& executor_group = group_schedule.executor_group;
DCHECK_GT(executor_group.NumExecutors(), 0);
QuerySchedule* schedule = group_schedule.schedule.get();
schedule->UpdateMemoryRequirements(pool_config);
const string& group_name = executor_group.name();
int64_t group_size = executor_group.NumExecutors();
VLOG(3) << "Trying to admit query to pool " << pool_name << " in executor group "
<< group_name << " (" << group_size << " executors)";
// The three limits below are only used for the log message that follows.
const int64_t max_queued = GetMaxQueuedForPool(pool_config, cluster_size);
const int64_t max_mem = GetMaxMemForPool(pool_config, cluster_size);
const int64_t max_requests = GetMaxRequestsForPool(pool_config, cluster_size);
VLOG_QUERY << "Trying to admit id=" << PrintId(schedule->query_id())
<< " in pool_name=" << pool_name << " executor_group_name=" << group_name
<< " per_host_mem_estimate="
<< PrintBytes(schedule->GetPerExecutorMemoryEstimate())
<< " dedicated_coord_mem_estimate="
<< PrintBytes(schedule->GetDedicatedCoordMemoryEstimate())
<< " max_requests=" << max_requests << " ("
<< GetMaxRequestsForPoolDescription(pool_config, cluster_size) << ")"
<< " max_queued=" << max_queued << " ("
<< GetMaxQueuedForPoolDescription(pool_config, cluster_size) << ")"
<< " max_mem=" << PrintBytes(max_mem) << " ("
<< GetMaxMemForPoolDescription(pool_config, cluster_size) << ")";
VLOG_QUERY << "Stats: " << pool_stats->DebugString();
// Query is rejected if the rejection check fails on *any* group.
if (RejectForSchedule(
*schedule, pool_config, cluster_size, group_size, &rejection_reason)) {
DCHECK(!rejection_reason.empty());
queue_node->not_admitted_reason = rejection_reason;
return false;
}
// On failure CanAdmitRequest() is passed 'not_admitted_reason' to fill in, so after
// the loop the node holds the reason reported for the last group that was tried.
if (CanAdmitRequest(*schedule, pool_config, cluster_size, admit_from_queue,
&queue_node->not_admitted_reason)) {
// Ownership of the schedule moves from the group entry to the node.
queue_node->admitted_schedule = std::move(group_schedule.schedule);
return true;
} else {
VLOG_RPC << "Cannot admit query " << queue_node->admission_request.query_id
<< " to group " << group_name << ": " << queue_node->not_admitted_reason;
}
}
// No group can admit the request right now, but none requires rejecting it either.
return true;
}
void AdmissionController::PoolStats::UpdateMemTrackerStats() {
// May be NULL if no queries have ever executed in this pool on this node but another
// node sent stats for this pool.
MemTracker* tracker =
ExecEnv::GetInstance()->pool_mem_trackers()->GetRequestPoolMemTracker(name_, false);
const int64_t current_reserved =
tracker == nullptr ? static_cast<int64_t>(0) : tracker->GetPoolMemReserved();
if (current_reserved != local_stats_.backend_mem_reserved) {
parent_->pools_for_updates_.insert(name_);
local_stats_.backend_mem_reserved = current_reserved;
metrics_.local_backend_mem_reserved->SetValue(current_reserved);
}
const int64_t current_usage =
tracker == nullptr ? static_cast<int64_t>(0) : tracker->consumption();
metrics_.local_backend_mem_usage->SetValue(current_usage);
}
// Appends one TTopicDelta for IMPALA_REQUEST_QUEUE_TOPIC to 'topic_updates', with one
// entry holding the serialized local stats of each pool in pools_for_updates_, and then
// clears pools_for_updates_. Nothing is appended if no pool needs an update or if no
// entry could be serialized.
void AdmissionController::AddPoolUpdates(vector<TTopicDelta>* topic_updates) {
  // local_stats_ are updated eagerly except for backend_mem_reserved (which isn't used
  // for local admission control decisions). Update that now before sending local_stats_.
  for (auto& entry : pool_stats_) {
    entry.second.UpdateMemTrackerStats();
  }
  if (pools_for_updates_.empty()) return;

  topic_updates->push_back(TTopicDelta());
  TTopicDelta& topic_delta = topic_updates->back();
  topic_delta.topic_name = Statestore::IMPALA_REQUEST_QUEUE_TOPIC;
  for (const string& pool_name : pools_for_updates_) {
    DCHECK(pool_stats_.find(pool_name) != pool_stats_.end());
    PoolStats* stats = GetPoolStats(pool_name);
    VLOG_ROW << "Sending topic update " << stats->DebugString();
    topic_delta.topic_entries.push_back(TTopicItem());
    TTopicItem& topic_item = topic_delta.topic_entries.back();
    topic_item.key = MakePoolTopicKey(pool_name, host_id_);
    Status status = thrift_serializer_.SerializeToString(&stats->local_stats(),
        &topic_item.value);
    if (!status.ok()) {
      LOG(WARNING) << "Failed to serialize query pool stats: " << status.GetDetail();
      // Drop only the entry that failed to serialize. Popping 'topic_updates' here
      // would destroy the whole delta: 'topic_delta' would dangle for the remaining
      // pools and the entries that were already serialized would be lost.
      topic_delta.topic_entries.pop_back();
    }
  }
  // Do not send a delta without any entries.
  if (topic_delta.topic_entries.empty()) topic_updates->pop_back();
  pools_for_updates_.clear();
}
// Body of the dequeue thread. Runs with admission_ctrl_lock_ held (the lock is handed
// to dequeue_cv_.Wait() while waiting) until 'done_' is set. Each time a dequeue is
// pending it walks all known pools and, for each pool with locally queued requests,
// takes requests off the head of the queue as long as they can be admitted, were
// cancelled or must be rejected.
void AdmissionController::DequeueLoop() {
  unique_lock<mutex> lock(admission_ctrl_lock_);
  while (true) {
    // Wait until there is work to do or shutdown was requested. 'done_' is part of the
    // wait condition so that a shutdown request is noticed even if no dequeue is
    // pending, and no dequeue pass is started once 'done_' is set.
    while (!pending_dequeue_ && !done_) {
      dequeue_cv_.Wait(lock);
    }
    if (done_) break;
    pending_dequeue_ = false;
    ClusterMembershipMgr::SnapshotPtr membership_snapshot =
        cluster_membership_mgr_->GetSnapshot();

    // If a query was queued while the cluster is still starting up but the client facing
    // services have already started to accept connections, the whole membership can still
    // be empty.
    if (membership_snapshot->executor_groups.empty()) continue;
    const int64_t cluster_size = GetClusterSize(*membership_snapshot);

    for (const PoolConfigMap::value_type& entry: pool_config_map_) {
      const string& pool_name = entry.first;
      const TPoolConfig& pool_config = entry.second;
      PoolStats* stats = GetPoolStats(pool_name, /* dcheck_exists=*/true);
      stats->UpdateDerivedMetrics(pool_config, cluster_size);

      if (stats->local_stats().num_queued == 0) continue; // Nothing to dequeue
      DCHECK_GE(stats->agg_num_queued(), stats->local_stats().num_queued);

      RequestQueue& queue = request_queue_map_[pool_name];
      int64_t max_to_dequeue = GetMaxToDequeue(queue, stats, pool_config, cluster_size);
      VLOG_RPC << "Dequeue thread will try to admit " << max_to_dequeue << " requests"
               << ", pool=" << pool_name
               << ", num_queued=" << stats->local_stats().num_queued
               << " cluster_size=" << cluster_size;
      if (max_to_dequeue == 0) continue; // to next pool.

      while (max_to_dequeue > 0 && !queue.empty()) {
        QueueNode* queue_node = queue.head();
        DCHECK(queue_node != nullptr);
        // Find a group that can admit the query
        bool is_cancelled = queue_node->admit_outcome->IsSet()
            && queue_node->admit_outcome->Get() == AdmissionOutcome::CANCELLED;

        // FindGroupToAdmitOrReject() is only consulted for queries that were not
        // cancelled; it returns false when the query must be rejected.
        bool is_rejected = !is_cancelled
            && !FindGroupToAdmitOrReject(cluster_size, membership_snapshot, pool_config,
                   /* admit_from_queue=*/true, stats, queue_node);

        if (!is_cancelled && !is_rejected
            && queue_node->admitted_schedule.get() == nullptr) {
          // If no group was found, stop trying to dequeue.
          // TODO(IMPALA-2968): Requests further in the queue may be blocked
          // unnecessarily. Consider a better policy once we have better test scenarios.
          LogDequeueFailed(queue_node, queue_node->not_admitted_reason);
          break;
        }

        // At this point we know that the query must be taken off the queue
        queue.Dequeue();
        --max_to_dequeue;
        VLOG(3) << "Dequeueing from stats for pool " << pool_name;
        stats->Dequeue(false);

        if (is_rejected) {
          // Set() returns the outcome held by the promise; anything other than REJECTED
          // is expected to mean the query was cancelled in the meantime.
          AdmissionOutcome outcome =
              queue_node->admit_outcome->Set(AdmissionOutcome::REJECTED);
          if (outcome == AdmissionOutcome::REJECTED) {
            stats->metrics()->total_rejected->Increment(1);
            continue; // next query
          } else {
            DCHECK_ENUM_EQ(outcome, AdmissionOutcome::CANCELLED);
            is_cancelled = true;
          }
        }
        DCHECK(is_cancelled || queue_node->admitted_schedule != nullptr);

        const TUniqueId& query_id = queue_node->admission_request.query_id;
        if (!is_cancelled) {
          VLOG_QUERY << "Admitting from queue: query=" << PrintId(query_id);
          AdmissionOutcome outcome =
              queue_node->admit_outcome->Set(AdmissionOutcome::ADMITTED);
          if (outcome != AdmissionOutcome::ADMITTED) {
            DCHECK_ENUM_EQ(outcome, AdmissionOutcome::CANCELLED);
            is_cancelled = true;
          }
        }

        if (is_cancelled) {
          VLOG_QUERY << "Dequeued cancelled query=" << PrintId(query_id);
          continue; // next query
        }

        DCHECK(queue_node->admit_outcome->IsSet());
        DCHECK_ENUM_EQ(queue_node->admit_outcome->Get(), AdmissionOutcome::ADMITTED);
        DCHECK(!is_cancelled);
        DCHECK(!is_rejected);
        DCHECK(queue_node->admitted_schedule != nullptr);
        AdmitQuery(queue_node->admitted_schedule.get(), true);
      }
      pools_for_updates_.insert(pool_name);
    }
  }
}
int64_t AdmissionController::GetQueueTimeoutForPoolMs(const TPoolConfig& pool_config) {
int64_t queue_wait_timeout_ms = pool_config.__isset.queue_timeout_ms ?
pool_config.queue_timeout_ms :
FLAGS_queue_wait_timeout_ms;
return max<int64_t>(0, queue_wait_timeout_ms);
}
// Returns how many requests of 'queue' this node may try to dequeue for the pool
// described by 'stats' and 'pool_config'. Without a limit on the number of running
// queries this is the number of locally queued requests. With a limit it is this
// node's proportional share of the free running slots (at least 1, at most the number
// of locally queued requests), or 0 if no slot is free.
int64_t AdmissionController::GetMaxToDequeue(RequestQueue& queue, PoolStats* stats,
    const TPoolConfig& pool_config, int64_t cluster_size) {
  if (!PoolLimitsRunningQueriesCount(pool_config)) {
    return stats->local_stats().num_queued; // No limit on num running requests
  }

  const int64_t running_limit = GetMaxRequestsForPool(pool_config, cluster_size);
  const int64_t free_slots = running_limit - stats->agg_num_running();
  if (free_slots <= 0) {
    // There is a limit for the number of running queries, so we can
    // see that nothing can run in this pool.
    // This can happen in the case of over-admission.
    if (!queue.empty()) {
      LogDequeueFailed(queue.head(),
          Substitute(QUEUED_NUM_RUNNING, stats->agg_num_running(), running_limit,
              GetMaxRequestsForPoolDescription(pool_config, cluster_size),
              GetStalenessDetailLocked(" ")));
    }
    return 0;
  }

  // Use the ratio of locally queued requests to agg queued so that each impalad
  // can dequeue a proportional amount total_available. Note, this offers no
  // fairness between impalads.
  const double locally_queued = static_cast<double>(stats->local_stats().num_queued);
  const double queued_in_cluster =
      static_cast<double>(max<int64_t>(1, stats->agg_num_queued()));
  const double local_share = locally_queued / queued_in_cluster;
  DCHECK(local_share <= 1.0);
  const int64_t proportional_slots = max<int64_t>(1, local_share * free_slots);
  return min(stats->local_stats().num_queued, proportional_slots);
}
// Logs that the request in 'node' could not be dequeued and stores
// 'not_admitted_reason' in the request's summary profile under
// PROFILE_INFO_KEY_LAST_QUEUED_REASON.
void AdmissionController::LogDequeueFailed(QueueNode* node,
    const string& not_admitted_reason) {
  const auto& request = node->admission_request;
  VLOG_QUERY << "Could not dequeue query id=" << PrintId(request.query_id)
             << " reason: " << not_admitted_reason;
  request.summary_profile->AddInfoString(
      PROFILE_INFO_KEY_LAST_QUEUED_REASON, not_admitted_reason);
}
// Convenience overload: returns the stats of the request pool named by 'schedule'. The
// schedule's request pool must not be empty.
AdmissionController::PoolStats* AdmissionController::GetPoolStats(
    const QuerySchedule& schedule) {
  const string& pool_name = schedule.request_pool();
  DCHECK(!pool_name.empty());
  return GetPoolStats(pool_name);
}
// Returns the PoolStats entry for 'pool_name' in 'pool_stats_', creating it first if it
// does not exist yet. 'pool_name' must not be empty. When 'dcheck_exists' is true, a
// missing entry trips a DCHECK (debug builds only); the entry is still created.
AdmissionController::PoolStats* AdmissionController::GetPoolStats(
    const string& pool_name, bool dcheck_exists) {
  DCHECK(!pool_name.empty());
  auto entry = pool_stats_.find(pool_name);
  const bool found = entry != pool_stats_.end();
  DCHECK(!dcheck_exists || found);
  if (!found) {
    auto emplaced = pool_stats_.emplace(pool_name, PoolStats(this, pool_name));
    DCHECK(emplaced.second);
    entry = emplaced.first;
  }
  DCHECK(entry != pool_stats_.end());
  return &entry->second;
}
// Performs the bookkeeping for admitting the query described by 'schedule':
// updates the admission stats and the executor group load metric, writes the admission
// result, admitted memory, executor group and staleness information to the query's
// summary profile, and registers the query's backend count in
// 'num_released_backends_'. 'was_queued' selects which admission result string is
// recorded in the profile.
void AdmissionController::AdmitQuery(QuerySchedule* schedule, bool was_queued) {
  VLOG_RPC << "For Query " << PrintId(schedule->query_id())
           << " per_backend_mem_limit set to: "
           << PrintBytes(schedule->per_backend_mem_limit())
           << " per_backend_mem_to_admit set to: "
           << PrintBytes(schedule->per_backend_mem_to_admit())
           << " coord_backend_mem_limit set to: "
           << PrintBytes(schedule->coord_backend_mem_limit())
           << " coord_backend_mem_to_admit set to: "
           << PrintBytes(schedule->coord_backend_mem_to_admit());

  // Update memory and number of queries.
  UpdateStatsOnAdmission(*schedule);
  UpdateExecGroupMetric(schedule->executor_group(), 1);

  // Update summary profile.
  RuntimeProfile* const profile = schedule->summary_profile();
  profile->AddInfoString(PROFILE_INFO_KEY_ADMISSION_RESULT,
      was_queued ? PROFILE_INFO_VAL_ADMIT_QUEUED : PROFILE_INFO_VAL_ADMIT_IMMEDIATELY);
  profile->AddInfoString(
      PROFILE_INFO_KEY_ADMITTED_MEM, PrintBytes(schedule->GetClusterMemoryToAdmit()));
  profile->AddInfoString(PROFILE_INFO_KEY_EXECUTOR_GROUP, schedule->executor_group());

  // The admission decision may have been based on stale information. Always record the
  // time since the last update, and add a warning to the profile if a staleness
  // message was produced.
  int64_t time_since_update_ms;
  const string staleness_detail = GetStalenessDetailLocked("", &time_since_update_ms);
  // IMPALA-8235: convert to TIME_NS because of issues with tools consuming TIME_MS.
  const int64_t time_since_update_ns =
      static_cast<int64_t>(time_since_update_ms * NANOS_PER_MICRO * MICROS_PER_MILLI);
  COUNTER_SET(
      ADD_COUNTER(profile, PROFILE_TIME_SINCE_LAST_UPDATE_COUNTER_NAME, TUnit::TIME_NS),
      time_since_update_ns);
  if (!staleness_detail.empty()) {
    profile->AddInfoString(PROFILE_INFO_KEY_STALENESS_WARNING, staleness_detail);
  }

  // The query must not be registered yet; start tracking its backends.
  DCHECK(
      num_released_backends_.find(schedule->query_id()) == num_released_backends_.end());
  num_released_backends_[schedule->query_id()] =
      schedule->per_backend_exec_params().size();
}
// Locking wrapper around GetStalenessDetailLocked(): acquires 'admission_ctrl_lock_'
// and forwards 'prefix' and 'ms_since_last_update' unchanged.
string AdmissionController::GetStalenessDetail(const string& prefix,
    int64_t* ms_since_last_update) {
  lock_guard<mutex> guard(admission_ctrl_lock_);
  return GetStalenessDetailLocked(prefix, ms_since_last_update);
}
// Returns a warning message, starting with 'prefix', if the admission control
// information from the statestore is considered stale: either no topic update has been
// received at all, or the time since the last one has reached
// --admission_control_stale_topic_threshold_ms. Returns an empty string otherwise.
// If 'ms_since_last_update' is not null, it is set to the number of milliseconds
// between 'last_topic_update_time_ms_' and now. Does not take any lock itself.
string AdmissionController::GetStalenessDetailLocked(const string& prefix,
    int64_t* ms_since_last_update) {
  const int64_t elapsed_ms = MonotonicMillis() - last_topic_update_time_ms_;
  if (ms_since_last_update != nullptr) *ms_since_last_update = elapsed_ms;

  if (last_topic_update_time_ms_ == 0) {
    return Substitute("$0Warning: admission control information from statestore "
                      "is stale: no update has been received.", prefix);
  }
  if (elapsed_ms < FLAGS_admission_control_stale_topic_threshold_ms) return "";
  return Substitute("$0Warning: admission control information from statestore "
                    "is stale: $1 since last update was received.",
      prefix, PrettyPrinter::Print(elapsed_ms, TUnit::TIME_MS));
}
// Appends a JSON object describing pool 'pool_name' to the 'resource_pools' array. The
// object contains the pool's stats (see PoolStats::ToJson()), a "queued_queries" array
// with one entry per node in the pool's request queue and, if the queue is not empty,
// the "head_queued_reason" of the request at the head of the queue. Does nothing if
// there are no stats for 'pool_name'. All JSON values are allocated from 'document'.
// This function does not lock; PoolToJson() and AllPoolsToJson() acquire
// 'admission_ctrl_lock_' before calling it.
void AdmissionController::PoolToJsonLocked(const string& pool_name,
    rapidjson::Value* resource_pools, rapidjson::Document* document) {
  auto it = pool_stats_.find(pool_name);
  if (it == pool_stats_.end()) return;
  PoolStats* stats = &it->second;
  RequestQueue& queue = request_queue_map_[pool_name];
  // Get the pool stats
  using namespace rapidjson;
  Value pool_info_json(kObjectType);
  stats->ToJson(&pool_info_json, document);
  // Get the queued queries
  Value queued_queries(kArrayType);
  queue.Iterate([&queued_queries, document](QueueNode* node) {
    if (node->group_schedules.empty()) {
      // No schedule is attached to this node, so emit placeholder values.
      Value query_info(kObjectType);
      query_info.AddMember("query_id", "N/A", document->GetAllocator());
      query_info.AddMember("mem_limit", 0, document->GetAllocator());
      query_info.AddMember("mem_limit_to_admit", 0, document->GetAllocator());
      query_info.AddMember("num_backends", 0, document->GetAllocator());
      queued_queries.PushBack(query_info, document->GetAllocator());
      return true;
    }
    // Report the values of the node's first group schedule.
    QuerySchedule* schedule = node->group_schedules.begin()->schedule.get();
    Value query_info(kObjectType);
    Value query_id(PrintId(schedule->query_id()).c_str(), document->GetAllocator());
    query_info.AddMember("query_id", query_id, document->GetAllocator());
    query_info.AddMember(
        "mem_limit", schedule->per_backend_mem_limit(), document->GetAllocator());
    query_info.AddMember("mem_limit_to_admit", schedule->per_backend_mem_to_admit(),
        document->GetAllocator());
    query_info.AddMember("coord_mem_limit", schedule->coord_backend_mem_limit(),
        document->GetAllocator());
    query_info.AddMember("coord_mem_to_admit",
        schedule->coord_backend_mem_to_admit(), document->GetAllocator());
    query_info.AddMember("num_backends", schedule->per_backend_exec_params().size(),
        document->GetAllocator());
    queued_queries.PushBack(query_info, document->GetAllocator());
    return true;
  });
  pool_info_json.AddMember("queued_queries", queued_queries, document->GetAllocator());
  // Get the queued reason for the query at the head of the queue.
  if (!queue.empty()) {
    // GetInfoString() hands back a pointer. Guard against a null result (presumably
    // returned when the key has not been added to the profile - TODO confirm) instead
    // of dereferencing it unconditionally, and report an empty reason in that case.
    const string* last_queued_reason =
        queue.head()->profile->GetInfoString(PROFILE_INFO_KEY_LAST_QUEUED_REASON);
    Value head_queued_reason(
        last_queued_reason != nullptr ? last_queued_reason->c_str() : "",
        document->GetAllocator());
    pool_info_json.AddMember(
        "head_queued_reason", head_queued_reason, document->GetAllocator());
  }
  resource_pools->PushBack(pool_info_json, document->GetAllocator());
}
void AdmissionController::PoolToJson(const string& pool_name,
rapidjson::Value* resource_pools, rapidjson::Document* document) {
lock_guard<mutex> lock(admission_ctrl_lock_);
PoolToJsonLocked(pool_name, resource_pools, document);
}
// Appends the JSON description of every pool in 'pool_config_map_' to
// 'resource_pools', holding 'admission_ctrl_lock_' for the whole traversal.
void AdmissionController::AllPoolsToJson(
    rapidjson::Value* resource_pools, rapidjson::Document* document) {
  lock_guard<mutex> guard(admission_ctrl_lock_);
  for (const auto& config_entry : pool_config_map_) {
    PoolToJsonLocked(config_entry.first, resource_pools, document);
  }
}
// Adds 'wait_time_ms' to the pool's time-in-queue counter and folds it into
// 'wait_time_ms_ema_', the exponential moving average of the wait time. While the
// average is still 0, the sample is taken as the average directly.
void AdmissionController::PoolStats::UpdateWaitTime(int64_t wait_time_ms) {
  metrics()->time_in_queue_ms->Increment(wait_time_ms);
  if (wait_time_ms_ema_ != 0) {
    wait_time_ms_ema_ =
        wait_time_ms_ema_ * (1 - EMA_MULTIPLIER) + wait_time_ms * EMA_MULTIPLIER;
  } else {
    wait_time_ms_ema_ = wait_time_ms;
  }
}
// Adds this pool's name, the current values of its metrics, the wait time moving
// average and the peak memory usage histogram as members of the JSON object 'pool'.
// The histogram is emitted as an array of [bucket index, bucket value] pairs. All JSON
// values are allocated from 'document'.
void AdmissionController::PoolStats::ToJson(
    rapidjson::Value* pool, rapidjson::Document* document) const {
  using namespace rapidjson;
  auto& allocator = document->GetAllocator();

  Value pool_name(name_.c_str(), allocator);
  pool->AddMember("pool_name", pool_name, allocator);

  // Aggregate and local state.
  pool->AddMember("agg_num_running", metrics_.agg_num_running->GetValue(), allocator);
  pool->AddMember("agg_num_queued", metrics_.agg_num_queued->GetValue(), allocator);
  pool->AddMember("agg_mem_reserved", metrics_.agg_mem_reserved->GetValue(), allocator);
  pool->AddMember(
      "local_mem_admitted", metrics_.local_mem_admitted->GetValue(), allocator);

  // Running totals.
  pool->AddMember("total_admitted", metrics_.total_admitted->GetValue(), allocator);
  pool->AddMember("total_rejected", metrics_.total_rejected->GetValue(), allocator);
  pool->AddMember("total_timed_out", metrics_.total_timed_out->GetValue(), allocator);

  // Pool configuration.
  pool->AddMember(
      "pool_max_mem_resources", metrics_.pool_max_mem_resources->GetValue(), allocator);
  pool->AddMember(
      "pool_max_requests", metrics_.pool_max_requests->GetValue(), allocator);
  pool->AddMember("pool_max_queued", metrics_.pool_max_queued->GetValue(), allocator);
  pool->AddMember(
      "pool_queue_timeout", metrics_.pool_queue_timeout->GetValue(), allocator);
  pool->AddMember(
      "max_query_mem_limit", metrics_.max_query_mem_limit->GetValue(), allocator);
  pool->AddMember(
      "min_query_mem_limit", metrics_.min_query_mem_limit->GetValue(), allocator);
  pool->AddMember("clamp_mem_limit_query_option",
      metrics_.clamp_mem_limit_query_option->GetValue(), allocator);
  pool->AddMember("max_running_queries_multiple",
      metrics_.max_running_queries_multiple->GetValue(), allocator);
  pool->AddMember("max_queued_queries_multiple",
      metrics_.max_queued_queries_multiple->GetValue(), allocator);
  pool->AddMember(
      "max_memory_multiple", metrics_.max_memory_multiple->GetValue(), allocator);
  pool->AddMember("max_running_queries_derived",
      metrics_.max_running_queries_derived->GetValue(), allocator);
  pool->AddMember("max_queued_queries_derived",
      metrics_.max_queued_queries_derived->GetValue(), allocator);
  pool->AddMember(
      "max_memory_derived", metrics_.max_memory_derived->GetValue(), allocator);

  pool->AddMember("wait_time_ms_ema", wait_time_ms_ema_, allocator);

  // One [bucket, value] pair per histogram bucket.
  Value histogram(kArrayType);
  for (int bucket = 0; bucket < peak_mem_histogram_.size(); ++bucket) {
    Value bucket_and_value(kArrayType);
    bucket_and_value.PushBack(bucket, allocator);
    bucket_and_value.PushBack(peak_mem_histogram_[bucket], allocator);
    histogram.PushBack(bucket_and_value, allocator);
  }
  pool->AddMember("peak_mem_usage_histogram", histogram, allocator);
}
void AdmissionController::ResetPoolInformationalStats(const string& pool_name) {
lock_guard<mutex> lock(admission_ctrl_lock_);
auto it = pool_stats_.find(pool_name);
if(it == pool_stats_.end()) return;
it->second.ResetInformationalStats();
}
void AdmissionController::ResetAllPoolInformationalStats() {
lock_guard<mutex> lock(admission_ctrl_lock_);
for (auto& it: pool_stats_) it.second.ResetInformationalStats();
}
// Clears the informational state of this pool: zeroes every bucket of the peak memory
// histogram, the wait time moving average, and the metrics that track totals since the
// last reset. Other metrics are left untouched.
void AdmissionController::PoolStats::ResetInformationalStats() {
  std::fill(peak_mem_histogram_.begin(), peak_mem_histogram_.end(), 0);
  wait_time_ms_ema_ = 0.0;

  // Reset only metrics keeping track of totals since last reset.
  auto& pool_metrics = *metrics();
  pool_metrics.total_admitted->SetValue(0);
  pool_metrics.total_rejected->SetValue(0);
  pool_metrics.total_queued->SetValue(0);
  pool_metrics.total_dequeued->SetValue(0);
  pool_metrics.total_timed_out->SetValue(0);
  pool_metrics.total_released->SetValue(0);
  pool_metrics.time_in_queue_ms->SetValue(0);
}
// Registers this pool's metrics with the parent controller's metric group and stores
// the returned metric handles in 'metrics_'. Every metric key is formatted with the
// pool name; integer and floating-point metrics start at 0 and the boolean property
// starts as false.
void AdmissionController::PoolStats::InitMetrics() {
  auto& group = *parent_->metrics_group_;

  // Counters.
  metrics_.total_admitted = group.AddCounter(TOTAL_ADMITTED_METRIC_KEY_FORMAT, 0, name_);
  metrics_.total_queued = group.AddCounter(TOTAL_QUEUED_METRIC_KEY_FORMAT, 0, name_);
  metrics_.total_dequeued = group.AddCounter(TOTAL_DEQUEUED_METRIC_KEY_FORMAT, 0, name_);
  metrics_.total_rejected = group.AddCounter(TOTAL_REJECTED_METRIC_KEY_FORMAT, 0, name_);
  metrics_.total_timed_out =
      group.AddCounter(TOTAL_TIMED_OUT_METRIC_KEY_FORMAT, 0, name_);
  metrics_.total_released = group.AddCounter(TOTAL_RELEASED_METRIC_KEY_FORMAT, 0, name_);
  metrics_.time_in_queue_ms =
      group.AddCounter(TIME_IN_QUEUE_METRIC_KEY_FORMAT, 0, name_);

  // Gauges for aggregate state.
  metrics_.agg_num_running = group.AddGauge(AGG_NUM_RUNNING_METRIC_KEY_FORMAT, 0, name_);
  metrics_.agg_num_queued = group.AddGauge(AGG_NUM_QUEUED_METRIC_KEY_FORMAT, 0, name_);
  metrics_.agg_mem_reserved =
      group.AddGauge(AGG_MEM_RESERVED_METRIC_KEY_FORMAT, 0, name_);

  // Gauges for local state.
  metrics_.local_mem_admitted =
      group.AddGauge(LOCAL_MEM_ADMITTED_METRIC_KEY_FORMAT, 0, name_);
  metrics_.local_num_admitted_running =
      group.AddGauge(LOCAL_NUM_ADMITTED_RUNNING_METRIC_KEY_FORMAT, 0, name_);
  metrics_.local_num_queued =
      group.AddGauge(LOCAL_NUM_QUEUED_METRIC_KEY_FORMAT, 0, name_);
  metrics_.local_backend_mem_usage =
      group.AddGauge(LOCAL_BACKEND_MEM_USAGE_METRIC_KEY_FORMAT, 0, name_);
  metrics_.local_backend_mem_reserved =
      group.AddGauge(LOCAL_BACKEND_MEM_RESERVED_METRIC_KEY_FORMAT, 0, name_);

  // Pool configuration.
  metrics_.pool_max_mem_resources =
      group.AddGauge(POOL_MAX_MEM_RESOURCES_METRIC_KEY_FORMAT, 0, name_);
  metrics_.pool_max_requests =
      group.AddGauge(POOL_MAX_REQUESTS_METRIC_KEY_FORMAT, 0, name_);
  metrics_.pool_max_queued = group.AddGauge(POOL_MAX_QUEUED_METRIC_KEY_FORMAT, 0, name_);
  metrics_.pool_queue_timeout =
      group.AddGauge(POOL_QUEUE_TIMEOUT_METRIC_KEY_FORMAT, 0, name_);
  metrics_.max_query_mem_limit =
      group.AddGauge(POOL_MAX_QUERY_MEM_LIMIT_METRIC_KEY_FORMAT, 0, name_);
  metrics_.min_query_mem_limit =
      group.AddGauge(POOL_MIN_QUERY_MEM_LIMIT_METRIC_KEY_FORMAT, 0, name_);
  metrics_.clamp_mem_limit_query_option = group.AddProperty<bool>(
      POOL_CLAMP_MEM_LIMIT_QUERY_OPTION_METRIC_KEY_FORMAT, false, name_);
  metrics_.max_running_queries_multiple =
      group.AddDoubleGauge(POOL_MAX_RUNNING_QUERIES_MULTIPLE_METRIC_KEY_FORMAT, 0, name_);
  metrics_.max_queued_queries_multiple =
      group.AddDoubleGauge(POOL_MAX_QUEUED_QUERIES_MULTIPLE_METRIC_KEY_FORMAT, 0, name_);
  metrics_.max_memory_multiple =
      group.AddGauge(POOL_MAX_MEMORY_MULTIPLE_METRIC_KEY_FORMAT, 0, name_);
  metrics_.max_running_queries_derived =
      group.AddGauge(POOL_MAX_RUNNING_QUERIES_DERIVED_METRIC_KEY_FORMAT, 0, name_);
  metrics_.max_queued_queries_derived =
      group.AddGauge(POOL_MAX_QUEUED_QUERIES_DERIVED_METRIC_KEY_FORMAT, 0, name_);
  metrics_.max_memory_derived =
      group.AddGauge(POOL_MAX_MEMORY_DERIVED_METRIC_KEY_FORMAT, 0, name_);
}
// Copies the current per-host stats ('host_stats_') into '*per_host_stats' while
// holding 'admission_ctrl_lock_'.
void AdmissionController::PopulatePerHostMemReservedAndAdmitted(
    PerHostStats* per_host_stats) {
  lock_guard<mutex> guard(admission_ctrl_lock_);
  *per_host_stats = host_stats_;
}
// Builds the topic key for a (pool, backend) pair by joining 'pool_name' and
// 'backend_id' with TOPIC_KEY_DELIMITER.
string AdmissionController::MakePoolTopicKey(
    const string& pool_name, const string& backend_id) {
  // The key is parsed by locating the last occurrence of the delimiter, so the backend
  // id (the part after it) must not contain the delimiter itself.
  DCHECK_EQ(backend_id.find(TOPIC_KEY_DELIMITER), string::npos);
  string topic_key = Substitute("$0$1$2", pool_name, TOPIC_KEY_DELIMITER, backend_id);
  return topic_key;
}
// Appends to 'matching_groups' the executor groups from 'all_groups' that serve pool
// 'pool_name', i.e. those whose name starts with the pool name followed by
// POOL_GROUP_DELIMITER. If 'matching_groups' is still empty after that search, the
// default executor group is used instead, if it exists. Unhealthy groups are then
// removed from 'matching_groups' and the remainder is sorted by group name.
void AdmissionController::GetExecutorGroupsForPool(
    const ClusterMembershipMgr::ExecutorGroups& all_groups, const string& pool_name,
    vector<const ExecutorGroup*>* matching_groups) {
  const string prefix(pool_name + POOL_GROUP_DELIMITER);
  // Matching groups are collected before the health check so that we don't fall back
  // to the default group when there are matching but unhealthy groups.
  for (const auto& group_entry : all_groups) {
    if (StringPiece(group_entry.first).starts_with(prefix)) {
      matching_groups->push_back(&group_entry.second);
    }
  }

  if (matching_groups->empty()) {
    auto default_group = all_groups.find(ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME);
    if (default_group == all_groups.end()) return;
    VLOG(3) << "Checking default executor group for pool " << pool_name;
    matching_groups->push_back(&default_group->second);
  }

  // Filter out unhealthy groups.
  matching_groups->erase(
      std::remove_if(matching_groups->begin(), matching_groups->end(),
          [](const ExecutorGroup* group) { return !group->IsHealthy(); }),
      matching_groups->end());

  // Sort executor groups by name.
  sort(matching_groups->begin(), matching_groups->end(),
      [](const ExecutorGroup* lhs, const ExecutorGroup* rhs) {
        return lhs->name() < rhs->name();
      });
}
// Returns the sum of NumExecutors() over all executor groups in 'membership_snapshot'.
int64_t AdmissionController::GetClusterSize(
    const ClusterMembershipMgr::Snapshot& membership_snapshot) {
  int64_t total_executors = 0;
  for (const auto& group_entry : membership_snapshot.executor_groups) {
    total_executors += group_entry.second.NumExecutors();
  }
  return total_executors;
}
// Returns the number of executors in the group named 'group_name' within
// 'membership_snapshot'. A missing group trips a DCHECK in debug builds and yields 0
// otherwise.
int64_t AdmissionController::GetExecutorGroupSize(
    const ClusterMembershipMgr::Snapshot& membership_snapshot,
    const string& group_name) {
  const auto& groups = membership_snapshot.executor_groups;
  auto group_it = groups.find(group_name);
  DCHECK(group_it != groups.end()) << "Could not find group " << group_name;
  if (group_it != groups.end()) return group_it->second.NumExecutors();
  return 0;
}
// Returns the memory limit of the pool: 'max_memory_multiple' times 'cluster_size'
// when the multiple is positive, otherwise the configured 'max_mem_resources'.
int64_t AdmissionController::GetMaxMemForPool(
    const TPoolConfig& pool_config, int64_t cluster_size) {
  const bool scales_with_cluster = pool_config.max_memory_multiple > 0;
  if (!scales_with_cluster) return pool_config.max_mem_resources;
  return pool_config.max_memory_multiple * cluster_size;
}
// Returns a human-readable explanation of how the value returned by GetMaxMemForPool()
// was obtained for 'pool_config' and 'cluster_size'.
string AdmissionController::GetMaxMemForPoolDescription(
    const TPoolConfig& pool_config, int64_t cluster_size) {
  const bool scales_with_cluster = pool_config.max_memory_multiple > 0;
  if (!scales_with_cluster) return "configured statically";
  return Substitute("calculated as $0 backends each with $1", cluster_size,
      PrintBytes(pool_config.max_memory_multiple));
}
// Returns the maximum number of running requests for the pool:
// 'max_running_queries_multiple' times 'cluster_size', rounded up, when the multiple
// is positive, otherwise the configured 'max_requests'.
int64_t AdmissionController::GetMaxRequestsForPool(
    const TPoolConfig& pool_config, int64_t cluster_size) {
  const bool scales_with_cluster = pool_config.max_running_queries_multiple > 0;
  if (!scales_with_cluster) return pool_config.max_requests;
  return ceil(pool_config.max_running_queries_multiple * cluster_size);
}
// Returns a human-readable explanation of how the value returned by
// GetMaxRequestsForPool() was obtained for 'pool_config' and 'cluster_size'.
string AdmissionController::GetMaxRequestsForPoolDescription(
    const TPoolConfig& pool_config, int64_t cluster_size) {
  const bool scales_with_cluster = pool_config.max_running_queries_multiple > 0;
  if (!scales_with_cluster) return "configured statically";
  return Substitute("calculated as $0 backends each with $1 queries", cluster_size,
      pool_config.max_running_queries_multiple);
}
// Returns the maximum number of queued requests for the pool:
// 'max_queued_queries_multiple' times 'cluster_size', rounded up, when the multiple is
// positive, otherwise the configured 'max_queued'.
int64_t AdmissionController::GetMaxQueuedForPool(
    const TPoolConfig& pool_config, int64_t cluster_size) {
  const bool scales_with_cluster = pool_config.max_queued_queries_multiple > 0;
  if (!scales_with_cluster) return pool_config.max_queued;
  return ceil(pool_config.max_queued_queries_multiple * cluster_size);
}
// Returns a human-readable explanation of how the value returned by
// GetMaxQueuedForPool() was obtained for 'pool_config' and 'cluster_size'.
string AdmissionController::GetMaxQueuedForPoolDescription(
    const TPoolConfig& pool_config, int64_t cluster_size) {
  const bool scales_with_cluster = pool_config.max_queued_queries_multiple > 0;
  if (!scales_with_cluster) return "configured statically";
  return Substitute("calculated as $0 backends each with $1 queries", cluster_size,
      pool_config.max_queued_queries_multiple);
}
// Returns true if 'pool_config' has both of its running-query settings equal to zero
// or both of its memory settings equal to zero.
bool AdmissionController::PoolDisabled(const TPoolConfig& pool_config) {
  const bool zero_running_queries =
      pool_config.max_requests == 0 && pool_config.max_running_queries_multiple == 0;
  const bool zero_memory =
      pool_config.max_mem_resources == 0 && pool_config.max_memory_multiple == 0;
  return zero_running_queries || zero_memory;
}
// Returns true if 'pool_config' has a positive 'max_requests' or a positive
// 'max_running_queries_multiple', i.e. the pool caps the number of running queries.
bool AdmissionController::PoolLimitsRunningQueriesCount(const TPoolConfig& pool_config) {
  if (pool_config.max_requests > 0) return true;
  return pool_config.max_running_queries_multiple > 0;
}
// Returns true if 'pool_config' has a positive 'max_mem_resources' and its
// 'max_memory_multiple' is not positive.
bool AdmissionController::PoolHasFixedMemoryLimit(const TPoolConfig& pool_config) {
  const bool has_static_limit = pool_config.max_mem_resources > 0;
  const bool multiple_unset = pool_config.max_memory_multiple <= 0;
  return has_static_limit && multiple_unset;
}
// Returns the memory to admit for the backend described by 'backend_exec_params': the
// schedule's coordinator value for the coordinator backend, the per-backend value for
// any other backend.
int64_t AdmissionController::GetMemToAdmit(
    const QuerySchedule& schedule, const BackendExecParams& backend_exec_params) {
  if (backend_exec_params.is_coord_backend) {
    return schedule.coord_backend_mem_to_admit();
  }
  return schedule.per_backend_mem_to_admit();
}
// Brings 'exec_group_query_load_map_' and the per-group query load metrics in line
// with the executor groups in 'snapshot' that have at least one host:
// - groups that are in the map but no longer in the snapshot (or have no hosts) are
//   erased from the map and their metric is removed;
// - groups that are new get a gauge, initialised to the largest 'num_admitted' value
//   found in 'host_stats_' among the group's executors.
// The group names are collected before 'admission_ctrl_lock_' is taken; the map and
// metric updates happen while holding it.
void AdmissionController::UpdateExecGroupMetricMap(
    ClusterMembershipMgr::SnapshotPtr snapshot) {
  std::unordered_set<string> grp_names;
  for (const auto& group : snapshot->executor_groups) {
    if (group.second.NumHosts() > 0) grp_names.insert(group.first);
  }
  lock_guard<mutex> l(admission_ctrl_lock_);
  auto it = exec_group_query_load_map_.begin();
  while (it != exec_group_query_load_map_.end()) {
    // Erase existing groups from the set so that only new ones are left.
    if (grp_names.erase(it->first) == 0) {
      // Existing group not in the set means it no longer exists. Copy the name first
      // because erasing the entry invalidates 'it->first'.
      string group_name = it->first;
      it = exec_group_query_load_map_.erase(it);
      metrics_group_->RemoveMetric(EXEC_GROUP_QUERY_LOAD_KEY_FORMAT, group_name);
    } else {
      ++it;
    }
  }
  // Now only the new groups are remaining in the set, add a metric for them.
  for (const string& new_grp : grp_names) {
    // There might be lingering queries from when this group was active.
    int64_t currently_running = 0;
    auto new_grp_it = snapshot->executor_groups.find(new_grp);
    DCHECK(new_grp_it != snapshot->executor_groups.end());
    // Bind by reference: the group is only read here, so copying the whole
    // ExecutorGroup while holding 'admission_ctrl_lock_' is unnecessary work.
    const ExecutorGroup& group = new_grp_it->second;
    for (const TBackendDescriptor& be_desc : group.GetAllExecutorDescriptors()) {
      const string& host = TNetworkAddressToString(be_desc.address);
      auto stats = host_stats_.find(host);
      if (stats != host_stats_.end()) {
        currently_running = std::max(currently_running, stats->second.num_admitted);
      }
    }
    exec_group_query_load_map_[new_grp] = metrics_group_->AddGauge(
        EXEC_GROUP_QUERY_LOAD_KEY_FORMAT, currently_running, new_grp);
  }
}
// Adds 'delta' to the query load gauge of executor group 'grp_name'. Does nothing if
// the group has no entry in 'exec_group_query_load_map_'.
void AdmissionController::UpdateExecGroupMetric(
    const string& grp_name, int64_t delta) {
  auto gauge_entry = exec_group_query_load_map_.find(grp_name);
  if (gauge_entry == exec_group_query_load_map_.end()) return;
  gauge_entry->second->Increment(delta);
}
} // namespace impala