// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef SCHEDULING_ADMISSION_CONTROLLER_H
#define SCHEDULING_ADMISSION_CONTROLLER_H

#include <list>
#include <string>
#include <utility>
#include <vector>

#include <boost/unordered_map.hpp>
#include <boost/unordered_set.hpp>
#include <gtest/gtest_prod.h>

#include "common/status.h"
#include "scheduling/cluster-membership-mgr.h"
#include "scheduling/request-pool-service.h"
#include "scheduling/query-schedule.h"
#include "statestore/statestore-subscriber.h"
#include "util/condition-variable.h"
#include "util/internal-queue.h"
#include "util/runtime-profile.h"
#include "util/thread.h"

namespace impala {

class ExecEnv;

/// Represents the admission outcome of a query. It is stored in the 'admit_outcome'
/// input variable passed to AdmissionController::AdmitQuery() if an admission decision
/// has been made or the caller has initiated a cancellation.
enum class AdmissionOutcome {
  ADMITTED,
  REJECTED,
  TIMED_OUT,
  CANCELLED,
};

/// The AdmissionController is used to throttle requests (e.g. queries, DML) based
/// on available cluster resources, which are configured in one or more resource pools. A
/// request will either be admitted for immediate execution, queued for later execution,
/// or rejected (either immediately or after being queued). Resource pools can be
/// configured to have maximum number of concurrent queries, maximum cluster wide memory,
/// maximum queue size, max and min per host memory limit for every query, and to set
/// whether the mem_limit query option will be clamped by the previously mentioned max/min
/// per host limits or not. Queries will be queued if there are already too many queries
/// executing or there isn't enough available memory. Once the queue reaches the maximum
/// queue size, incoming queries will be rejected. Requests in the queue will time out
/// after a configurable timeout.
///
/// Depending on the -is_coordinator startup flag, multiple impalads can act as a
/// coordinator and thus also an admission controller, so some cluster state must be
/// shared between impalads in order to make admission decisions on any of them. Every
/// coordinator maintains some per-pool and per-host statistics related to the requests it
/// itself is servicing as the admission controller. Some of these local admission
/// statistics in addition to some backend-specific statistics (i.e. the backend executor
/// associated with the same impalad process) are disseminated across the cluster via the
/// statestore using the IMPALA_REQUEST_QUEUE_TOPIC topic. Effectively, coordinators send
/// statestore updates where the admission statistics reflect the load and all
/// participating backends send statestore updates reflecting the load they're executing.
///
/// Every <impalad, pool> pair is sent as a topic update at the statestore heartbeat
/// interval when pool statistics change, and the topic updates from other impalads are
/// used to re-compute the aggregate per-pool stats. Because the pool statistics are only
/// updated on statestore heartbeats and all decisions are made with the cached state,
/// the aggregate pool statistics are only estimates. As a result, more requests may be
/// admitted or queued than the configured thresholds, which are really soft limits.
///
/// Memory resources:
/// A pool may be configured to allow a maximum amount of memory resources to be
/// 'reserved' by requests admitted to that pool. While Impala does not yet truly
/// 'reserve' the memory at admission (i.e. Impala does not yet guarantee the memory for
/// a request, it is still possible to overadmit such that multiple queries think they
/// have reserved the same memory), the admission controller uses several metrics to
/// estimate the available memory and admit only when it thinks the necessary memory is
/// available. Future work will enable real reservations, but this is a much larger
/// effort and will involve changes outside of the admission controller.
///
/// The memory required for admission for a request is specified as the query option
/// MEM_LIMIT (either explicitly or via a default value). This is a per-host value. If
/// there is no memory limit, the per-host estimate from planning is used instead as a
/// memory limit and a lower bound is enforced on it based on the largest initial
/// reservation of the query. The final memory limit used is also clamped by the max/min
/// memory limits configured for the pool with an option to not enforce these limits on
/// the MEM_LIMIT query option (If both these max/min limits are not configured, then the
/// estimates from planning are not used as a memory limit and are only used for making
/// admission decisions. Moreover the estimates will no longer have a lower bound based on
/// the largest initial reservation).
/// The following four conditions must hold in order for the request to be admitted:
///  1) The current pool configuration is valid.
///  2) There must be enough memory resources available in this resource pool for the
///     request. The max memory resources configured for the resource pool specifies the
///     aggregate, cluster-wide memory that may be reserved by all executing queries in
///     this pool. Thus the aggregate memory to be reserved across all participating
///     backends for this request, *plus* that of already admitted requests must be less
///     than or equal to the max resources specified.
///  3) All participating backends must have enough memory available. Each impalad has a
///     per-process mem limit, and that is the max memory that can be reserved on that
///     backend.
///  3b) (optional) When using executor groups (see below) and admitting to the
///     non-default executor group, then the number of currently running queries must be
///     below the configured maximum for all participating backends.
///  4) The final per host memory limit used can accommodate the largest initial
///     reservation.
///
/// In order to admit based on these conditions, the admission controller accounts for
/// the following on both a per-host and per-pool basis:
///  a) Mem Reserved: the amount of memory that has been reported as reserved by all
///     backends, which come from the statestore topic updates. The values that are sent
///     come from the pool mem trackers in UpdateMemTrackerStats(), which reflects the
///     memory reserved by fragments that have begun execution. For queries that are
///     executing and have mem limits, the limit is considered to be its reserved memory
///     because it may consume up to that limit. Otherwise the query's current consumption
///     is used (see MemTracker::GetPoolMemReserved()). The per-pool and per-host
///     aggregates are computed in UpdateClusterAggregates(). This state, once all updates
///     are fully distributed and aggregated, provides enough information to make
///     admission decisions by any impalad. However, this requires waiting for both
///     admitted requests to start all remote fragments and then for the updated state to
///     be distributed via the statestore.
///  b) Mem Admitted: the amount of memory required (i.e. the value used in admission,
///     either the mem limit or estimate) for the requests that this impalad's admission
///     controller has admitted. Both the per-pool and per-host accounting is updated
///     when requests are admitted and released (and NOTE: not via the statestore, so
///     there is no latency, but this does not account for memory from requests admitted
///     by other impalads).
///  c) Num Admitted: the number of queries that have been admitted and are therefore
///     considered to be currently running. Note that there is currently no equivalent to
///     the reserved memory reporting, i.e. hosts do not report the actual number of
///     queries that are currently executing (IMPALA-8762). This prevents using multiple
///     coordinators with executor groups.
///
/// As described, both the 'reserved' and 'admitted' mem accounting mechanisms have
/// different advantages and disadvantages. The 'reserved' mem accounting works well in
/// the steady state, i.e. given enough time to distribute updates. The 'admitted'
/// mem accounting works perfectly when there is a single coordinator (and perhaps works
/// reasonably with just a few). The maximum of the reserved and admitted mem is used in
/// making admission decisions, which works well when either relatively few coordinators
/// are used or, if there is a wide distribution of requests across impalads, the rate of
/// submission is low enough that new state is able to be updated by the statestore.
///
/// Releasing Queries:
/// When queries complete they must be explicitly released from the admission controller
/// using the methods 'ReleaseQuery' and 'ReleaseQueryBackends'. These methods release
/// the admitted memory and decrement the number of admitted queries for the resource
/// pool. All Backends for a query must be released via 'ReleaseQueryBackends' before the
/// query is released using 'ReleaseQuery'. Releasing Backends releases the admitted
/// memory used by that Backend and decrements the number of running queries on the host
/// running that Backend. Releasing a query does not release any admitted memory, it only
/// decrements the number of running queries in the resource pool.
///
/// Executor Groups:
/// Executors in a cluster can be assigned to executor groups. Each executor can only be
/// in one group. A resource pool can have multiple executor groups associated with it.
/// Each executor group belongs to a single resource pool and will only serve requests
/// from that pool. I.e. the relationships are 1 resource pool : many executor groups and
/// 1 executor group : many executors.



///
/// Executors that don't specify an executor group name during startup are automatically
/// added to a default group called DEFAULT_EXECUTOR_GROUP_NAME. The default executor
/// group does not enforce query concurrency limits per host and as such can be admitted
/// to by multiple coordinators.
///
/// Executor groups are mapped to resource pools implicitly by their name. Queries in a
/// resource pool can run on all executor groups whose name starts with the pool's name,
/// separated by a '-'. For example, queries in a pool with name 'q1' can run on all
/// executor groups starting with 'q1-'. If no matching executor groups can be found for a
/// resource pool and the default executor group is not empty, then the default group is
/// used.
///
/// In addition to the checks described before, admission to executor groups is bounded by
/// the maximum number of queries that can run concurrently on an executor
/// (-admission_control_slots). An additional check is performed to ensure that each
/// executor in the group has an available slot to run the query. Admission controllers
/// include the number of queries that have been admitted to each executor in the
/// statestore updates.
///
/// In order to find an executor group that can run a query, the admission controller
/// calls FindGroupToAdmitOrReject(), either during the initial admission attempt or in
/// DequeueLoop(). If the cluster membership has changed, it (re-)computes schedules for
/// all executor groups and then tries to admit queries using the list of schedules.
/// Admission is always attempted in the same order so that executor groups fill up before
/// further ones are considered. In particular, we don't attempt to balance the queries
/// across executor groups.
///
/// Example without executor groups:
/// Consider a 10-node cluster with 100gb/node and a resource pool 'q1' configured with
/// 500gb of aggregate memory and 40gb as the max memory limit. An incoming request with
/// the MEM_LIMIT query option set to 50gb and scheduled to execute on all backends is
/// received by SubmitForAdmission() on an otherwise quiet cluster. Based on the pool
/// configuration, a per host mem limit of 40gb is used for this query and for any
/// subsequent checks that it needs to pass prior to admission. FindGroupToAdmitOrReject()
/// computes a schedule for the default executor group and performs rejection tests before
/// calling CanAdmitRequest(), which checks the number of running queries and then calls
/// HasAvailableMemResources() to check for memory resources. It first checks whether
/// there is enough memory for the request using PoolStats::EffectiveMemReserved() (which
/// is the max of the pool's agg_mem_reserved_ and local_mem_admitted_, see #1 above),
/// then checks for enough memory on each individual host via the max of mem_reserved and
/// mem_admitted in hosts_stats_ (see #2 above) and finally checks if the memory limit
/// used for this query can accommodate its largest initial reservation. In this case,
/// ample resources are available so CanAdmitRequest() returns true. PoolStats::Admit() is
/// called to update q1's PoolStats: it first updates agg_num_running_ and
/// local_mem_admitted_ which are available to be used immediately for incoming admission
/// requests, then it updates num_admitted_running in the struct sent to the statestore
/// (local_stats_). UpdateHostStats() is called to update the per-host admitted mem
/// (stored in the map host_stats_) for all participating hosts. Then SubmitForAdmission()
/// returns to the ClientRequestState. If another identical admission request is received
/// by the same coordinator immediately, it will be rejected because q1's
/// local_mem_admitted_ is already 400gb. If that request were sent to another impalad at
/// the same time, it would have been admitted because not all updates have been
/// disseminated yet. The next statestore update will contain the updated value of
/// num_admitted_running for q1 on this backend. As remote fragments begin execution on
/// remote impalads, their pool mem trackers will reflect the updated amount of memory
/// reserved (set in local_stats_.backend_mem_reserved by UpdateMemTrackerStats()) and the
/// next statestore updates coming from those impalads will contain the updated value. As
/// the statestore updates are received (in the subscriber callback fn UpdatePoolStats()),
/// the incoming per-backend, per-pool mem_reserved values are aggregated to
/// PoolStats::agg_mem_reserved_ (pool aggregate over all hosts) and backend_mem_reserved_
/// (per-host aggregates over all pools). Once this has happened, any incoming admission
/// request now has the updated state required to make correct admission decisions.
///
/// Example with executor groups:
/// Consider a cluster with a dedicated coordinator and 2 executor groups
/// "default-pool-group-1" and "default-pool-group-2" (the number of executors per group
/// does not matter for this example). Both executor groups will be able to serve requests
/// from the default resource pool. Consider that each executor has only one admission
/// slot i.e. --admission_control_slots=1 is specified for all executors. An incoming
/// query with mt_dop=1 is submitted through SubmitForAdmission(), which calls
/// FindGroupToAdmitOrReject(). From there we call ComputeGroupSchedules() which calls
/// compute schedules for both executor groups. Then we perform rejection tests and
/// afterwards call CanAdmitRequest() for each of the schedules. Executor groups are
/// processed in alphanumerically sorted order, so we attempt admission to group
/// "default-pool-group-1" first. CanAdmitRequest() calls HasAvailableSlots() to check
/// whether any of the hosts in the group can fit the new query in their available slots
/// and since it does fit, admission succeeds. The query is admitted and 'slots_in_use'
/// is incremented for each host in that group based on the effective parallelism of the
/// query. When a second query arrives while the first one is still running, we perform
/// the same steps. In particular we compute schedules for both groups and consider
/// admission to default-pool-group-1 first. However, the check in HasAvailableSlots()
/// now fails and we will consider group default-pool-group-2 next. For this group,
/// the check succeeds and the query is admitted, incrementing the num_admitted counter
/// for each host in group default-pool-group-2.
///
/// Queuing Behavior:
/// Once the resources in a pool are consumed, each coordinator receiving requests will
/// begin queuing. While each individual queue is FIFO, there is no total ordering on the
/// queued requests between admission controllers and no FIFO behavior is guaranteed for
/// requests submitted to different coordinators. When resources become available, there
/// is no synchronous coordination between nodes used to determine which get to dequeue
/// and admit requests. Instead, we use a simple heuristic to try to dequeue a number of
/// requests proportional to the number of requests that are waiting in each individual
/// admission controller to the total number of requests queued across all admission
/// controllers (i.e. impalads). This limits the amount of overadmission that may result
/// from a large amount of resources becoming available at the same time. When there are
/// requests queued in multiple pools on the same host, the admission controller simply
/// iterates over the pools in pool_stats_ and attempts to dequeue from each. This is fine
/// for the max_requests limit, but is unfair for memory-based admission because the
/// iteration order of pools effectively gives priority to the queues at the beginning.
/// Requests across queues may be competing for the same resources on particular hosts,
/// i.e. #2 in the description of memory-based admission above. Note the pool's
/// max_mem_resources (#1) is not contented.
///
/// Cancellation Behavior:
/// An admission request<schedule, admit_outcome> submitted using SubmitForAdmission() can
/// be proactively cancelled by setting the 'admit_outcome' to
/// AdmissionOutcome::CANCELLED. This is handled asynchronously by SubmitForAdmission()
/// and DequeueLoop().
///
/// Pool Configuration Mechanism:
/// The path to pool config files are specified using the startup flags
/// "fair_scheduler_allocation_path" and "llama_site_path". The format for specifying pool
/// configs is based on yarn and llama with additions specific to Impala. A file
/// monitoring service is started that monitors changes made to these files. Those changes
/// are only propagated to Impala when a new query is serviced. See RequestPoolService
/// class for more details.
///

class AdmissionController {
 public:
  // Profile info strings
  static const std::string PROFILE_INFO_KEY_ADMISSION_RESULT;
  static const std::string PROFILE_INFO_VAL_ADMIT_IMMEDIATELY;
  static const std::string PROFILE_INFO_VAL_QUEUED;
  static const std::string PROFILE_INFO_VAL_CANCELLED_IN_QUEUE;
  static const std::string PROFILE_INFO_VAL_ADMIT_QUEUED;
  static const std::string PROFILE_INFO_VAL_REJECTED;
  static const std::string PROFILE_INFO_VAL_TIME_OUT;
  static const std::string PROFILE_INFO_KEY_INITIAL_QUEUE_REASON;
  static const std::string PROFILE_INFO_VAL_INITIAL_QUEUE_REASON;
  static const std::string PROFILE_INFO_KEY_LAST_QUEUED_REASON;
  static const std::string PROFILE_INFO_KEY_ADMITTED_MEM;
  static const std::string PROFILE_INFO_KEY_EXECUTOR_GROUP;
  static const std::string PROFILE_INFO_KEY_STALENESS_WARNING;
  static const std::string PROFILE_TIME_SINCE_LAST_UPDATE_COUNTER_NAME;

