blob: 3b93d3f007050208af550deef2d0de4ada822ba9 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <list>
#include <string>
#include <utility>
#include <vector>
#include <boost/unordered_map.hpp>
#include <boost/unordered_set.hpp>
#include <gtest/gtest_prod.h>
#include "common/status.h"
#include "scheduling/cluster-membership-mgr.h"
#include "scheduling/request-pool-service.h"
#include "scheduling/query-schedule.h"
#include "statestore/statestore-subscriber.h"
#include "util/condition-variable.h"
#include "util/internal-queue.h"
#include "util/runtime-profile.h"
#include "util/thread.h"
namespace impala {
class ExecEnv;
/// Represents the admission outcome of a query. It is stored in the 'admit_outcome'
/// input variable passed to AdmissionController::SubmitForAdmission() if an admission
/// decision has been made or the caller has initiated a cancellation.
enum class AdmissionOutcome {
/// The AdmissionController is used to throttle requests (e.g. queries, DML) based
/// on available cluster resources, which are configured in one or more resource pools. A
/// request will either be admitted for immediate execution, queued for later execution,
/// or rejected (either immediately or after being queued). Resource pools can be
/// configured to have maximum number of concurrent queries, maximum cluster wide memory,
/// maximum queue size, max and min per host memory limit for every query, and to set
/// whether the mem_limit query option will be clamped by the previously mentioned max/min
/// per host limits or not. Queries will be queued if there are already too many queries
/// executing or there isn't enough available memory. Once the queue reaches the maximum
/// queue size, incoming queries will be rejected. Requests in the queue will time out
/// after a configurable timeout.
/// Depending on the -is_coordinator startup flag, multiple impalads can act as a
/// coordinator and thus also an admission controller, so some cluster state must be
/// shared between impalads in order to make admission decisions on any of them. Every
/// coordinator maintains some per-pool and per-host statistics related to the requests it
/// itself is servicing as the admission controller. Some of these local admission
/// statistics in addition to some backend-specific statistics (i.e. the backend executor
/// associated with the same impalad process) are disseminated across the cluster via the
/// statestore using the IMPALA_REQUEST_QUEUE_TOPIC topic. Effectively, coordinators send
/// statestore updates where the admission statistics reflect the load and all
/// participating backends send statestore updates reflecting the load they're executing.
/// Every <impalad, pool> pair is sent as a topic update at the statestore heartbeat
/// interval when pool statistics change, and the topic updates from other impalads are
/// used to re-compute the aggregate per-pool stats. Because the pool statistics are only
/// updated on statestore heartbeats and all decisions are made with the cached state,
/// the aggregate pool statistics are only estimates. As a result, more requests may be
/// admitted or queued than the configured thresholds, which are really soft limits.
/// Memory resources:
/// A pool may be configured to allow a maximum amount of memory resources to be
/// 'reserved' by requests admitted to that pool. While Impala does not yet truly
/// 'reserve' the memory at admission (i.e. Impala does not yet guarantee the memory for
/// a request, it is still possible to overadmit such that multiple queries think they
/// have reserved the same memory), the admission controller uses several metrics to
/// estimate the available memory and admit only when it thinks the necessary memory is
/// available. Future work will enable real reservations, but this is a much larger
/// effort and will involve changes outside of the admission controller.
/// The memory required for admission for a request is specified as the query option
/// MEM_LIMIT (either explicitly or via a default value). This is a per-host value. If
/// there is no memory limit, the per-host estimate from planning is used instead as a
/// memory limit and a lower bound is enforced on it based on the largest initial
/// reservation of the query. The final memory limit used is also clamped by the max/min
/// memory limits configured for the pool with an option to not enforce these limits on
/// the MEM_LIMIT query option (If both these max/min limits are not configured, then the
/// estimates from planning are not used as a memory limit and are only used for making
/// admission decisions. Moreover the estimates will no longer have a lower bound based on
/// the largest initial reservation).
/// The following four conditions must hold in order for the request to be admitted:
/// 1) The current pool configuration is valid.
/// 2) There must be enough memory resources available in this resource pool for the
/// request. The max memory resources configured for the resource pool specifies the
/// aggregate, cluster-wide memory that may be reserved by all executing queries in
/// this pool. Thus the aggregate memory to be reserved across all participating
/// backends for this request, *plus* that of already admitted requests must be less
/// than or equal to the max resources specified.
/// 3) All participating backends must have enough memory available. Each impalad has a
/// per-process mem limit, and that is the max memory that can be reserved on that
/// backend.
/// 3b) (optional) When using executor groups (see below) and admitting to the
/// non-default executor group, then the number of currently running queries must be
/// below the configured maximum for all participating backends.
/// 4) The final per host memory limit used can accommodate the largest initial
/// reservation.
/// In order to admit based on these conditions, the admission controller accounts for
/// the following on both a per-host and per-pool basis:
/// a) Mem Reserved: the amount of memory that has been reported as reserved by all
/// backends, which come from the statestore topic updates. The values that are sent
/// come from the pool mem trackers in UpdateMemTrackerStats(), which reflects the
/// memory reserved by fragments that have begun execution. For queries that are
/// executing and have mem limits, the limit is considered to be its reserved memory
/// because it may consume up to that limit. Otherwise the query's current consumption
/// is used (see MemTracker::GetPoolMemReserved()). The per-pool and per-host
/// aggregates are computed in UpdateClusterAggregates(). This state, once all updates
/// are fully distributed and aggregated, provides enough information to make
/// admission decisions by any impalad. However, this requires waiting for both
/// admitted requests to start all remote fragments and then for the updated state to
/// be distributed via the statestore.
/// b) Mem Admitted: the amount of memory required (i.e. the value used in admission,
/// either the mem limit or estimate) for the requests that this impalad's admission
/// controller has admitted. Both the per-pool and per-host accounting is updated
/// when requests are admitted and released (and NOTE: not via the statestore, so
/// there is no latency, but this does not account for memory from requests admitted
/// by other impalads).
/// c) Num Admitted: the number of queries that have been admitted and are therefore
/// considered to be currently running. Note that there is currently no equivalent to
/// the reserved memory reporting, i.e. hosts do not report the actual number of
/// queries that are currently executing (IMPALA-8762). This prevents using multiple
/// coordinators with executor groups.
/// As described, both the 'reserved' and 'admitted' mem accounting mechanisms have
/// different advantages and disadvantages. The 'reserved' mem accounting works well in
/// the steady state, i.e. given enough time to distribute updates. The 'admitted'
/// mem accounting works perfectly when there is a single coordinator (and perhaps works
/// reasonably with just a few). The maximum of the reserved and admitted mem is used in
/// making admission decisions, which works well when either relatively few coordinators
/// are used or, if there is a wide distribution of requests across impalads, the rate of
/// submission is low enough that new state is able to be updated by the statestore.
/// Releasing Queries:
/// When queries complete they must be explicitly released from the admission controller
/// using the methods 'ReleaseQuery' and 'ReleaseQueryBackends'. These methods release
/// the admitted memory and decrement the number of admitted queries for the resource
/// pool. All Backends for a query must be released via 'ReleaseQueryBackends' before the
/// query is released using 'ReleaseQuery'. Releasing Backends releases the admitted
/// memory used by that Backend and decrements the number of running queries on the host
/// running that Backend. Releasing a query does not release any admitted memory, it only
/// decrements the number of running queries in the resource pool.
/// Executor Groups:
/// Executors in a cluster can be assigned to executor groups. Each executor can only be
/// in one group. A resource pool can have multiple executor groups associated with it.
/// Each executor group belongs to a single resource pool and will only serve requests
/// from that pool. I.e. the relationships are 1 resource pool : many executor groups and
/// 1 executor group : many executors.
/// Executors that don't specify an executor group name during startup are automatically
/// added to a default group called DEFAULT_EXECUTOR_GROUP_NAME. The default executor
/// group does not enforce query concurrency limits per host and as such can be admitted
/// to by multiple coordinators.
/// Executor groups are mapped to resource pools implicitly by their name. Queries in a
/// resource pool can run on all executor groups whose name starts with the pool's name,
/// separated by a '-'. For example, queries in a pool with name 'q1' can run on all
/// executor groups starting with 'q1-'. If no matching executor groups can be found for a
/// resource pool and the default executor group is not empty, then the default group is
/// used.
/// In addition to the checks described before, admission to executor groups is bounded by
/// the maximum number of queries that can run concurrently on an executor
/// (-admission_control_slots). An additional check is performed to ensure that each
/// executor in the group has an available slot to run the query. Admission controllers
/// include the number of queries that have been admitted to each executor in the
/// statestore updates.
/// In order to find an executor group that can run a query, the admission controller
/// calls FindGroupToAdmitOrReject(), either during the initial admission attempt or in
/// DequeueLoop(). If the cluster membership has changed, it (re-)computes schedules for
/// all executor groups and then tries to admit queries using the list of schedules.
/// Admission is always attempted in the same order so that executor groups fill up before
/// further ones are considered. In particular, we don't attempt to balance the queries
/// across executor groups.
/// Example without executor groups:
/// Consider a 10-node cluster with 100gb/node and a resource pool 'q1' configured with
/// 500gb of aggregate memory and 40gb as the max memory limit. An incoming request with
/// the MEM_LIMIT query option set to 50gb and scheduled to execute on all backends is
/// received by SubmitForAdmission() on an otherwise quiet cluster. Based on the pool
/// configuration, a per host mem limit of 40gb is used for this query and for any
/// subsequent checks that it needs to pass prior to admission. FindGroupToAdmitOrReject()
/// computes a schedule for the default executor group and performs rejection tests before
/// calling CanAdmitRequest(), which checks the number of running queries and then calls
/// HasAvailableMemResources() to check for memory resources. It first checks whether
/// there is enough memory for the request using PoolStats::EffectiveMemReserved() (which
/// is the max of the pool's agg_mem_reserved_ and local_mem_admitted_, see #2 above),
/// then checks for enough memory on each individual host via the max of mem_reserved and
/// mem_admitted in host_stats_ (see #3 above) and finally checks if the memory limit
/// used for this query can accommodate its largest initial reservation. In this case,
/// ample resources are available so CanAdmitRequest() returns true.
/// PoolStats::AdmitQueryAndMemory() is called to update q1's PoolStats: it first updates
/// agg_num_running_ and local_mem_admitted_ which are available to be used immediately
/// for incoming admission requests, then it updates num_admitted_running in the struct
/// sent to the statestore
/// (local_stats_). UpdateHostStats() is called to update the per-host admitted mem
/// (stored in the map host_stats_) for all participating hosts. Then SubmitForAdmission()
/// returns to the ClientRequestState. If another identical admission request is received
/// by the same coordinator immediately, it will be rejected because q1's
/// local_mem_admitted_ is already 400gb. If that request were sent to another impalad at
/// the same time, it would have been admitted because not all updates have been
/// disseminated yet. The next statestore update will contain the updated value of
/// num_admitted_running for q1 on this backend. As remote fragments begin execution on
/// remote impalads, their pool mem trackers will reflect the updated amount of memory
/// reserved (set in local_stats_.backend_mem_reserved by UpdateMemTrackerStats()) and the
/// next statestore updates coming from those impalads will contain the updated value. As
/// the statestore updates are received (in the subscriber callback fn UpdatePoolStats()),
/// the incoming per-backend, per-pool mem_reserved values are aggregated to
/// PoolStats::agg_mem_reserved_ (pool aggregate over all hosts) and backend_mem_reserved_
/// (per-host aggregates over all pools). Once this has happened, any incoming admission
/// request now has the updated state required to make correct admission decisions.
/// Example with executor groups:
/// Consider a cluster with a dedicated coordinator and 2 executor groups
/// "default-pool-group-1" and "default-pool-group-2" (the number of executors per group
/// does not matter for this example). Both executor groups will be able to serve requests
/// from the default resource pool. Consider that each executor has only one admission
/// slot i.e. --admission_control_slots=1 is specified for all executors. An incoming
/// query with mt_dop=1 is submitted through SubmitForAdmission(), which calls
/// FindGroupToAdmitOrReject(). From there we call ComputeGroupSchedules() which
/// computes schedules for both executor groups. Then we perform rejection tests and
/// afterwards call CanAdmitRequest() for each of the schedules. Executor groups are
/// processed in alphanumerically sorted order, so we attempt admission to group
/// "default-pool-group-1" first. CanAdmitRequest() calls HasAvailableSlots() to check
/// whether any of the hosts in the group can fit the new query in their available slots
/// and since it does fit, admission succeeds. The query is admitted and 'slots_in_use'
/// is incremented for each host in that group based on the effective parallelism of the
/// query. When a second query arrives while the first one is still running, we perform
/// the same steps. In particular we compute schedules for both groups and consider
/// admission to default-pool-group-1 first. However, the check in HasAvailableSlots()
/// now fails and we will consider group default-pool-group-2 next. For this group,
/// the check succeeds and the query is admitted, incrementing the num_admitted counter
/// for each host in group default-pool-group-2.
/// Queuing Behavior:
/// Once the resources in a pool are consumed, each coordinator receiving requests will
/// begin queuing. While each individual queue is FIFO, there is no total ordering on the
/// queued requests between admission controllers and no FIFO behavior is guaranteed for
/// requests submitted to different coordinators. When resources become available, there
/// is no synchronous coordination between nodes used to determine which get to dequeue
/// and admit requests. Instead, we use a simple heuristic to try to dequeue a number of
/// requests proportional to the number of requests that are waiting in each individual
/// admission controller to the total number of requests queued across all admission
/// controllers (i.e. impalads). This limits the amount of overadmission that may result
/// from a large amount of resources becoming available at the same time. When there are
/// requests queued in multiple pools on the same host, the admission controller simply
/// iterates over the pools in pool_stats_ and attempts to dequeue from each. This is fine
/// for the max_requests limit, but is unfair for memory-based admission because the
/// iteration order of pools effectively gives priority to the queues at the beginning.
/// Requests across queues may be competing for the same resources on particular hosts,
/// i.e. #3 in the description of memory-based admission above. Note the pool's
/// max_mem_resources (#2) is not contended.
/// Cancellation Behavior:
/// An admission request<schedule, admit_outcome> submitted using SubmitForAdmission() can
/// be proactively cancelled by setting the 'admit_outcome' to
/// AdmissionOutcome::CANCELLED. This is handled asynchronously by SubmitForAdmission()
/// and DequeueLoop().
/// Pool Configuration Mechanism:
/// The paths to the pool config files are specified using the startup flags
/// "fair_scheduler_allocation_path" and "llama_site_path". The format for specifying pool
/// configs is based on yarn and llama with additions specific to Impala. A file
/// monitoring service is started that monitors changes made to these files. Those changes
/// are only propagated to Impala when a new query is serviced. See RequestPoolService
/// class for more details.
class AdmissionController {
/// Profile info strings: keys (PROFILE_INFO_KEY_*) and values (PROFILE_INFO_VAL_*).
/// Only the declarations are visible in this header; the literal values are defined
/// elsewhere.
/// NOTE(review): presumably the VAL_* strings are the possible values recorded under
/// PROFILE_INFO_KEY_ADMISSION_RESULT in a query's profile - confirm against the
/// definitions.
static const std::string PROFILE_INFO_KEY_ADMISSION_RESULT;
static const std::string PROFILE_INFO_VAL_ADMIT_IMMEDIATELY;
static const std::string PROFILE_INFO_VAL_QUEUED;
static const std::string PROFILE_INFO_VAL_CANCELLED_IN_QUEUE;
static const std::string PROFILE_INFO_VAL_ADMIT_QUEUED;
static const std::string PROFILE_INFO_VAL_REJECTED;
static const std::string PROFILE_INFO_VAL_TIME_OUT;
static const std::string PROFILE_INFO_KEY_LAST_QUEUED_REASON;
static const std::string PROFILE_INFO_KEY_ADMITTED_MEM;
static const std::string PROFILE_INFO_KEY_EXECUTOR_GROUP;
static const std::string PROFILE_INFO_KEY_STALENESS_WARNING;
/// Constructs an admission controller.
/// NOTE(review): 'cluster_membership_mgr', 'subscriber', 'request_pool_service' and
/// 'metrics' presumably initialize the corresponding pointer members of this class,
/// which are documented as not owned, so the pointees would have to outlive the
/// controller; 'host_addr' presumably identifies the local impalad - confirm in the
/// definition.
AdmissionController(ClusterMembershipMgr* cluster_membership_mgr,
StatestoreSubscriber* subscriber, RequestPoolService* request_pool_service,
MetricGroup* metrics, const TNetworkAddress& host_addr);
/// Bundles all information needed to create a QuerySchedule for a query and to try to
/// admit it. None of the members are owned by the instances of this struct (usually
/// they are owned by the ClientRequestState), so the referenced objects must stay alive
/// for as long as the request is in use.
struct AdmissionRequest {
/// Reference members: they have to be bound when the struct is initialized and cannot
/// be re-seated afterwards.
const TUniqueId& query_id;
const TQueryExecRequest& request;
const TQueryOptions& query_options;
/// NOTE(review): presumably the profile and event sequence that the admission
/// controller annotates with the admission result and timeline - confirm in the
/// definition of SubmitForAdmission().
RuntimeProfile* summary_profile;
RuntimeProfile::EventSequence* query_events;
/// Submits the request for admission. May return immediately if rejected, but
/// otherwise blocks until the request is either admitted, times out, gets rejected
/// later, or cancelled by the client (by setting 'admit_outcome' to CANCELLED). When
/// this method returns, the following <admit_outcome, Return Status> pairs are
/// possible:
/// - Admitted: <ADMITTED, Status::OK>
/// - Rejected or timed out: <REJECTED or TIMED_OUT, Status(msg: reason for the same)>
/// - Cancelled: <CANCELLED, Status::CANCELLED>
/// If admitted, ReleaseQuery() should also be called after the query completes or gets
/// cancelled to ensure that the pool statistics are updated.
/// NOTE(review): 'schedule_result' is presumably set to the schedule of the admitted
/// query - confirm in the definition.
Status SubmitForAdmission(const AdmissionRequest& request,
Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* admit_outcome,
std::unique_ptr<QuerySchedule>* schedule_result);
/// Updates the pool statistics when a query completes (whether it succeeded, was
/// cancelled or failed). This should be called for all requests that have
/// been submitted via SubmitForAdmission(). 'schedule' is the QuerySchedule of the
/// completed query and 'peak_mem_consumption' is the peak memory consumption of the
/// query. As described in the class comment, all Backends of the query must be released
/// via ReleaseQueryBackends() before this is called; releasing the query itself only
/// decrements the number of running queries in the resource pool and does not release
/// any admitted memory.
/// This does not block.
void ReleaseQuery(const QuerySchedule& schedule, int64_t peak_mem_consumption);
/// Updates the pool statistics when a Backend running a query completes (either
/// successfully, is cancelled or failed). This should be called for all Backends part
/// of a query for all queries that have been submitted via AdmitQuery().
/// 'schedule' is the QuerySchedule of the associated query and the vector of
/// TNetworkAddresses identify the completed Backends.
/// This does not block.
void ReleaseQueryBackends(
const QuerySchedule& schedule, const vector<TNetworkAddress>& host_addr);
/// Registers the request queue topic with the statestore, starts up the dequeue thread
/// and registers a callback with the cluster membership manager to receive updates for
/// membership changes.
/// NOTE(review): presumably returns a non-OK Status if one of these steps fails -
/// confirm in the definition.
Status Init();
/// Serializes relevant stats, configurations and information associated with queued
/// queries for the resource pool identified by 'pool_name' to JSON by adding members to
/// 'resource_pools'. Is a no-op if a pool with name 'pool_name' does not exist or no
/// queries have been submitted to that pool yet.
/// NOTE(review): 'document' is presumably the rapidjson document that owns
/// 'resource_pools' and provides its allocator - confirm in the definition.
void PoolToJson(const std::string& pool_name, rapidjson::Value* resource_pools,
rapidjson::Document* document);
/// Serializes relevant stats, configurations and information associated with queued
/// queries for every resource pool (to which queries have been submitted at least once)
/// to JSON by adding members to 'resource_pools'.
/// NOTE(review): 'document' is presumably the rapidjson document that owns
/// 'resource_pools' and provides its allocator - confirm in the definition.
void AllPoolsToJson(rapidjson::Value* resource_pools, rapidjson::Document* document);
/// Calls ResetInformationalStats on the pool identified by 'pool_name', i.e. resets that
/// pool's informational stats such as the totals, the peak query memory histogram and
/// the moving average of the wait time.
/// NOTE(review): the behavior for an unknown 'pool_name' is not visible in this header.
void ResetPoolInformationalStats(const std::string& pool_name);
/// Calls ResetInformationalStats on all pools.
void ResetAllPoolInformationalStats();
/// This struct stores per-host statistics which are used during admission and by HTTP
/// handlers to query admission control statistics for currently registered backends.
/// All fields start at zero.
struct HostStats {
/// The mem reserved for a query that is currently executing is its memory limit, if
/// set (which should be the common case with admission control). Otherwise, if the
/// query has no limit or the query is finished executing, the current consumption
/// (tracked by its query mem tracker) is used.
int64_t mem_reserved = 0;
/// The per host mem admitted only for the queries admitted locally.
int64_t mem_admitted = 0;
/// The per host number of queries admitted only for the queries admitted locally.
int64_t num_admitted = 0;
/// The per host number of slots in use for the queries admitted locally. According to
/// the executor group example in the class comment, this is incremented on admission
/// based on the effective parallelism of the query.
int64_t slots_in_use = 0;
/// Maps a host address string to the HostStats of that host.
typedef std::unordered_map<std::string, HostStats> PerHostStats;
/// Populates the input map with per-host entries in the following format:
/// <host_address_str, HostStats>.
/// NOTE(review): the method name suggests that only mem_reserved and mem_admitted are
/// of interest to callers - confirm whether the remaining HostStats fields are filled.
/// Only used for populating the 'backends' debug page.
void PopulatePerHostMemReservedAndAdmitted(PerHostStats* host_stats);
/// Returns a non-empty string with a warning if the admission control data is stale.
/// 'prefix' is added to the start of the string. Returns an empty string if not stale.
/// If 'ms_since_last_update' is non-null, set it to the time in ms since last update;
/// it defaults to nullptr, in which case nothing is reported through it.
/// Caller must not hold 'admission_ctrl_lock_'.
std::string GetStalenessDetail(const std::string& prefix,
int64_t* ms_since_last_update = nullptr);
/// Forward declaration of the per-pool statistics class; it is made a friend so that
/// it can access the private members of the AdmissionController.
class PoolStats;
friend class PoolStats;
/// Pointer to the cluster membership manager. Not owned by the AdmissionController.
ClusterMembershipMgr* cluster_membership_mgr_;
/// Subscription manager used to handle admission control updates. This is not
/// owned by this class.
StatestoreSubscriber* subscriber_;
/// Used for user-to-pool resolution and looking up pool configurations. Not owned by
/// the AdmissionController.
RequestPoolService* request_pool_service_;
/// Metrics subsystem access
MetricGroup* metrics_group_;
/// Maps names of executor groups to their respective query load metric.
std::unordered_map<std::string, IntGauge*> exec_group_query_load_map_;
/// Thread dequeuing and admitting queries.
std::unique_ptr<Thread> dequeue_thread_;
/// The local impalad's host/port id, used to construct topic keys.
const std::string host_id_;
/// Serializes/deserializes TPoolStats when sending and receiving topic updates.
ThriftSerializer thrift_serializer_;
/// Protects all access to all variables below.
boost::mutex admission_ctrl_lock_;
/// The last time a topic update was processed. Time is obtained from
/// MonotonicMillis(), or is 0 if an update was never received.
int64_t last_topic_update_time_ms_ = 0;
/// Per-host statistics (see HostStats), keyed by host. Declared below
/// admission_ctrl_lock_ and therefore protected by it.
PerHostStats host_stats_;
/// Contains all per-pool statistics and metrics. Accessed via GetPoolStats().
class PoolStats {
/// Groups the pointers to all metric objects of one pool: monotonic counters, gauges
/// mirroring the current PoolStats values, and gauges/properties exposing the pool
/// configuration.
/// NOTE(review): the pointees are presumably owned by the metrics subsystem rather
/// than by this struct - confirm where the metrics are registered.
struct PoolMetrics {
/// Monotonically increasing counters (since process start) referring to this
/// host's admission controller.
IntCounter* total_admitted;
IntCounter* total_rejected;
IntCounter* total_queued;
IntCounter* total_dequeued; // Does not include those in total_timed_out
IntCounter* total_timed_out;
IntCounter* total_released;
IntCounter* time_in_queue_ms;
/// The following mirror the current values in PoolStats.
/// TODO: Avoid duplication: replace the int64_t fields on PoolStats with these.
IntGauge* agg_num_running;
IntGauge* agg_num_queued;
IntGauge* agg_mem_reserved;
IntGauge* local_mem_admitted;
/// The following mirror the current values of local_stats_.
/// TODO: As above, consolidate the metrics and local_stats_.
IntGauge* local_num_admitted_running;
IntGauge* local_num_queued;
IntGauge* local_backend_mem_reserved;
IntGauge* local_backend_mem_usage;
/// Metrics exposing the pool settings.
IntGauge* pool_max_mem_resources;
IntGauge* pool_max_requests;
IntGauge* pool_max_queued;
IntGauge* pool_queue_timeout;
IntGauge* max_query_mem_limit;
IntGauge* min_query_mem_limit;
BooleanProperty* clamp_mem_limit_query_option;
DoubleGauge* max_running_queries_multiple;
DoubleGauge* max_queued_queries_multiple;
IntGauge* max_memory_multiple;
/// Metrics exposing the pool's derived runtime configuration.
IntGauge* max_running_queries_derived;
IntGauge* max_queued_queries_derived;
IntGauge* max_memory_derived;
/// Creates the stats for the pool called 'name' that belongs to the admission
/// controller 'parent'. All aggregate counters, the locally admitted memory and the
/// wait time moving average start at zero.
PoolStats(AdmissionController* parent, const std::string& name)
: name_(name), parent_(parent), agg_num_running_(0), agg_num_queued_(0),
agg_mem_reserved_(0), local_mem_admitted_(0), wait_time_ms_ema_(0.0) {
// Start with HISTOGRAM_NUM_OF_BINS bins that all hold a count of zero.
peak_mem_histogram_.resize(HISTOGRAM_NUM_OF_BINS, 0);
/// Returns the aggregate (across all hosts) number of running queries in this pool.
int64_t agg_num_running() const { return agg_num_running_; }
/// Returns the aggregate (across all hosts) number of queued requests in this pool.
int64_t agg_num_queued() const { return agg_num_queued_; }
/// Returns the memory considered reserved in this pool for admission decisions: the
/// larger of the memory reported as reserved via the statestore (agg_mem_reserved_)
/// and the memory admitted by this coordinator (local_mem_admitted_). See the class
/// comment for why the maximum of the two accounting mechanisms is used.
int64_t EffectiveMemReserved() const {
return std::max(agg_mem_reserved_, local_mem_admitted_);
/// Updates the pool stats when the request represented by 'schedule' is admitted.
/// NOTE(review): judging by the name this accounts for both the admitted query and its
/// admitted memory - confirm in the definition.
void AdmitQueryAndMemory(const QuerySchedule& schedule);
/// Updates the pool stats except the memory admitted stat when a query is released.
/// 'peak_mem_consumption' is the peak memory consumption of the released query.
/// NOTE(review): presumably this value feeds peak_mem_histogram_ - confirm.
void ReleaseQuery(int64_t peak_mem_consumption);
/// Releases the specified memory from the pool stats. This is the counterpart of
/// ReleaseQuery() for the memory admitted stat.
void ReleaseMem(int64_t mem_to_release);
/// Updates the pool stats when a request is queued in this pool.
void Queue();
/// Updates the pool stats when a request is dequeued from this pool.
/// NOTE(review): 'timed_out' presumably tells whether the request left the queue
/// because it timed out (PoolMetrics::total_dequeued is documented as not including
/// timed out requests) - confirm in the definition.
void Dequeue(bool timed_out);
/// Updates the local_stats_.backend_mem_reserved with the pool mem tracker. Called
/// before sending local_stats(), because this field is only refreshed lazily.
void UpdateMemTrackerStats();
/// Called on a full topic update to clear all stats before processing the update.
/// Only the stats received from remote hosts (remote_stats_) are dropped.
void ClearRemoteStats() { remote_stats_.clear(); }
/// Called to update remote host TPoolStats with the new host_stats for the
/// specified host. If host_stats is NULL the stats for the specified remote host
/// are removed (i.e. topic deletion). 'backend_id' identifies the remote host.
void UpdateRemoteStats(const std::string& backend_id, TPoolStats* host_stats);
/// Maps from host id to a single memory amount that is an aggregate over all pools,
/// e.g. the memory reserved on that host (see UpdateAggregates()). See the class doc
/// for a detailed definition of reserved and admitted.
/// Protected by admission_ctrl_lock_.
typedef boost::unordered_map<std::string, int64_t> HostMemMap;
/// Called after updating local_stats_ and remote_stats_ to update the aggregate
/// values of agg_num_running_, agg_num_queued_, and agg_mem_reserved_. The in/out
/// parameter host_mem_reserved is a map from host id to memory reserved used to
/// aggregate the mem reserved values across all pools for each host. Used by
/// UpdateClusterAggregates() to update host_mem_reserved_; it provides the host
/// aggregates when called over all pools.
/// NOTE(review): no member named host_mem_reserved_ is declared in the visible part of
/// this header; the per-host values presumably end up in host_stats_ - confirm.
void UpdateAggregates(HostMemMap* host_mem_reserved);
/// Returns this pool's TPoolStats for this host, i.e. the struct that is sent to the
/// statestore.
const TPoolStats& local_stats() { return local_stats_; }
/// Updates the metrics exposing the pool configuration to those in pool_cfg.
/// NOTE(review): 'cluster_size' is presumably needed for the settings that scale with
/// the size of the cluster - confirm in the definition.
void UpdateConfigMetrics(const TPoolConfig& pool_cfg, int64_t cluster_size);
/// Updates the metrics exposing the scalable pool configuration values.
void UpdateDerivedMetrics(const TPoolConfig& pool_cfg, int64_t cluster_size);
/// Returns a pointer to this pool's metrics; the PoolStats keeps ownership of the
/// PoolMetrics struct itself.
PoolMetrics* metrics() { return &metrics_; }
/// Returns a string describing this pool's stats, intended for debugging output.
std::string DebugString() const;
/// Updates the metric keeping track of total time in queue and the exponential
/// moving average of query wait time for all queries submitted to this pool.
/// 'wait_time_ms' is the wait time of one query in milliseconds.
void UpdateWaitTime(int64_t wait_time_ms);
/// Serializes relevant stats and configurations to JSON by adding members to 'pool'.
/// NOTE(review): 'document' is presumably the rapidjson document that owns 'pool' and
/// provides its allocator - confirm in the definition.
void ToJson(rapidjson::Value* pool, rapidjson::Document* document) const;
/// Resets the informational stats like those keeping track of absolute
/// values(totals), the peak query memory histogram, and the exponential moving
/// average of wait time.
void ResetInformationalStats();
/// Returns the name of the pool these stats belong to.
const std::string& name() const { return name_; }
/// Name of the pool; fixed at construction.
const std::string name_;
/// The admission controller this pool belongs to, as passed to the constructor.
AdmissionController* parent_;
/// Aggregate (across all hosts) number of running queries in this pool. Updated
/// by AdmitQueryAndMemory(), ReleaseQuery(), and after processing statestore updates
/// by UpdateAggregates().
int64_t agg_num_running_;
/// Aggregate (across all hosts) number of queued requests. Updated by Queue(),
/// Dequeue(), and after processing statestore updates by UpdateAggregates().
int64_t agg_num_queued_;
/// Aggregate memory reported as reserved for fragments executing in this pool by
/// every host, i.e. the sum of all local_stats_.mem_reserved from all
/// other hosts. Updated only by UpdateAggregates().
/// NOTE(review): elsewhere in this header the TPoolStats field is called
/// backend_mem_reserved - confirm that 'mem_reserved' above refers to it.
int64_t agg_mem_reserved_;
/// Memory in this pool (across all nodes) that is needed for requests that have been
/// admitted by this local coordinator. Updated only on admission and release
/// (presumably by AdmitQueryAndMemory() and ReleaseMem() - confirm). Stored
/// separately from the other 'local' stats in local_stats_ because it is not sent
/// to the statestore (no 'aggregated' value is needed).
int64_t local_mem_admitted_;
/// This pool's TPoolStats for this host. Sent to the statestore, and therefore kept
/// separately from the stats of the remote hosts in remote_stats_. Most fields are
/// updated eagerly and used for local admission decisions. The exception is
/// local_stats_.backend_mem_reserved: it is not used in local admission decisions, so
/// it can be updated lazily before sending a statestore update.
TPoolStats local_stats_;
/// Latest TPoolStats known for each remote host, keyed by host id. The contents come
/// entirely from incoming statestore updates: UpdateRemoteStats() writes the entries
/// and UpdateAggregates() reads them.
using RemoteStatsMap = boost::unordered_map<std::string, TPoolStats>;
RemoteStatsMap remote_stats_;
/// Per-pool metrics, created by InitMetrics() and exposed to callers through metrics().
PoolMetrics metrics_;
/// A histogram of the peak memory used by a query among all hosts. It's a vector of
/// size 'HISTOGRAM_NUM_OF_BINS' and every i-th element represents the number of
/// queries that had recorded a peak memory between (i, i+1] * HISTOGRAM_BIN_SIZE
/// Bytes, except for the last one that represents a memory range of
/// (HISTOGRAM_NUM_OF_BINS - 1, infinity) * HISTOGRAM_BIN_SIZE Bytes.
/// NOTE(review): the range of the last bin was missing from this comment and has been
/// reconstructed from the bin layout described above; confirm against the code that
/// fills the histogram.
std::vector<int64_t> peak_mem_histogram_;
/// Number of bins in 'peak_mem_histogram_'.
static const int64_t HISTOGRAM_NUM_OF_BINS;
/// Width of a single bin of 'peak_mem_histogram_', in bytes.
static const int64_t HISTOGRAM_BIN_SIZE;
/// Keeps track of the exponential moving average of the wait time (in milliseconds, see
/// UpdateWaitTime()) of all queries submitted to this pool that were not rejected. A
/// weighting multiplier of value 'EMA_MULTIPLIER' is used.
double wait_time_ms_ema_;
/// Weighting multiplier used when updating 'wait_time_ms_ema_'.
static const double EMA_MULTIPLIER;
/// Creates the per-pool metrics held in 'metrics_'.
void InitMetrics();
/// Grants the listed AdmissionControllerTest test cases, as well as the test fixture
/// itself, access to the non-public members of this class.
FRIEND_TEST(AdmissionControllerTest, Simple);
FRIEND_TEST(AdmissionControllerTest, PoolStats);
FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestMemory);
FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestCount);
FRIEND_TEST(AdmissionControllerTest, GetMaxToDequeue);
FRIEND_TEST(AdmissionControllerTest, QueryRejection);
friend class AdmissionControllerTest;
/// Per-pool statistics, keyed by pool name. Entries are accessed via GetPoolStats().
/// Protected by admission_ctrl_lock_.
using PoolStatsMap = boost::unordered_map<std::string, PoolStats>;
PoolStatsMap pool_stats_;
/// This struct groups together a schedule and the executor group that it was scheduled
/// on. It is used to attempt admission without rescheduling the query in case the
/// cluster membership has not changed. The schedule is owned by the struct (it is moved
/// into a std::unique_ptr member), whereas the executor group is only held by
/// reference: users of the struct must make sure that executor_group stays valid.
struct GroupSchedule {
std::unique_ptr<QuerySchedule> schedule, const ExecutorGroup& executor_group)
: schedule(std::move(schedule)), executor_group(executor_group) {}
/// The schedule that was computed for 'executor_group'. Owned by this struct.
std::unique_ptr<QuerySchedule> schedule;
/// The executor group that 'schedule' was computed for. Not owned.
const ExecutorGroup& executor_group;
/// Names of the pools whose stats have changed between topic updates and therefore
/// still need to be sent to the statestore. The key is the pool name.
using PoolSet = boost::unordered_set<std::string>;
PoolSet pools_for_updates_;
/// Structure stored in the RequestQueue representing an admission request. This struct
/// lives only during the call to AdmitQuery() but its members live past that and are
/// owned by the ClientRequestState object associated with them.
/// Objects of this class progress linearly through the following states.
/// - Initialized: The request has been created
/// - Admitting: At least one attempt has been made to admit the request and
/// additional intermediate state has been stored in some members
/// - Admitted: The request was admitted, cancelled, or rejected and 'admit_outcome' is
/// set. If it was admitted, 'admitted_schedule' is also non-null.
struct QueueNode : public InternalQueue<QueueNode>::Node {
/// Creates a queue node for 'request'. 'request' is taken by value and moved into
/// 'admission_request'. 'admission_outcome' and 'profile' are stored as raw pointers
/// in 'admit_outcome' and 'profile' respectively; the node does not delete them.
/// Every member is initialized in declaration order, so 'profile' must be listed
/// between 'admission_request' and 'admit_outcome'. Leaving it out would make the
/// 'profile' member indeterminate even though the caller supplied a value.
QueueNode(AdmissionRequest request,
    Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* admission_outcome,
    RuntimeProfile* profile)
  : admission_request(std::move(request)),
    profile(profile),
    admit_outcome(admission_outcome) {}
/// BEGIN: Members that are valid for new objects after initialization
/// The admission request contains everything required to build schedules. Declared
/// const, so it cannot change after construction.
const AdmissionRequest admission_request;
/// Profile to be updated with information about admission. Held as a raw pointer; the
/// node has no visible code that deletes it.
RuntimeProfile* profile;
/// END: Members that are valid for new objects after initialization
/// BEGIN: Members that are only valid while queued, but invalid once dequeued.
/// The membership snapshot used during the last admission attempt. It can be nullptr
/// before the first admission attempt and if any schedules have been created,
/// 'group_schedules' will contain the corresponding schedules and executor groups.
ClusterMembershipMgr::SnapshotPtr membership_snapshot;
/// List of schedules and executor groups that can be attempted to be admitted for
/// this queue node.
std::vector<GroupSchedule> group_schedules;
/// END: Members that are only valid while queued, but invalid once dequeued.
/// BEGIN: Members that are valid after admission / cancellation / rejection
/// The last reason why this request could not be admitted.
std::string not_admitted_reason;
/// The admission outcome of the queued request. The pointer itself is const, so it
/// always refers to the promise that was passed to the constructor.
Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* const admit_outcome;
/// The schedule of the query if it was admitted successfully. Nullptr if it has not
/// been admitted or was cancelled or rejected.
std::unique_ptr<QuerySchedule> admitted_schedule = nullptr;
/// END: Members that are valid after admission / cancellation / rejection
/// Queue holding the queries that wait to be admitted for execution. Incoming queries
/// are queued once the maximum number of concurrently executing queries has been
/// reached, and they are admitted first come, first served.
using RequestQueue = InternalQueue<QueueNode>;
/// The request queue of every pool, keyed by pool name.
using RequestQueueMap = boost::unordered_map<std::string, RequestQueue>;
RequestQueueMap request_queue_map_;
/// Pool configs as returned by request_pool_service_, keyed by pool name. They are
/// cached here so that the dequeue thread does not have to go through the request pool
/// service again (which involves a JNI call and error checking).
using PoolConfigMap = boost::unordered_map<std::string, TPoolConfig>;
PoolConfigMap pool_config_map_;
/// Indicates whether a change in pool stats warrants an attempt by the dequeuing
/// thread to dequeue. Starts out as true.
bool pending_dequeue_ = true;
/// Notifies the dequeuing thread that pool stats have changed and it may be
/// possible to dequeue and admit queries.
ConditionVariable dequeue_cv_;
/// If true, tear down the dequeuing thread. This only happens in unit tests.
/// NOTE(review): unlike pending_dequeue_ this member has no in-class initializer, so
/// it has to be initialized by the constructor (not visible here); confirm.
bool done_;
/// Number of Backends released so far for each active query, keyed by query id. Used
/// purely for internal state validation: it makes it possible to ensure that all
/// Backends are released before the query itself is released.
using NumReleasedBackends = boost::unordered_map<TUniqueId, int>;
NumReleasedBackends num_released_backends_;
/// Resolves the resource pool name in 'query_ctx.request_pool' and stores the resulting
/// name in 'pool_name' and the resulting config in 'pool_config' (both are output
/// parameters). The returned Status reports whether the resolution succeeded.
Status ResolvePoolAndGetConfig(const TQueryCtx& query_ctx, std::string* pool_name,
TPoolConfig* pool_config);
/// Statestore subscriber callback that sends outgoing topic deltas (see
/// AddPoolUpdates()) and processes incoming topic deltas, updating the PoolStats
/// state. 'incoming_topic_deltas' holds the deltas received from the statestore;
/// outgoing deltas are added to 'subscriber_topic_updates'.
void UpdatePoolStats(
const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
std::vector<TTopicDelta>* subscriber_topic_updates);
/// Adds outgoing topic updates to 'subscriber_topic_updates' for pools that have
/// changed since the last call to AddPoolUpdates() (presumably the pools recorded in
/// pools_for_updates_; confirm in the definition). Called by UpdatePoolStats() before
/// UpdateClusterAggregates(). Must hold admission_ctrl_lock_.
void AddPoolUpdates(std::vector<TTopicDelta>* subscriber_topic_updates);
/// Updates the remote stats with per-host topic_updates coming from the statestore.
/// Removes remote stats identified by topic deletions coming from the
/// statestore. Called by UpdatePoolStats(). Must hold admission_ctrl_lock_.
/// NOTE(review): only 'topic_updates' is passed in, so deletions are presumably marked
/// on the individual TTopicItem entries; confirm in the definition.
void HandleTopicUpdates(const std::vector<TTopicItem>& topic_updates);
/// Re-computes the per-pool aggregate stats and the per-host aggregates in host_stats_
/// using each pool's remote_stats_ and local_stats_. Called by UpdatePoolStats() after
/// handling updates and deletions. Must hold admission_ctrl_lock_.
void UpdateClusterAggregates();
/// Computes schedules for all executor groups that can run the query in 'queue_node'.
/// For subsequent calls schedules are only re-computed if the membership version inside
/// 'membership_snapshot' has changed. Will return any errors that occur during
/// scheduling, e.g. if the scan range generation fails. Note that this will not return
/// an error if no executor groups are available for scheduling, but will set
/// 'queue_node->not_admitted_reason' and leave 'queue_node->group_schedules' empty in
/// that case. Otherwise the schedules end up in 'queue_node->group_schedules'.
Status ComputeGroupSchedules(
ClusterMembershipMgr::SnapshotPtr membership_snapshot, QueueNode* queue_node);
/// Reschedules the query if necessary using 'membership_snapshot' and tries to find an
/// executor group that the query can be admitted to. If the query is unable to run on
/// any of the groups irrespective of their current workload, it is rejected.
/// Possible outcomes:
/// - The query can be admitted: returns true and sets queue_node->admitted_schedule.
/// - The query cannot be admitted now, but also does not need to be rejected: returns
/// true and keeps queue_node->admitted_schedule unset.
/// - The query must be rejected: returns false and sets
/// queue_node->not_admitted_reason.
bool FindGroupToAdmitOrReject(
int64_t cluster_size, ClusterMembershipMgr::SnapshotPtr membership_snapshot,
const TPoolConfig& pool_config, bool admit_from_queue, PoolStats* pool_stats,
QueueNode* queue_node);
/// Dequeues the queued queries when notified by dequeue_cv_ and admits them if they
/// have not been cancelled yet.
/// NOTE(review): presumably the body of the dequeuing thread that 'done_' tears down;
/// confirm where the thread is started.
void DequeueLoop();
/// Returns true if 'schedule' can be admitted to the pool with 'pool_cfg'. Otherwise,
/// returns false and 'not_admitted_reason' specifies why the request can not be
/// admitted immediately. 'admit_from_queue' is true if attempting to admit from the
/// queue. Caller owns 'not_admitted_reason'. Must hold admission_ctrl_lock_.
bool CanAdmitRequest(const QuerySchedule& schedule, const TPoolConfig& pool_cfg,
int64_t cluster_size, bool admit_from_queue, std::string* not_admitted_reason);
/// Returns true if all executors can accommodate the largest initial reservation of
/// any executor and the backend running the coordinator fragment can accommodate its
/// own initial reservation. Otherwise, returns false with the details about the memory
/// shortage in the output parameter 'mem_unavailable_reason'. Possible cases where it
/// can return false are:
/// 1. The pool.max_query_mem_limit is set too low
/// 2. mem_limit in query options is set low and no max/min_query_mem_limit is set in
/// the pool configuration.
/// 3. mem_limit in query options is set low and min_query_mem_limit is also set low.
/// 4. mem_limit in query options is set low and the pool.min_query_mem_limit is set
/// to a higher value but pool.clamp_mem_limit_query_option is false.
/// 5. If a dedicated coordinator is used and the mem_limit in query options is set
/// lower than what is required to support the sum of initial memory reservations of
/// the fragments scheduled on the coordinator.
static bool CanAccommodateMaxInitialReservation(const QuerySchedule& schedule,
const TPoolConfig& pool_cfg, std::string* mem_unavailable_reason);
/// Returns true if there is enough memory available to admit the query based on the
/// schedule, the aggregate pool memory, and the per-host memory. If not, this returns
/// false and returns the reason in the output parameter 'mem_unavailable_reason'.
/// Caller owns 'mem_unavailable_reason'. Must hold admission_ctrl_lock_.
bool HasAvailableMemResources(const QuerySchedule& schedule,
const TPoolConfig& pool_cfg, int64_t cluster_size,
std::string* mem_unavailable_reason);
/// Returns true if there are enough available slots on all executors in the schedule to
/// fit the query schedule. The number of slots per executors does not change with the
/// group or cluster size and instead always uses pool_cfg.max_requests. If a host does
/// not have a free slot, this returns false and sets the output parameter
/// 'unavailable_reason'. Must hold admission_ctrl_lock_.
bool HasAvailableSlots(const QuerySchedule& schedule, const TPoolConfig& pool_cfg,
    std::string* unavailable_reason);
/// Updates the memory admitted and the number of queries running for each backend in
/// 'schedule'. Also updates the stats of its associated resource pool. Used only when
/// the 'schedule' is admitted.
void UpdateStatsOnAdmission(const QuerySchedule& schedule);
/// Updates the memory admitted and the number of queries running for each backend in
/// 'schedule' which has been released/completed. The list of completed backends is
/// specified in 'host_addrs'. Also updates the stats related to the admitted memory of
/// its associated resource pool.
void UpdateStatsOnReleaseForBackends(
const QuerySchedule& schedule, const std::vector<TNetworkAddress>& host_addrs);
/// Updates the memory admitted and the number of queries running on the host
/// 'host_addr' by adding 'mem_to_admit', 'num_queries_to_admit' and
/// 'num_slots_to_admit' to the host stats.
/// NOTE(review): presumably negative values are passed when resources are released;
/// confirm with the callers.
void UpdateHostStats(
const TNetworkAddress& host_addr, int64_t mem_to_admit, int num_queries_to_admit,
int num_slots_to_admit);
/// Rejection happens in several stages
/// 1) Based on static pool configuration
/// - Check if the pool is disabled (max_requests = 0, max_mem = 0)
/// - min_query_mem_limit > max_query_mem_limit (From IsPoolConfigValidForCluster)
/// 2) Based on the entire cluster size
/// - Check for maximum queue size (queue full)
/// 3) Based on the executor group size
/// - pool.min_query_mem_limit > max_mem (From IsPoolConfigValidForCluster)
/// - max_mem may depend on group size
/// 4) Based on a schedule
/// - largest_min_mem_reservation > buffer_pool_limit
/// - CanAccommodateMaxInitialReservation
/// - Thread reservation limit (thread_reservation_limit,
/// thread_reservation_aggregate_limit)
/// - cluster_min_mem_reservation_bytes > max_mem
/// - cluster_mem_to_admit > max_mem
/// - per_backend_mem_to_admit > min_admit_mem_limit
/// We lump together 1 & 2 and 3 & 4. The first two depend on the total cluster size.
/// The latter 2 depend on the executor group size and therefore on the schedule. If no
/// executor group is available, the query will be queued.
/// Returns true if a request must be rejected immediately based on the pool
/// configuration and cluster size, e.g. if the pool config is invalid, the pool is
/// disabled, or the queue is already full. 'rejection_reason' presumably receives the
/// explanation in that case (confirm in the definition).
/// Must hold admission_ctrl_lock_.
bool RejectForCluster(const std::string& pool_name, const TPoolConfig& pool_cfg,
bool admit_from_queue, int64_t cluster_size, std::string* rejection_reason);
/// Returns true if a request must be rejected immediately based on the pool
/// configuration and a particular schedule, e.g. because the memory requirements of the
/// query exceed the maximum of the group. This assumes that all executor groups for a
/// pool are uniform and that a query rejected for one group will not be able to run on
/// other groups, either. 'rejection_reason' presumably receives the explanation when
/// true is returned (confirm in the definition).
/// Must hold admission_ctrl_lock_.
bool RejectForSchedule(const QuerySchedule& schedule, const TPoolConfig& pool_cfg,
int64_t cluster_size, int64_t group_size, std::string* rejection_reason);
/// Gets or creates the PoolStats for 'pool_name'. Must hold admission_ctrl_lock_.
/// NOTE(review): 'dcheck_exists' (default false) presumably makes the call assert that
/// the entry already exists instead of silently creating it; confirm in the definition.
PoolStats* GetPoolStats(const std::string& pool_name, bool dcheck_exists = false);
/// Gets or creates the PoolStats for the query schedule 'schedule'. Preconditions:
/// scheduling must be done already and the schedule must have an associated
/// executor_group.
PoolStats* GetPoolStats(const QuerySchedule& schedule);
/// Logs the reason 'not_admitted_reason' for dequeuing of 'node' failing and adds the
/// reason to the query's profile. Must hold admission_ctrl_lock_.
static void LogDequeueFailed(QueueNode* node, const std::string& not_admitted_reason);
/// Sets the per host mem limit and mem admitted in the schedule and does the necessary
/// accounting and logging on successful submission. 'was_queued' presumably tells
/// whether the query had been queued before being admitted (inferred from the name).
/// Caller must hold 'admission_ctrl_lock_'.
void AdmitQuery(QuerySchedule* schedule, bool was_queued);
/// Same as PoolToJson() but requires 'admission_ctrl_lock_' to be held by the caller.
/// This is a helper method used by both PoolToJson() and AllPoolsToJson().
void PoolToJsonLocked(const std::string& pool_name, rapidjson::Value* resource_pools,
rapidjson::Document* document);
/// Same as GetStalenessDetail() except caller must hold 'admission_ctrl_lock_'.
/// 'ms_since_last_update' is optional (defaults to nullptr); when provided it
/// presumably receives the number of milliseconds since the last update (confirm in
/// the definition).
std::string GetStalenessDetailLocked(const std::string& prefix,
int64_t* ms_since_last_update = nullptr);
/// Returns the topic key for the pool 'pool_name' at the backend 'backend_id', i.e. a
/// string of the form: "<pool_name><delimiter><backend_id>".
static std::string MakePoolTopicKey(
const std::string& pool_name, const std::string& backend_id);
/// Returns the maximum memory for the pool described by 'pool_config'. The limit can be
/// fixed or scalable (see PoolHasFixedMemoryLimit()); 'cluster_size' is presumably used
/// to compute the scalable variant (confirm in the definition).
static int64_t GetMaxMemForPool(const TPoolConfig& pool_config, int64_t cluster_size);
/// Returns a description of how the maximum memory for the pool is configured.
static std::string GetMaxMemForPoolDescription(
const TPoolConfig& pool_config, int64_t cluster_size);
/// Returns the maximum number of requests that can run in the pool described by
/// 'pool_config' for a cluster of size 'cluster_size'.
static int64_t GetMaxRequestsForPool(
const TPoolConfig& pool_config, int64_t cluster_size);
/// Returns a description of how the maximum number of requests that can run in the pool
/// is configured.
static std::string GetMaxRequestsForPoolDescription(
const TPoolConfig& pool_config, int64_t cluster_size);
/// Returns the effective queue timeout for the pool in milliseconds.
/// NOTE(review): "effective" presumably means a fallback applies when 'pool_config'
/// does not specify a timeout; confirm in the definition.
static int64_t GetQueueTimeoutForPoolMs(const TPoolConfig& pool_config);
/// Returns the maximum number of queries that should be dequeued locally from 'queue'
/// before DequeueLoop waits on dequeue_cv_ at the top of its loop.
/// If it can be determined that no queries can currently be run, then zero
/// is returned.
/// Uses a heuristic to limit the number of requests we dequeue locally to avoid all
/// impalads dequeuing too many requests at the same time.
int64_t GetMaxToDequeue(RequestQueue& queue, PoolStats* stats,
const TPoolConfig& pool_config, int64_t cluster_size);
/// Returns true if the pool has been disabled through its configuration 'pool_config'.
static bool PoolDisabled(const TPoolConfig& pool_config);
/// Returns true if the pool is configured to limit the number of running queries.
static bool PoolLimitsRunningQueriesCount(const TPoolConfig& pool_config);
/// Returns true if the pool has a fixed (i.e. not scalable) maximum memory limit.
static bool PoolHasFixedMemoryLimit(const TPoolConfig& pool_config);
/// Returns the maximum number of requests that can be queued in the pool described by
/// 'pool_config' for a cluster of size 'cluster_size'.
static int64_t GetMaxQueuedForPool(
const TPoolConfig& pool_config, int64_t cluster_size);
/// Returns a description of how the maximum number of requests that can be queued
/// in the pool is configured.
static std::string GetMaxQueuedForPoolDescription(
const TPoolConfig& pool_config, int64_t cluster_size);
/// Returns, through the output parameter 'matching_groups', all executor groups from
/// 'all_groups' that can be used to run queries in 'pool_name'.
void GetExecutorGroupsForPool(const ClusterMembershipMgr::ExecutorGroups& all_groups,
const std::string& pool_name, std::vector<const ExecutorGroup*>* matching_groups);
/// Returns the current size of the cluster according to 'membership_snapshot'.
int64_t GetClusterSize(const ClusterMembershipMgr::Snapshot& membership_snapshot);
/// Returns the size of executor group 'group_name' in 'membership_snapshot'.
int64_t GetExecutorGroupSize(const ClusterMembershipMgr::Snapshot& membership_snapshot,
const std::string& group_name);
/// Returns the amount of memory to admit for the Backend with the given
/// 'backend_exec_params'. This method may return different values depending on whether
/// the Backend is an Executor or a Coordinator.
static int64_t GetMemToAdmit(
const QuerySchedule& schedule, const BackendExecParams& backend_exec_params);
/// Updates the list of executor groups for which we maintain the query load metrics,
/// based on the membership 'snapshot'. Removes the metrics of the groups that no longer
/// exist from the metric group and adds new ones for the newly added groups.
void UpdateExecGroupMetricMap(ClusterMembershipMgr::SnapshotPtr snapshot);
/// Updates the num queries executing metric of the 'grp_name' executor group by
/// 'delta'. Only updates it if the metric exists ('grp_name' has non-zero executors).
/// Caller must hold 'admission_ctrl_lock_'. Must be called whenever a query is
/// admitted or released.
void UpdateExecGroupMetric(const std::string& grp_name, int64_t delta);
/// Grants the listed AdmissionControllerTest test cases, as well as the test fixture
/// itself, access to the non-public members of this class.
FRIEND_TEST(AdmissionControllerTest, Simple);
FRIEND_TEST(AdmissionControllerTest, PoolStats);
FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestMemory);
FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestCount);
FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestSlots);
FRIEND_TEST(AdmissionControllerTest, GetMaxToDequeue);
FRIEND_TEST(AdmissionControllerTest, QueryRejection);
FRIEND_TEST(AdmissionControllerTest, DedicatedCoordQuerySchedule);
FRIEND_TEST(AdmissionControllerTest, DedicatedCoordAdmissionChecks);
friend class AdmissionControllerTest;
} // namespace impala