  AdmissionController(ClusterMembershipMgr* cluster_membership_mgr,
      StatestoreSubscriber* subscriber, RequestPoolService* request_pool_service,
      MetricGroup* metrics, const TNetworkAddress& host_addr);
  ~AdmissionController();

  /// This struct contains all information needed to create a QuerySchedule and try to
  /// admit it. None of the members are owned by the instances of this class (usually they
  /// are owned by the ClientRequestState).
  struct AdmissionRequest {
    const TUniqueId& query_id;
    const TQueryExecRequest& request;
    const TQueryOptions& query_options;
    RuntimeProfile* summary_profile;
    RuntimeProfile::EventSequence* query_events;
  };

  /// Submits the request for admission. May returns immediately if rejected, but
  /// otherwise blocks until the request is either admitted, times out, gets rejected
  /// later, or cancelled by the client (by setting 'admit_outcome' to CANCELLED). When
  /// this method returns, the following <admit_outcome, Return Status> pairs are
  /// possible:
  /// - Admitted: <ADMITTED, Status::OK>
  /// - Rejected or timed out: <REJECTED or TIMED_OUT, Status(msg: reason for the same)>
  /// - Cancelled: <CANCELLED, Status::CANCELLED>
  /// If admitted, ReleaseQuery() should also be called after the query completes or gets
  /// cancelled to ensure that the pool statistics are updated.
  Status SubmitForAdmission(const AdmissionRequest& request,
      Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* admit_outcome,
      std::unique_ptr<QuerySchedule>* schedule_result);

  /// Updates the pool statistics when a query completes (either successfully,
  /// is cancelled or failed). This should be called for all requests that have
  /// been submitted via AdmitQuery(). 'schedule' is the QuerySchedule of the completed
  /// query and 'peak_mem_consumption' is the peak memory consumption of the query.
  /// This does not block.
  void ReleaseQuery(const QuerySchedule& schedule, int64_t peak_mem_consumption);

  /// Updates the pool statistics when a Backend running a query completes (either
  /// successfully, is cancelled or failed). This should be called for all Backends part
  /// of a query for all queries that have been submitted via AdmitQuery().
  /// 'schedule' is the QuerySchedule of the associated query and the vector of
  /// TNetworkAddresses identify the completed Backends.
  /// This does not block.
  void ReleaseQueryBackends(
      const QuerySchedule& schedule, const vector<TNetworkAddress>& host_addr);

  /// Registers the request queue topic with the statestore, starts up the dequeue thread
  /// and registers a callback with the cluster membership manager to receive updates for
  /// membership changes.
  Status Init();

  /// Serializes relevant stats, configurations and information associated with queued
  /// queries for the resource pool identified by 'pool_name' to JSON by adding members to
  /// 'resource_pools'. Is a no-op if a pool with name 'pool_name' does not exist or no
  /// queries have been submitted to that pool yet.
  void PoolToJson(const std::string& pool_name, rapidjson::Value* resource_pools,
      rapidjson::Document* document);

  /// Serializes relevant stats, configurations and information associated with queued
  /// queries for every resource pool (to which queries have been submitted at least once)
  /// to JSON by adding members to 'resource_pools'.
  void AllPoolsToJson(rapidjson::Value* resource_pools, rapidjson::Document* document);

  /// Calls ResetInformationalStats on the pool identified by 'pool_name'.
  void ResetPoolInformationalStats(const std::string& pool_name);

  /// Calls ResetInformationalStats on all pools.
  void ResetAllPoolInformationalStats();

  // This struct stores per-host statistics which are used during admission and by HTTP
  // handlers to query admission control statistics for currently registered backends.
  struct HostStats {
    /// The mem reserved for a query that is currently executing is its memory limit, if
    /// set (which should be the common case with admission control). Otherwise, if the
    /// query has no limit or the query is finished executing, the current consumption
    /// (tracked by its query mem tracker) is used.
    int64_t mem_reserved = 0;
    /// The per host mem admitted only for the queries admitted locally.
    int64_t mem_admitted = 0;
    /// The per host number of queries admitted only for the queries admitted locally.
    int64_t num_admitted = 0;
    /// The per host number of slots in use for the queries admitted locally.
    int64_t slots_in_use = 0;
  };

  typedef std::unordered_map<std::string, HostStats> PerHostStats;

  // Populates the input map with the per host memory reserved and admitted in the
  // following format: <host_address_str, pair<mem_reserved, mem_admitted>>.
  // Only used for populating the 'backends' debug page.
  void PopulatePerHostMemReservedAndAdmitted(PerHostStats* host_stats);

  /// Returns a non-empty string with a warning if the admission control data is stale.
  /// 'prefix' is added to the start of the string. Returns an empty string if not stale.
  /// If 'ms_since_last_update' is non-null, set it to the time in ms since last update.
  /// Caller must not hold 'admission_ctrl_lock_'.
  std::string GetStalenessDetail(const std::string& prefix,
      int64_t* ms_since_last_update = nullptr);

 private:
  class PoolStats;
  friend class PoolStats;

  /// Pointer to the cluster membership manager. Not owned by the AdmissionController.
  ClusterMembershipMgr* cluster_membership_mgr_;

  /// Subscription manager used to handle admission control updates. This is not
  /// owned by this class.
  StatestoreSubscriber* subscriber_;

  /// Used for user-to-pool resolution and looking up pool configurations. Not owned by
  /// the AdmissionController.
  RequestPoolService* request_pool_service_;

  /// Metrics subsystem access
  MetricGroup* metrics_group_;

  /// Maps names of executor groups to their respective query load metric.
  std::unordered_map<std::string, IntGauge*> exec_group_query_load_map_;

  /// Thread dequeuing and admitting queries.
  std::unique_ptr<Thread> dequeue_thread_;

  // The local impalad's host/port id, used to construct topic keys.
  const std::string host_id_;

  /// Serializes/deserializes TPoolStats when sending and receiving topic updates.
  ThriftSerializer thrift_serializer_;

  /// Protects all access to all variables below.
  boost::mutex admission_ctrl_lock_;

  /// The last time a topic update was processed. Time is obtained from
  /// MonotonicMillis(), or is 0 if an update was never received.
  int64_t last_topic_update_time_ms_ = 0;

  PerHostStats host_stats_;

  /// Contains all per-pool statistics and metrics. Accessed via GetPoolStats().
  class PoolStats {
   public:
    struct PoolMetrics {
      /// Monotonically increasing counters (since process start) referring to this
      /// host's admission controller.
      IntCounter* total_admitted;
      IntCounter* total_rejected;
      IntCounter* total_queued;
      IntCounter* total_dequeued; // Does not include those in total_timed_out
      IntCounter* total_timed_out;
      IntCounter* total_released;
      IntCounter* time_in_queue_ms;

      /// The following mirror the current values in PoolStats.
      /// TODO: Avoid duplication: replace the int64_t fields on PoolStats with these.
      IntGauge* agg_num_running;
      IntGauge* agg_num_queued;
      IntGauge* agg_mem_reserved;
      IntGauge* local_mem_admitted;

      /// The following mirror the current values of local_stats_.
      /// TODO: As above, consolidate the metrics and local_stats_.
      IntGauge* local_num_admitted_running;
      IntGauge* local_num_queued;
      IntGauge* local_backend_mem_reserved;
      IntGauge* local_backend_mem_usage;

      /// Metrics exposing the pool settings.
      IntGauge* pool_max_mem_resources;
      IntGauge* pool_max_requests;
      IntGauge* pool_max_queued;
      IntGauge* pool_queue_timeout;
      IntGauge* max_query_mem_limit;
      IntGauge* min_query_mem_limit;
      BooleanProperty* clamp_mem_limit_query_option;
      DoubleGauge* max_running_queries_multiple;
      DoubleGauge* max_queued_queries_multiple;
      IntGauge* max_memory_multiple;
      /// Metrics exposing the pool's derived runtime configuration.
      IntGauge* max_running_queries_derived;
      IntGauge* max_queued_queries_derived;
      IntGauge* max_memory_derived;
    };

    PoolStats(AdmissionController* parent, const std::string& name)
      : name_(name), parent_(parent), agg_num_running_(0), agg_num_queued_(0),
        agg_mem_reserved_(0), local_mem_admitted_(0), wait_time_ms_ema_(0.0) {
      peak_mem_histogram_.resize(HISTOGRAM_NUM_OF_BINS, 0);
      InitMetrics();
    }

    int64_t agg_num_running() const { return agg_num_running_; }
    int64_t agg_num_queued() const { return agg_num_queued_; }
    int64_t EffectiveMemReserved() const {
      return std::max(agg_mem_reserved_, local_mem_admitted_);
    }

    // ADMISSION LIFECYCLE METHODS
    /// Updates the pool stats when the request represented by 'schedule' is admitted.
    void AdmitQueryAndMemory(const QuerySchedule& schedule);
    /// Updates the pool stats except the memory admitted stat.
    void ReleaseQuery(int64_t peak_mem_consumption);
    /// Releases the specified memory from the pool stats.
    void ReleaseMem(int64_t mem_to_release);
    /// Updates the pool stats when the request represented by 'schedule' is queued.
    void Queue();
    /// Updates the pool stats when the request represented by 'schedule' is dequeued.
    void Dequeue(bool timed_out);

    // STATESTORE CALLBACK METHODS
    /// Updates the local_stats_.backend_mem_reserved with the pool mem tracker. Called
    /// before sending local_stats().
    void UpdateMemTrackerStats();

    /// Called on a full topic update to clear all stats before processing the update.
    void ClearRemoteStats() { remote_stats_.clear(); }

    /// Called to update remote host TPoolStats with the new host_stats for the
    /// specified host. If host_stats is NULL the stats for the specified remote host
    /// are removed (i.e. topic deletion).
    void UpdateRemoteStats(const std::string& backend_id, TPoolStats* host_stats);

    /// Maps from host id to memory reserved and memory admitted, both aggregates over all
    /// pools. See the class doc for a detailed definition of reserved and admitted.
    /// Protected by admission_ctrl_lock_.
    typedef boost::unordered_map<std::string, int64_t> HostMemMap;

    /// Called after updating local_stats_ and remote_stats_ to update the aggregate
    /// values of agg_num_running_, agg_num_queued_, and agg_mem_reserved_. The in/out
    /// parameter host_mem_reserved is a map from host id to memory reserved used to
    /// aggregate the mem reserved values across all pools for each host. Used by
    /// UpdateClusterAggregates() to update host_mem_reserved_; it provides the host
    /// aggregates when called over all pools.
    void UpdateAggregates(HostMemMap* host_mem_reserved);

    const TPoolStats& local_stats() { return local_stats_; }

    /// Updates the metrics exposing the pool configuration to those in pool_cfg.
    void UpdateConfigMetrics(const TPoolConfig& pool_cfg, int64_t cluster_size);

    /// Updates the metrics exposing the scalable pool configuration values.
    void UpdateDerivedMetrics(const TPoolConfig& pool_cfg, int64_t cluster_size);

    PoolMetrics* metrics() { return &metrics_; }
    std::string DebugString() const;

    /// Updates the metric keeping track of total time in queue and the exponential
    /// moving average of query wait time for all queries submitted to this pool.
    void UpdateWaitTime(int64_t wait_time_ms);

    /// Serializes relevant stats and configurations to JSON by adding members to 'pool'.
    void ToJson(rapidjson::Value* pool, rapidjson::Document* document) const;

    /// Resets the informational stats like those keeping track of absolute
    /// values(totals), the peak query memory histogram, and the exponential moving
    /// average of wait time.
    void ResetInformationalStats();

    const std::string& name() const { return name_; }

   private:
    const std::string name_;
    AdmissionController* parent_;

    /// Aggregate (across all hosts) number of running queries in this pool. Updated
    /// by Admit(), Release(), and after processing statestore updates by
    /// UpdateAggregates().
    int64_t agg_num_running_;

    /// Aggregate (across all hosts) number of queued requests. Updated by Queue(),
    /// Dequeue(), and after processing statestore updates by UpdateAggregates().
    int64_t agg_num_queued_;

    /// Aggregate memory reported as reserved for fragments executing in this pool by
    /// every host, i.e. the sum of all local_stats_.mem_reserved from all
    /// other hosts. Updated only by UpdateAggregates().
    int64_t agg_mem_reserved_;

    /// Memory in this pool (across all nodes) that is needed for requests that have been
    /// admitted by this local coordinator. Updated only on Admit() and Release(). Stored
    /// separately from the other 'local' stats in local_stats_ because it is not sent
    /// to the statestore (no 'aggregated' value is needed).
    int64_t local_mem_admitted_;

    /// This pool's TPoolStats for this host. Sent to the statestore (and thus not stored
    /// in remote_stats_ with the remote hosts). Most fields are updated eagerly and used
    /// for local admission decisions. local_stats_.backend_mem_reserved is the
    /// exception: it is not used in local admission decisions so it can be updated
    /// lazily before sending a statestore update.
    TPoolStats local_stats_;

    /// Map of host_ids to the latest TPoolStats. Entirely generated by incoming
    /// statestore updates; updated by UpdateRemoteStats() and used by UpdateAggregates().
    typedef boost::unordered_map<std::string, TPoolStats> RemoteStatsMap;
    RemoteStatsMap remote_stats_;

    /// Per-pool metrics, created by InitMetrics().
    PoolMetrics metrics_;

    /// A histogram of the peak memory used by a query among all hosts. Its a vector of
    /// size 'HISTOGRAM_NUM_OF_BINS' and every i-th element represents the number of
    /// queries that had recorded a peak memory between (i, i+1] * HISTOGRAM_BIN_SIZE
    /// Bytes, except for the last one that represents a memory range of
    /// (HISTOGRAM_NUM_OF_BINS - 1, infinity) * HISTOGRAM_BIN_SIZE Bytes.
    std::vector<int64_t> peak_mem_histogram_;
    static const int64_t HISTOGRAM_NUM_OF_BINS;
    static const int64_t HISTOGRAM_BIN_SIZE;

    /// Keeps track of exponential moving average of all queries submitted to this pool
    /// that were not rejected. A weighting multiplier of value 'EMA_MULTIPLIER' is used.
    double wait_time_ms_ema_;
    static const double EMA_MULTIPLIER;

    void InitMetrics();

    FRIEND_TEST(AdmissionControllerTest, Simple);
    FRIEND_TEST(AdmissionControllerTest, PoolStats);
    FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestMemory);
    FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestCount);
    FRIEND_TEST(AdmissionControllerTest, GetMaxToDequeue);
    FRIEND_TEST(AdmissionControllerTest, QueryRejection);
    friend class AdmissionControllerTest;
  };

  /// Map of pool names to pool stats. Accessed via GetPoolStats().
  /// Protected by admission_ctrl_lock_.
  typedef boost::unordered_map<std::string, PoolStats> PoolStatsMap;
  PoolStatsMap pool_stats_;

  /// This struct groups together a schedule and the executor group that it was scheduled
  /// on. It is used to attempt admission without rescheduling the query in case the
  /// cluster membership has not changed. Users of the struct must make sure that
  /// executor_group stays valid.
  struct GroupSchedule {
    GroupSchedule(
        std::unique_ptr<QuerySchedule> schedule, const ExecutorGroup& executor_group)
      : schedule(std::move(schedule)), executor_group(executor_group) {}
    std::unique_ptr<QuerySchedule> schedule;
    const ExecutorGroup& executor_group;
  };

  /// The set of pools that have changed between topic updates that need stats to be sent
  /// to the statestore. The key is the pool name.
  typedef boost::unordered_set<std::string> PoolSet;
  PoolSet pools_for_updates_;

  /// Structure stored in the RequestQueue representing an admission request. This struct
  /// lives only during the call to AdmitQuery() but its members live past that and are
  /// owned by the ClientRequestState object associated with them.
  ///
  /// Objects of this class progress linearly through the following states.
  /// - Initialized: The request has been created
  /// - Admitting: The request has been attempted to be admitted at least once and
  ///   additional intermediate state has been stored in some members
  /// - Admitted: The request was admitted, cancelled, or rejected and 'admit_outcome' is
  ///   set. If it was admitted, 'admitted_schedule' is also not nullptr.
  struct QueueNode : public InternalQueue<QueueNode>::Node {
    QueueNode(AdmissionRequest request,
        Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* admission_outcome,
        RuntimeProfile* profile)
      : admission_request(std::move(request)),
        profile(profile),
        admit_outcome(admission_outcome) {}

    /////////////////////////////////////////
    /// BEGIN: Members that are valid for new objects after initialization

    /// The admission request contains everything required to build schedules.
    const AdmissionRequest admission_request;

    /// Profile to be updated with information about admission.
    RuntimeProfile* profile;

    /// END: Members that are valid for new objects after initialization
    /////////////////////////////////////////

    /////////////////////////////////////////
    /// BEGIN: Members that are only valid while queued, but invalid once dequeued.

    /// The membership snapshot used during the last admission attempt. It can be nullptr
    /// before the first admission attempt and if any schedules have been created,
    /// 'group_schedule' will contain the corresponding schedules and executor groups.
    ClusterMembershipMgr::SnapshotPtr membership_snapshot;

    /// List of schedules and executor groups that can be attempted to be admitted for
    /// this queue node.
    std::vector<GroupSchedule> group_schedules;

    /// END: Members that are only valid while queued, but invalid once dequeued.
    /////////////////////////////////////////

    /////////////////////////////////////////
    /// BEGIN: Members that are valid after admission / cancellation / rejection

    /// The last reason why this request could not be admitted.
    std::string not_admitted_reason;

    /// The Admission outcome of the queued request.
    Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* const admit_outcome;

    /// The schedule of the query if it was admitted successfully. Nullptr if it has not
    /// been admitted or was cancelled or rejected.
    std::unique_ptr<QuerySchedule> admitted_schedule = nullptr;

    /// END: Members that are valid after admission / cancellation / rejection
    /////////////////////////////////////////
  };

  /// Queue for the queries waiting to be admitted for execution. Once the
  /// maximum number of concurrently executing queries has been reached,
  /// incoming queries are queued and admitted first come, first served.
  typedef InternalQueue<QueueNode> RequestQueue;

  /// Map of pool names to request queues.
  typedef boost::unordered_map<std::string, RequestQueue> RequestQueueMap;
  RequestQueueMap request_queue_map_;

  /// Map of pool names to the pool configs returned by request_pool_service_. Stored so
  /// that the dequeue thread does not need to access the configs via the request pool
  /// service again (which involves a JNI call and error checking).
  typedef boost::unordered_map<std::string, TPoolConfig> PoolConfigMap;
  PoolConfigMap pool_config_map_;

  /// Notifies the dequeuing thread that pool stats have changed and it may be
  /// possible to dequeue and admit queries.
  ConditionVariable dequeue_cv_;

  /// If true, tear down the dequeuing thread. This only happens in unit tests.
  bool done_;

  /// Tracks the number of released Backends for each active query. Used purely for
  /// internal state validation. Used to ensure that all Backends are released before
  /// the query is released.
  typedef boost::unordered_map<TUniqueId, int> NumReleasedBackends;
  NumReleasedBackends num_released_backends_;

  /// Resolves the resource pool name in 'query_ctx.request_pool' and stores the resulting
  /// name in 'pool_name' and the resulting config in 'pool_config'.
  Status ResolvePoolAndGetConfig(const TQueryCtx& query_ctx, std::string* pool_name,
      TPoolConfig* pool_config);

  /// Statestore subscriber callback that sends outgoing topic deltas (see
  /// AddPoolUpdates()) and processes incoming topic deltas, updating the PoolStats
  /// state.
  void UpdatePoolStats(
      const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
      std::vector<TTopicDelta>* subscriber_topic_updates);

  /// Adds outgoing topic updates to subscriber_topic_updates for pools that have changed
  /// since the last call to AddPoolUpdates(). Called by UpdatePoolStats() before
  /// UpdateClusterAggregates(). Must hold admission_ctrl_lock_.
  void AddPoolUpdates(std::vector<TTopicDelta>* subscriber_topic_updates);

  /// Updates the remote stats with per-host topic_updates coming from the statestore.
  /// Removes remote stats identified by topic deletions coming from the
  /// statestore. Called by UpdatePoolStats(). Must hold admission_ctrl_lock_.
  void HandleTopicUpdates(const std::vector<TTopicItem>& topic_updates);

  /// Re-computes the per-pool aggregate stats and the per-host aggregates in host_stats_
  /// using each pool's remote_stats_ and local_stats_.
  /// Called by UpdatePoolStats() after handling updates and deletions.
  /// Must hold admission_ctrl_lock_.
  void UpdateClusterAggregates();

  /// Computes schedules for all executor groups that can run the query in 'queue_node'.
  /// For subsequent calls schedules are only re-computed if the membership version inside
  /// 'membership_snapshot' has changed. Will return any errors that occur during
  /// scheduling, e.g. if the scan range generation fails. Note that this will not return
  /// an error if no executor groups are available for scheduling, but will set
  /// 'queue_node->not_admitted_reason' and leave 'queue_node->group_schedules' empty in
  /// that case.
  Status ComputeGroupSchedules(
      ClusterMembershipMgr::SnapshotPtr membership_snapshot, QueueNode* queue_node);

  /// Reschedules the query if necessary using 'membership_snapshot' and tries to find an
  /// executor group that the query can be admitted to. If the query is unable to run on
  /// any of the groups irrespective of their current workload, it is rejected. Returns
  /// true and sets queue_node->admitted_schedule if the query can be admitted. Returns
  /// true and keeps queue_node->admitted_schedule unset if the query cannot be admitted
  /// now, but also does not need to be rejected. If the query must be rejected, this
  /// method returns false and sets queue_node->not_admitted_reason.
  bool FindGroupToAdmitOrReject(
      int64_t cluster_size, ClusterMembershipMgr::SnapshotPtr membership_snapshot,
      const TPoolConfig& pool_config, bool admit_from_queue, PoolStats* pool_stats,
      QueueNode* queue_node);

  /// Dequeues the queued queries when notified by dequeue_cv_ and admits them if they
  /// have not been cancelled yet.
  void DequeueLoop();

  /// Returns true if schedule can be admitted to the pool with pool_cfg.
  /// admit_from_queue is true if attempting to admit from the queue. Otherwise, returns
  /// false and not_admitted_reason specifies why the request can not be admitted
  /// immediately. Caller owns not_admitted_reason. Must hold admission_ctrl_lock_.
  bool CanAdmitRequest(const QuerySchedule& schedule, const TPoolConfig& pool_cfg,
      int64_t cluster_size, bool admit_from_queue, std::string* not_admitted_reason);

  /// Returns true if all executors can accommodate the largest initial reservation of
  /// any executor and the backend running the coordinator fragment can accommodate its
  /// own initial reservation. Otherwise, returns false with the details about the memory
  /// shortage in 'mem_unavailable_reason'. Possible cases where it can return false are:
  /// 1. The pool.max_query_mem_limit is set too low
  /// 2. mem_limit in query options is set low and no max/min_query_mem_limit is set in
  ///    the pool configuration.
  /// 3. mem_limit in query options is set low and min_query_mem_limit is also set low.
  /// 4. mem_limit in query options is set low and the pool.min_query_mem_limit is set
  ///    to a higher value but pool.clamp_mem_limit_query_option is false.
  /// 5. If a dedicated coordinator is used and the mem_limit in query options is set
  ///    lower than what is required to support the sum of initial memory reservations of
  ///    the fragments scheduled on the coordinator.
  static bool CanAccommodateMaxInitialReservation(const QuerySchedule& schedule,
      const TPoolConfig& pool_cfg, std::string* mem_unavailable_reason);

  /// Returns true if there is enough memory available to admit the query based on the
  /// schedule, the aggregate pool memory, and the per-host memory. If not, this returns
  /// false and returns the reason in 'mem_unavailable_reason'. Caller owns
  /// 'mem_unavailable_reason'.
  /// Must hold admission_ctrl_lock_.
  bool HasAvailableMemResources(const QuerySchedule& schedule,
      const TPoolConfig& pool_cfg, int64_t cluster_size,
      std::string* mem_unavailable_reason);

  /// Returns true if there are enough available slots on all executors in the schedule to
  /// fit the query schedule. The number of slots per executors does not change with the
  /// group or cluster size and instead always uses pool_cfg.max_requests. If a host does
  /// not have a free slot, this returns false and sets 'unavailable_reason'.
  /// Must hold admission_ctrl_lock_.
  bool HasAvailableSlots(const QuerySchedule& schedule, const TPoolConfig& pool_cfg,
      string* unavailable_reason);

  /// Updates the memory admitted and the num of queries running for each backend in
  /// 'schedule'. Also updates the stats of its associated resource pool. Used only when
  /// the 'schedule' is admitted.
  void UpdateStatsOnAdmission(const QuerySchedule& schedule);

  /// Updates the memory admitted and the num of queries running for each backend in
  /// 'schedule' which have been release/completed. The list of completed backends is
  /// specified in 'host_addrs'. Also updates the stats related to the admitted memory of
  /// its associated resource pool.
  void UpdateStatsOnReleaseForBackends(
      const QuerySchedule& schedule, const std::vector<TNetworkAddress>& host_addrs);

  /// Updates the memory admitted and the num of queries running on the specified host by
  /// adding the specified mem, num_queries and slots to the host stats.
  void UpdateHostStats(
      const TNetworkAddress& host_addr, int64_t mem_to_admit, int num_queries_to_admit,
      int num_slots_to_admit);

  /// Rejection happens in several stages
  /// 1) Based on static pool configuration
  ///     - Check if the pool is disabled (max_requests = 0, max_mem = 0)
  ///     - min_query_mem_limit > max_query_mem_limit (From IsPoolConfigValidForCluster)
  ///
  /// 2) Based on the entire cluster size
  ///     - Check for maximum queue size (queue full)
  ///
  /// 3) Based on the executor group size
  ///     - pool.min_query_mem_limit > max_mem (From IsPoolConfigValidForCluster)
  ///       - max_mem may depend on group size
  ///
  /// 4) Based on a schedule
  ///     - largest_min_mem_reservation > buffer_pool_limit
  ///     - CanAccommodateMaxInitialReservation
  ///     - Thread reservation limit (thread_reservation_limit,
  ///       thread_reservation_aggregate_limit)
  ///     - cluster_min_mem_reservation_bytes > max_mem
  ///     - cluster_mem_to_admit > max_mem
  ///     - per_backend_mem_to_admit > min_admit_mem_limit
  ///
  /// We lump together 1 & 2 and 3 & 4. The first two depend on the total cluster size.
  /// The latter 2 depend on the executor group size and therefore on the schedule. If no
  /// executor group is available, the query will be queued.

  /// Returns true if a request must be rejected immediately based on the pool
  /// configuration and cluster size, e.g. if the pool config is invalid, the pool is
  /// disabled, or the queue is already full.
  /// Must hold admission_ctrl_lock_.
  bool RejectForCluster(const std::string& pool_name, const TPoolConfig& pool_cfg,
      bool admit_from_queue, int64_t cluster_size, std::string* rejection_reason);

  /// Returns true if a request must be rejected immediately based on the pool
  /// configuration and a particular schedule, e.g. because the memory requirements of the
  /// query exceed the maximum of the group. This assumes that all executor groups for a
  /// pool are uniform and that a query rejected for one group will not be able to run on
  /// other groups, either.
  /// Must hold admission_ctrl_lock_.
  bool RejectForSchedule(const QuerySchedule& schedule, const TPoolConfig& pool_cfg,
      int64_t cluster_size, int64_t group_size, std::string* rejection_reason);

  /// Gets or creates the PoolStats for pool_name. Must hold admission_ctrl_lock_.
  PoolStats* GetPoolStats(const std::string& pool_name, bool dcheck_exists = false);

  /// Gets or creates the PoolStats for query schedule 'schedule'. Scheduling must be done
  /// already and the schedule must have an associated executor_group.
  PoolStats* GetPoolStats(const QuerySchedule& schedule);

  /// Log the reason for dequeuing of 'node' failing and add the reason to the query's
  /// profile. Must hold admission_ctrl_lock_.
  static void LogDequeueFailed(QueueNode* node, const std::string& not_admitted_reason);

  /// Sets the per host mem limit and mem admitted in the schedule and does the necessary
  /// accounting and logging on successful submission.
  /// Caller must hold 'admission_ctrl_lock_'.
  void AdmitQuery(QuerySchedule* schedule, bool was_queued);

  /// Same as PoolToJson() but requires 'admission_ctrl_lock_' to be held by the caller.
  /// Is a helper method used by both PoolToJson() and AllPoolsToJson()
  void PoolToJsonLocked(const std::string& pool_name, rapidjson::Value* resource_pools,
      rapidjson::Document* document);

  /// Same as GetStalenessDetail() except caller must hold 'admission_ctrl_lock_'.
  std::string GetStalenessDetailLocked(const std::string& prefix,
      int64_t* ms_since_last_update = nullptr);

  /// Returns the topic key for the pool at this backend, i.e. a string of the
  /// form: "<pool_name><delimiter><backend_id>".
  static std::string MakePoolTopicKey(
      const std::string& pool_name, const std::string& backend_id);

  /// Returns the maximum memory for the pool.
  static int64_t GetMaxMemForPool(const TPoolConfig& pool_config, int64_t cluster_size);

  /// Returns a description of how the maximum memory for the pool is configured.
  static std::string GetMaxMemForPoolDescription(
      const TPoolConfig& pool_config, int64_t cluster_size);

  /// Returns the maximum number of requests that can run in the pool.
  static int64_t GetMaxRequestsForPool(
      const TPoolConfig& pool_config, int64_t cluster_size);

  /// Returns a description of how the maximum number of requests that can run in the pool
  /// is configured.
  static std::string GetMaxRequestsForPoolDescription(
      const TPoolConfig& pool_config, int64_t cluster_size);

  /// Returns the effective queue timeout for the pool in milliseconds.
  static int64_t GetQueueTimeoutForPoolMs(const TPoolConfig& pool_config);

  /// Returns a maximum number of queries that should be dequeued locally from 'queue'
  /// before DequeueLoop waits on dequeue_cv_ at the top of its loop.
  /// If it can be determined that no queries can currently be run, then zero
  /// is returned.
  /// Uses a heuristic to limit the number of requests we dequeue locally to avoid all
  /// impalads dequeuing too many requests at the same time.
  int64_t GetMaxToDequeue(RequestQueue& queue, PoolStats* stats,
      const TPoolConfig& pool_config, int64_t cluster_size);

  /// Returns true if the pool has been disabled through configuration.
  static bool PoolDisabled(const TPoolConfig& pool_config);

  /// Returns true if the pool is configured to limit the number of running queries.
  static bool PoolLimitsRunningQueriesCount(const TPoolConfig& pool_config);

  /// Returns true if the pool has a fixed (i.e. not scalable) maximum memory limit.
  static bool PoolHasFixedMemoryLimit(const TPoolConfig& pool_config);

  /// Returns the maximum number of requests that can be queued in the pool.
  static int64_t GetMaxQueuedForPool(
      const TPoolConfig& pool_config, int64_t cluster_size);

  /// Returns a description of how the maximum number of requests that can run be queued
  /// in the pool is configured.
  static std::string GetMaxQueuedForPoolDescription(
      const TPoolConfig& pool_config, int64_t cluster_size);

  /// Return all executor groups from 'all_groups' that can be used to run queries in
  /// 'pool_name'.
  void GetExecutorGroupsForPool(const ClusterMembershipMgr::ExecutorGroups& all_groups,
      const std::string& pool_name, std::vector<const ExecutorGroup*>* matching_groups);

  /// Returns the current size of the cluster.
  int64_t GetClusterSize(const ClusterMembershipMgr::Snapshot& membership_snapshot);

  /// Returns the size of executor group 'group_name' in 'membership_snapshot'.
  int64_t GetExecutorGroupSize(const ClusterMembershipMgr::Snapshot& membership_snapshot,
      const std::string& group_name);

  /// Get the amount of memory to admit for the Backend with the given BackendExecParams.
  /// This method may return different values depending on if the Backend is an Executor
  /// or a Coordinator.
  static int64_t GetMemToAdmit(
      const QuerySchedule& schedule, const BackendExecParams& backend_exec_params);

  /// Updates the list of executor groups for which we maintain the query load metrics.
  /// Removes the metrics of the groups that no longer exist from the metric group and
  /// adds new ones for the newly added groups.
  void UpdateExecGroupMetricMap(ClusterMembershipMgr::SnapshotPtr snapshot);

  /// Updates the num queries executing metric of the 'grp_name' executor group by
  /// 'delta'. Only updates it if the metric exists ('grp_name' has non-zero executors).
  /// Caller must hold 'admission_ctrl_lock_'. Must be called whenever a query is
  /// admitted or released.
  void UpdateExecGroupMetric(const string& grp_name, int64_t delta);

  FRIEND_TEST(AdmissionControllerTest, Simple);
  FRIEND_TEST(AdmissionControllerTest, PoolStats);
  FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestMemory);
  FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestCount);
  FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestSlots);
  FRIEND_TEST(AdmissionControllerTest, GetMaxToDequeue);
  FRIEND_TEST(AdmissionControllerTest, QueryRejection);
  FRIEND_TEST(AdmissionControllerTest, DedicatedCoordQuerySchedule);
  FRIEND_TEST(AdmissionControllerTest, DedicatedCoordAdmissionChecks);
  friend class AdmissionControllerTest;
};

} // namespace impala

#endif // SCHEDULING_ADMISSION_CONTROLLER_H
