// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef IMPALA_SERVICE_IMPALA_SERVER_H
#define IMPALA_SERVICE_IMPALA_SERVER_H

#include <atomic>
#include <boost/random/random_device.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/unordered_set.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <unordered_map>

#include "common/status.h"
#include "gen-cpp/Frontend_types.h"
#include "gen-cpp/ImpalaHiveServer2Service.h"
#include "gen-cpp/ImpalaInternalService.h"
#include "gen-cpp/ImpalaService.h"
#include "kudu/util/random.h"
#include "rpc/thrift-server.h"
#include "runtime/timestamp-value.h"
#include "runtime/types.h"
#include "scheduling/query-schedule.h"
#include "service/client-request-state-map.h"
#include "service/query-options.h"
#include "statestore/statestore-subscriber.h"
#include "util/condition-variable.h"
#include "util/container-util.h"
#include "util/runtime-profile.h"
#include "util/simple-logger.h"
#include "util/thread-pool.h"
#include "util/time.h"

namespace impala {
using kudu::ThreadSafeRandom;

class ExecEnv;
class DataSink;
class CancellationWork;
class ImpalaHttpHandler;
class RowDescriptor;
class TCatalogUpdate;
class TPlanExecRequest;
class TPlanExecParams;
class TDmlResult;
class TReportExecStatusArgs;
class TReportExecStatusResult;
class TNetworkAddress;
class TClientRequest;
class TExecRequest;
class TSessionState;
class TQueryOptions;
class TGetExecSummaryResp;
class TGetExecSummaryReq;
class ClientRequestState;
class QuerySchedule;

/// An ImpalaServer contains both frontend and backend functionality;
/// it implements ImpalaService (Beeswax), ImpalaHiveServer2Service (HiveServer2)
/// and ImpalaInternalService APIs.
/// ImpalaServer can be started in 1 of 3 roles: executor, coordinator, or both executor
/// and coordinator. All roles start ImpalaInternalService API's. The
/// coordinator role additionally starts client API's (Beeswax and HiveServer2).
///
/// Startup Sequence
/// ----------------
/// The startup sequence opens and starts all services so that they are ready to be used
/// by clients at the same time. The Impala server is considered 'ready' only when it can
/// process requests with all of its specified roles. Avoiding states where some roles are
/// ready and some are not makes it easier to reason about the state of the server.
///
/// Main thread (caller code), after instantiating the server, must call Start().
/// Start() does the following:
///    - Registers the ImpalaServer instance with the ExecEnv. This also registers it with
///      the ClusterMembershipMgr, which will register it with the statestore as soon as
///      the local backend becomes available through GetLocalBackendDescriptor(), which is
///      in turn gated by the services_started_.
///    - Start internal services
///    - Wait (indefinitely) for local catalog to be initialized from statestore
///      (if coordinator)
///    - Open ImpalaInternalService ports
///    - Open client ports (if coordinator)
///    - Start ImpalaInternalService API
///    - Start client service API's (if coordinator)
///    - Set services_started_ flag
///
///
/// Shutdown
/// --------
/// Impala Server shutdown can be initiated by a remote shutdown command from another
/// Impala daemon or by a local shutdown command from a user session. The shutdown
/// sequence aims to quiesce the Impalad (i.e. drain it of any running finstances or
/// client requests) then exit the process cleanly. The shutdown sequence is as follows:
///
/// 1. StartShutdown() is called to initiate the process.
/// 2. The startup grace period starts, during which:
///   - no new client requests are accepted. Clients can still interact with registered
///     requests and sessions as normal.
///   - the local backend of the Impala daemon is marked in the statestore as quiescing,
///     so coordinators will not schedule new fragments on it (once the statestore update
///     propagates through the ClusterMembershipMgr).
///   - the Impala daemon continues to start executing any new fragments sent to it by
///     coordinators. This is required because the query may have been submitted before
///     the coordinator learned that the executor was quiescing. Delays occur for several
///     reasons:
///     -> Latency of membership propagation through the statestore
///     -> Latency of query startup work including scheduling, admission control and
///        fragment startup.
///     -> Queuing delay in the admission controller (which may be unbounded).
/// 3. The startup grace period elapses.
/// 4. The background shutdown thread periodically checks to see if the Impala daemon is
///    quiesced (i.e. no client requests are registered and no queries are executing on
///    the backend). If it is quiesced then it cleanly shuts down by exiting the process.
///    The statestore will detect that the process is not responding to heartbeats and
///    remove any entries.
/// 5. The shutdown deadline elapses. The Impala daemon exits regardless of whether
///    it was successfully quiesced or not.
///
/// If shutdown is initiated again during this process, it does not cancel the existing
/// shutdown but can decrease the deadline, e.g. if an administrator starts shutdown
/// with a deadline of 1 hour, but then wants to shut down the cluster sooner, they can
/// run the shutdown function again to set a shorter deadline. The deadline can't be
/// increased after shutdown is started.
///
/// Secrets
/// -------
/// The HS2 protocol has a concept of a 'secret' associated with each session and
/// client request that is returned to the client, then must be passed back along with
/// each RPC that interacts with the session or operation. RPCs with non-matching secrets
/// should fail. Beeswax does not have this concept - rather sessions are bound to a
/// connection, and that session and associated requests should (generally) only be
/// accessed from the session's connection.
///
/// All RPC handler functions are responsible for validating secrets when looking up
/// client sessions or requests, with some exceptions:
/// * Beewax sessions are tied to a connection, so it is always valid to access the
///   session from the original connection.
/// * Cancel() and close() Beeswax operations can cancel any query if the ID is known.
///   Existing tools (impala-shell and administrative tools) depend on this behaviour.
///
/// HS2 RPC handlers should pass the client-provided secret to WithSession() before
/// taking any action on behalf of the client. Beeswax RPC handlers should call
/// CheckClientRequestSession() to ensure that the client request is accessible from
/// the current session. All other functions assume that the validation was already done.
///
/// HTTP handlers do not have access to client secrets, so do not validate them.
///
/// Locking
/// -------
/// This class is partially thread-safe. To ensure freedom from deadlock, if multiple
/// locks are acquired, lower-numbered locks must be acquired before higher-numbered
/// locks:
/// 1. session_state_map_lock_
/// 2. SessionState::lock
/// 3. query_expiration_lock_
/// 4. ClientRequestState::fetch_rows_lock
/// 5. ClientRequestState::lock
/// 6. ClientRequestState::expiration_data_lock_
/// 7. Coordinator::exec_summary_lock
///
/// The following locks are not held in conjunction with other locks:
/// * query_log_lock_
/// * session_timeout_lock_
/// * query_locations_lock_
/// * uuid_lock_
/// * catalog_version_lock_
/// * connection_to_sessions_map_lock_
///
/// TODO: The same doesn't apply to the execution state of an individual plan
/// fragment: the originating coordinator might die, but we can get notified of
/// that via the statestore. This still needs to be implemented.
class ImpalaServer : public ImpalaServiceIf,
                     public ImpalaHiveServer2ServiceIf,
                     public ThriftServer::ConnectionHandlerIf,
                     public boost::enable_shared_from_this<ImpalaServer>,
                     public CacheLineAligned {
 public:

  ImpalaServer(ExecEnv* exec_env);
  ~ImpalaServer();

  /// Initializes and starts RPC services and other subsystems (like audit logging).
  /// Returns an error if starting any services failed.
  ///
  /// Different port values have special behaviour. A port > 0 explicitly specifies
  /// the port the server run on. A port value of 0 means to choose an arbitrary
  /// ephemeral port in tests and to not start the service in a daemon. A port < 0
  /// always means to not start the service. The port values can be obtained after
  /// Start() by calling GetThriftBackendPort(), GetBeeswaxPort() or GetHS2Port().
  Status Start(int32_t thrift_be_port, int32_t beeswax_port, int32_t hs2_port,
      int32_t hs2_http_port);

  /// Blocks until the server shuts down.
  void Join();

  /// ImpalaService rpcs: Beeswax API (implemented in impala-beeswax-server.cc)
  virtual void query(beeswax::QueryHandle& query_handle, const beeswax::Query& query);
  virtual void executeAndWait(beeswax::QueryHandle& query_handle,
      const beeswax::Query& query, const beeswax::LogContextId& client_ctx);
  virtual void explain(beeswax::QueryExplanation& query_explanation,
      const beeswax::Query& query);
  virtual void fetch(beeswax::Results& query_results,
      const beeswax::QueryHandle& query_handle, const bool start_over,
      const int32_t fetch_size);
  virtual void get_results_metadata(beeswax::ResultsMetadata& results_metadata,
      const beeswax::QueryHandle& handle);
  virtual void close(const beeswax::QueryHandle& handle);
  virtual beeswax::QueryState::type get_state(const beeswax::QueryHandle& handle);
  virtual void echo(std::string& echo_string, const std::string& input_string);
  virtual void clean(const beeswax::LogContextId& log_context);
  virtual void get_log(std::string& log, const beeswax::LogContextId& context);

  /// Return ImpalaQueryOptions default values and "support_start_over/false" to indicate
  /// that Impala does not support start over in the fetch call. Hue relies on this not to
  /// issue a "start_over" fetch call.
  /// "include_hadoop" is not applicable.
  virtual void get_default_configuration(
      std::vector<beeswax::ConfigVariable>& configurations, const bool include_hadoop);

  /// ImpalaService rpcs: unimplemented parts of Beeswax API.
  /// These APIs will not be implemented because ODBC driver does not use them.
  virtual void dump_config(std::string& config);

  /// ImpalaService rpcs: extensions over Beeswax (implemented in
  /// impala-beeswax-server.cc)
  virtual void Cancel(impala::TStatus& status, const beeswax::QueryHandle& query_id);
  virtual void CloseInsert(impala::TDmlResult& dml_result,
      const beeswax::QueryHandle& query_handle);

  /// Pings the Impala service and gets the server version string.
  virtual void PingImpalaService(TPingImpalaServiceResp& return_val);

  virtual void GetRuntimeProfile(std::string& profile_output,
      const beeswax::QueryHandle& query_id);

  virtual void GetExecSummary(impala::TExecSummary& result,
      const beeswax::QueryHandle& query_id);

  /// Performs a full catalog metadata reset, invalidating all table and database
  /// metadata.
  virtual void ResetCatalog(impala::TStatus& status);

  /// Resets the specified table's catalog metadata, forcing a reload on the next access.
  /// Returns an error if the table or database was not found in the catalog.
  virtual void ResetTable(impala::TStatus& status, const TResetTableReq& request);

  /// ImpalaHiveServer2Service rpcs: HiveServer2 API (implemented in impala-hs2-server.cc)
  /// TODO: Migrate existing extra ImpalaServer RPCs to ImpalaHiveServer2Service.
  virtual void OpenSession(
      apache::hive::service::cli::thrift::TOpenSessionResp& return_val,
      const apache::hive::service::cli::thrift::TOpenSessionReq& request);
  virtual void CloseSession(
      apache::hive::service::cli::thrift::TCloseSessionResp& return_val,
      const apache::hive::service::cli::thrift::TCloseSessionReq& request);
  virtual void GetInfo(
      apache::hive::service::cli::thrift::TGetInfoResp& return_val,
      const apache::hive::service::cli::thrift::TGetInfoReq& request);
  virtual void ExecuteStatement(
      apache::hive::service::cli::thrift::TExecuteStatementResp& return_val,
      const apache::hive::service::cli::thrift::TExecuteStatementReq& request);
  virtual void GetTypeInfo(
      apache::hive::service::cli::thrift::TGetTypeInfoResp& return_val,
      const apache::hive::service::cli::thrift::TGetTypeInfoReq& request);
  virtual void GetCatalogs(
      apache::hive::service::cli::thrift::TGetCatalogsResp& return_val,
      const apache::hive::service::cli::thrift::TGetCatalogsReq& request);
  virtual void GetSchemas(
      apache::hive::service::cli::thrift::TGetSchemasResp& return_val,
      const apache::hive::service::cli::thrift::TGetSchemasReq& request);
  virtual void GetTables(
      apache::hive::service::cli::thrift::TGetTablesResp& return_val,
      const apache::hive::service::cli::thrift::TGetTablesReq& request);
  virtual void GetTableTypes(
      apache::hive::service::cli::thrift::TGetTableTypesResp& return_val,
      const apache::hive::service::cli::thrift::TGetTableTypesReq& request);
  virtual void GetColumns(
      apache::hive::service::cli::thrift::TGetColumnsResp& return_val,
      const apache::hive::service::cli::thrift::TGetColumnsReq& request);
  virtual void GetFunctions(
      apache::hive::service::cli::thrift::TGetFunctionsResp& return_val,
      const apache::hive::service::cli::thrift::TGetFunctionsReq& request);
  virtual void GetPrimaryKeys(
      apache::hive::service::cli::thrift::TGetPrimaryKeysResp& return_val,
      const apache::hive::service::cli::thrift::TGetPrimaryKeysReq& request);
  virtual void GetCrossReference(
      apache::hive::service::cli::thrift::TGetCrossReferenceResp& return_val,
      const apache::hive::service::cli::thrift::TGetCrossReferenceReq& request);
  virtual void GetOperationStatus(
      apache::hive::service::cli::thrift::TGetOperationStatusResp& return_val,
      const apache::hive::service::cli::thrift::TGetOperationStatusReq& request);
  virtual void CancelOperation(
      apache::hive::service::cli::thrift::TCancelOperationResp& return_val,
      const apache::hive::service::cli::thrift::TCancelOperationReq& request);
  virtual void CloseOperation(
      apache::hive::service::cli::thrift::TCloseOperationResp& return_val,
      const apache::hive::service::cli::thrift::TCloseOperationReq& request);
  virtual void GetResultSetMetadata(
      apache::hive::service::cli::thrift::TGetResultSetMetadataResp& return_val,
      const apache::hive::service::cli::thrift::TGetResultSetMetadataReq& request);
  virtual void FetchResults(
      apache::hive::service::cli::thrift::TFetchResultsResp& return_val,
      const apache::hive::service::cli::thrift::TFetchResultsReq& request);
  virtual void GetLog(apache::hive::service::cli::thrift::TGetLogResp& return_val,
      const apache::hive::service::cli::thrift::TGetLogReq& request);
  virtual void GetExecSummary(TGetExecSummaryResp& return_val,
      const TGetExecSummaryReq& request);
  virtual void GetRuntimeProfile(TGetRuntimeProfileResp& return_val,
      const TGetRuntimeProfileReq& request);
  virtual void GetDelegationToken(
      apache::hive::service::cli::thrift::TGetDelegationTokenResp& return_val,
      const apache::hive::service::cli::thrift::TGetDelegationTokenReq& req);
  virtual void CancelDelegationToken(
      apache::hive::service::cli::thrift::TCancelDelegationTokenResp& return_val,
      const apache::hive::service::cli::thrift::TCancelDelegationTokenReq& req);
  virtual void RenewDelegationToken(
      apache::hive::service::cli::thrift::TRenewDelegationTokenResp& return_val,
      const apache::hive::service::cli::thrift::TRenewDelegationTokenReq& req);

  // Extensions to HS2 implemented by ImpalaHiveServer2Service.

  /// Pings the Impala service and gets the server version string.
  virtual void PingImpalaHS2Service(TPingImpalaHS2ServiceResp& return_val,
      const TPingImpalaHS2ServiceReq& req);

  /// Closes an Impala operation and returns additional information about the closed
  /// operation.
  virtual void CloseImpalaOperation(
      TCloseImpalaOperationResp& return_val, const TCloseImpalaOperationReq& request);

  /// ImpalaInternalService rpcs
  void UpdateFilter(TUpdateFilterResult& return_val, const TUpdateFilterParams& params);

  /// Generates a unique id for this query and sets it in the given query context.
  /// Prepares the given query context by populating fields required for evaluating
  /// certain expressions, such as now(), pid(), etc. Should be called before handing
  /// the query context to the frontend for query compilation.
  void PrepareQueryContext(TQueryCtx* query_ctx);

  /// Static helper for PrepareQueryContext() that is used from expr-benchmark.
  static void PrepareQueryContext(const TNetworkAddress& backend_addr,
      const TNetworkAddress& krpc_addr, TQueryCtx* query_ctx);

  /// SessionHandlerIf methods

  /// Called when a Beeswax or HS2 connection starts. For Beeswax, registers a new
  /// SessionState associated with the new connection. For HS2, this is a no-op (HS2
  /// has an explicit CreateSession RPC).
  virtual void ConnectionStart(const ThriftServer::ConnectionContext& session_context);

  /// Called when a Beeswax or HS2 connection terminates. Unregisters all sessions
  /// associated with the closed connection.
  virtual void ConnectionEnd(const ThriftServer::ConnectionContext& session_context);

  /// Returns true if the connection is considered idle. A connection is considered
  /// idle if all the sessions associated with it have expired due to idle timeout.
  /// Called when a client has been inactive for --idle_client_poll_period_s seconds.
  virtual bool IsIdleConnection(const ThriftServer::ConnectionContext& session_context);

  void CatalogUpdateCallback(const StatestoreSubscriber::TopicDeltaMap& topic_deltas,
      std::vector<TTopicDelta>* topic_updates);

  /// Processes a TCatalogUpdateResult returned from the CatalogServer and ensures
  /// the update has been applied to the local impalad's catalog cache. Called from
  /// ClientRequestState after executing any statement that modifies the catalog.
  ///
  /// If TCatalogUpdateResult contains TCatalogObject(s) to add and/or remove, this
  /// function will update the local cache by directly calling UpdateCatalog() with the
  /// TCatalogObject results.
  ///
  /// If TCatalogUpdateResult does not contain any TCatalogObjects and this is
  /// the result of an INVALIDATE METADATA operation, it waits until the minimum
  /// catalog version in the local cache is greater than or equal to the catalog
  /// version specified in TCatalogUpdateResult. If it is not an INVALIDATE
  /// METADATA operation, it waits until the local impalad's catalog cache has
  /// been updated from a statestore heartbeat that includes this catalog
  /// update's version.
  ///
  /// If wait_for_all_subscribers is true, this function also
  /// waits for all other catalog topic subscribers to process this update by checking the
  /// current min_subscriber_topic_version included in each state store heartbeat.
  Status ProcessCatalogUpdateResult(const TCatalogUpdateResult& catalog_update_result,
      bool wait_for_all_subscribers) WARN_UNUSED_RESULT;

  /// Wait until the catalog update with version 'catalog_update_version' is
  /// received and applied in the local catalog cache or until the catalog
  /// service id has changed.
  void WaitForCatalogUpdate(const int64_t catalog_update_version,
      const TUniqueId& catalog_service_id);

  /// Wait until the minimum catalog object version in the local cache is
  /// greater than 'min_catalog_update_version' or until the catalog
  /// service id has changed.
  void WaitForMinCatalogUpdate(const int64_t min_catalog_update_version,
      const TUniqueId& catalog_service_id);

  /// Wait until the last applied catalog update has been broadcast to
  /// all coordinators or until the catalog service id has changed.
  void WaitForCatalogUpdateTopicPropagation(const TUniqueId& catalog_service_id);

  /// Returns true if lineage logging is enabled, false otherwise.
  ///
  /// DEPRECATED: lineage file logging has been deprecated in favor of
  ///             query execution hooks (FE)
  bool IsLineageLoggingEnabled();

  /// Returns true if query execution (FE) hooks are enabled, false otherwise.
  bool AreQueryHooksEnabled();

  /// Returns true if audit event logging is enabled, false otherwise.
  bool IsAuditEventLoggingEnabled();

  /// Retuns true if this is a coordinator, false otherwise.
  bool IsCoordinator();

  /// Returns true if this is an executor, false otherwise.
  bool IsExecutor();

  /// Returns whether this backend is healthy, i.e. able to accept queries.
  bool IsHealthy();

  /// Returns the port that the thrift backend server is listening on. Valid to call after
  /// the server has started successfully.
  int GetThriftBackendPort();

  /// Returns the network address that the thrift backend server is listening on. Valid
  /// to call after the server has started successfully.
  TNetworkAddress GetThriftBackendAddress();

  /// Returns the port that the Beeswax server is listening on. Valid to call after
  /// the server has started successfully.
  int GetBeeswaxPort();

  /// Returns the port that the Beeswax server is listening on. Valid to call after
  /// the server has started successfully.
  int GetHS2Port();

  /// Returns a current snapshot of the local backend descriptor.
  std::shared_ptr<const TBackendDescriptor> GetLocalBackendDescriptor();

  /// Adds the query_id to the map from backend to the list of queries running or expected
  /// to run there (query_locations_). After calling this function, the server will cancel
  /// a query with an error if one of its backends fail.
  void RegisterQueryLocations(
      const PerBackendExecParams& per_backend_params, const TUniqueId& query_id);

  /// Takes a set of network addresses of active backends and cancels all the queries
  /// running on failed ones (that is, addresses not in the active set).
  void CancelQueriesOnFailedBackends(
      const std::unordered_set<TNetworkAddress>& current_membership);

  /// Start the shutdown process. Return an error if it could not be started. Otherwise,
  /// if it was successfully started by this or a previous call, return OK along with
  /// information about the pending shutdown in 'shutdown_status'. 'relative_deadline_s'
  /// is the deadline value in seconds to use, or -1 if we should use the default
  /// deadline. See Shutdown class comment for explanation of the shutdown sequence.
  Status StartShutdown(int64_t relative_deadline_s, ShutdownStatusPB* shutdown_status);

  /// Returns true if a shut down is in progress.
  bool IsShuttingDown() const { return shutting_down_.Load() != 0; }

  /// Returns an informational error about why a new operation could not be started
  /// if the server is shutting down. Must be called before starting execution of a
  /// new operation (e.g. a query).
  Status CheckNotShuttingDown() const;

  /// Return information about the status of a shutdown. Only valid to call if a shutdown
  /// is in progress (i.e. IsShuttingDown() is true).
  ShutdownStatusPB GetShutdownStatus() const;

  /// Convert the shutdown status to a human-readable string.
  static std::string ShutdownStatusToString(const ShutdownStatusPB& shutdown_status);

  /// Appends the audit_entry to audit_event_logger_.
  Status AppendAuditEntry(const std::string& audit_entry);

  /// Appends the lineage_entry to lineage_logger_.
  Status AppendLineageEntry(const std::string& lineage_entry);

  // Mapping between query option names and levels
  QueryOptionLevels query_option_levels_;

  /// The prefix of audit event log filename.
  static const string AUDIT_EVENT_LOG_FILE_PREFIX;

  /// The default executor group name for executors that do not explicitly belong to a
  /// specific executor group.
  static const string DEFAULT_EXECUTOR_GROUP_NAME;

  /// Per-session state.  This object is reference counted using shared_ptrs.  There
  /// is one ref count in the SessionStateMap for as long as the session is active.
  /// All queries running from this session also have a reference.
  struct SessionState {
    /// The default hs2_version must be V1 so that child queries (which use HS2, but may
    /// run as children of Beeswax sessions) get results back in the expected format -
    /// child queries inherit the HS2 version from their parents, and a Beeswax session
    /// will never update the HS2 version from the default.
    SessionState(ImpalaServer* impala_server, TUniqueId session_id, TUniqueId secret)
      : impala_server(impala_server), session_id(std::move(session_id)),
        secret(std::move(secret)),
        closed(false), expired(false), hs2_version(apache::hive::service::cli::thrift::
        TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V1), total_queries(0), ref_count(0) {
      DCHECK(this->impala_server != nullptr);
    }

    /// Pointer to the Impala server of this session
    ImpalaServer* impala_server;

    /// The unique session ID. This is also the key for session_state_map_, but
    /// we redundantly store it here for convenience.
    const TUniqueId session_id;

    /// The session secret that client RPCs must pass back in to access the session.
    /// This must not be printed to logs or exposed in any other way.
    const TUniqueId secret;

    TSessionType::type session_type;

    /// Time the session was created, in ms since epoch (UTC).
    int64_t start_time_ms;

    /// Connected user for this session, i.e. the user which originated this session.
    std::string connected_user;

    /// The user to delegate to. Empty for no delegation.
    std::string do_as_user;

    /// Client network address.
    TNetworkAddress network_address;

    /// Protects all fields below. See "Locking" in the class comment for lock
    /// acquisition order.
    boost::mutex lock;

    /// If true, the session has been closed.
    bool closed;

    /// If true, the session was idle for too long and has been expired. Only set when
    /// ref_count == 0, after which point ref_count should never become non-zero (since
    /// clients will use ScopedSessionState to access the session, which will prevent use
    /// after expiration).
    bool expired;

    /// The default database (changed as a result of 'use' query execution).
    std::string database;

    /// Reference to the ImpalaServer's query options
    TQueryOptions* server_default_query_options;

    /// Query options that have been explicitly set in this session.
    TQueryOptions set_query_options;

    /// BitSet indicating which query options in set_query_options have been
    /// explicitly set in the session. Updated when a query option is specified using a
    /// SET command: the bit corresponding to the TImpalaQueryOptions enum is set.
    /// If the option is subsequently reset via a SET with an empty value, the bit
    /// is cleared.
    QueryOptionsMask set_query_options_mask;

    /// For HS2 only, the protocol version this session is expecting.
    apache::hive::service::cli::thrift::TProtocolVersion::type hs2_version;

    /// Inflight queries belonging to this session. It represents the set of queries that
    /// are either queued or have started executing. Used primarily to identify queries
    /// that need to be closed if the session closes or expires.
    boost::unordered_set<TUniqueId> inflight_queries;

    /// Total number of queries run as part of this session.
    int64_t total_queries;

    /// Time the session was last accessed, in ms since epoch (UTC).
    int64_t last_accessed_ms;

    /// If this session has no open connections, this is the time in UTC when the last
    /// connection was closed.
    int64_t disconnected_ms;

    /// The latest Kudu timestamp observed after DML operations executed within this
    /// session.
    uint64_t kudu_latest_observed_ts;

    /// Number of RPCs concurrently accessing this session state. Used to detect when a
    /// session may be correctly expired after a timeout (when ref_count == 0). Typically
    /// at most one RPC will be issued against a session at a time, but clients may do
    /// something unexpected and, for example, poll in one thread and fetch in another.
    uint32_t ref_count;

    /// Per-session idle timeout in seconds. Default value is FLAGS_idle_session_timeout.
    /// It can be overridden with the query option "idle_session_timeout" when opening a
    /// HS2 session, or using the SET command.
    int32_t session_timeout = 0;

    /// The connection ids of any connections that this session has been used over.
    std::set<TUniqueId> connections;

    /// Updates the session timeout based on the query option idle_session_timeout.
    /// It registers/unregisters the session timeout to the Impala server.
    /// The lock must be owned by the caller of this function.
    void UpdateTimeout();

    /// Builds a Thrift representation of this SessionState for serialisation to
    /// the frontend.
    void ToThrift(const TUniqueId& session_id, TSessionState* session_state);

    /// Builds the overlay of the default server query options and the options
    /// explicitly set in this session.
    TQueryOptions QueryOptions();
  };

 private:
  struct ExpirationEvent;
  class SecretArg;
  friend class ChildQuery;
  friend class ControlService;
  friend class ImpalaHttpHandler;
  friend struct SessionState;
  friend class ImpalaServerTest;

  boost::scoped_ptr<ImpalaHttpHandler> http_handler_;

  /// Relevant ODBC SQL State code; for more info,
  /// goto http://msdn.microsoft.com/en-us/library/ms714687.aspx
  static const char* SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION;
  static const char* SQLSTATE_GENERAL_ERROR;
  static const char* SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED;

  /// Return exec state for given query_id, or NULL if not found.
  std::shared_ptr<ClientRequestState> GetClientRequestState(
      const TUniqueId& query_id);

  /// Used in situations where the client provides a session ID and a query ID and the
  /// caller needs to validate that the query can be accessed from the session. The two
  /// arguments are the session obtained by looking up the session ID provided by the
  /// RPC client and the effective user from the query. If the username doesn't match,
  /// return an error indicating that the query was not found (since we prefer not to
  /// leak information about the existence of queries that the user doesn't have
  /// access to).
  Status CheckClientRequestSession(SessionState* session,
      const std::string& client_request_effective_user, const TUniqueId& query_id);

  /// Updates a set of Impalad catalog metrics including number tables/databases and
  /// some cache metrics applicable to local catalog mode (if configured). Called at
  /// the end of statestore update callback (to account for metadata object changes)
  /// and at the end of query planning to update local catalog cache access metrics.
  Status UpdateCatalogMetrics() WARN_UNUSED_RESULT;

  /// Depending on the query type, this either submits the query to the admission
  /// controller for performing async admission control or starts asynchronous execution
  /// of query. Creates ClientRequestState (returned in exec_state), registers it and
  /// calls ClientRequestState::Execute(). If it returns with an error status, exec_state
  /// will be NULL and nothing will have been registered in client_request_state_map_.
  /// session_state is a ptr to the session running this query and must have been checked
  /// out.
  Status Execute(TQueryCtx* query_ctx, std::shared_ptr<SessionState> session_state,
      std::shared_ptr<ClientRequestState>* exec_state) WARN_UNUSED_RESULT;

  /// Implements Execute() logic, but doesn't unregister query on error.
  Status ExecuteInternal(const TQueryCtx& query_ctx,
      std::shared_ptr<SessionState> session_state, bool* registered_exec_state,
      std::shared_ptr<ClientRequestState>* exec_state) WARN_UNUSED_RESULT;

  /// Registers the query exec state with client_request_state_map_ using the
  /// globally unique query_id.
  /// The caller must have checked out the session state.
  Status RegisterQuery(std::shared_ptr<SessionState> session_state,
      const std::shared_ptr<ClientRequestState>& exec_state) WARN_UNUSED_RESULT;

  /// Adds the query to the set of in-flight queries for the session. The query remains
  /// in-flight until the query is unregistered.  Until a query is in-flight, an attempt
  /// to cancel or close the query by the user will return an error status.  If the
  /// session is closed before a query is in-flight, then the query cancellation is
  /// deferred until after the issuing path has completed initializing the query.  Once
  /// a query is in-flight, it can be cancelled/closed asynchronously by the user
  /// (e.g. via an RPC) and the session close path can close (cancel and unregister) it.
  /// The query must have already been registered using RegisterQuery().  The caller
  /// must have checked out the session state.
  Status SetQueryInflight(std::shared_ptr<SessionState> session_state,
      const std::shared_ptr<ClientRequestState>& exec_state) WARN_UNUSED_RESULT;

  /// Unregister the query by cancelling it, removing exec_state from
  /// client_request_state_map_, and removing the query id from session state's
  /// in-flight query list. If check_inflight is true, then return an error if the query
  /// is not yet in-flight. Otherwise, proceed even if the query isn't yet in-flight (for
  /// cleaning up after an error on the query issuing path).
  Status UnregisterQuery(const TUniqueId& query_id, bool check_inflight,
      const Status* cause = NULL) WARN_UNUSED_RESULT;

  /// Performs any final cleanup necessary before the given ClientRequestState goes out
  /// of scope and is deleted. Marks the given ClientRequestState as done, removes the
  /// query from the inflight queries list, updates query_locations_, and archives the
  /// query. Used when unregistering the query.
  Status CloseClientRequestState(
      const std::shared_ptr<ClientRequestState>& request_state);

  /// Initiates query cancellation reporting the given cause as the query status.
  /// Assumes deliberate cancellation by the user if the cause is NULL.  Returns an
  /// error if query_id is not found.  If check_inflight is true, then return an error
  /// if the query is not yet in-flight.  Otherwise, returns OK.  Queries still need to
  /// be unregistered, after cancellation.  Caller should not hold any locks when
  /// calling this function.
  Status CancelInternal(const TUniqueId& query_id, bool check_inflight,
      const Status* cause = NULL) WARN_UNUSED_RESULT;

  /// Close the session and release all resource used by this session.
  /// Caller should not hold any locks when calling this function.
  /// If ignore_if_absent is true, returns OK even if a session with the supplied ID does
  /// not exist.
  Status CloseSessionInternal(const TUniqueId& session_id, const SecretArg& secret,
      bool ignore_if_absent) WARN_UNUSED_RESULT;

  /// Gets the runtime profile string for a given query_id and writes it to the output
  /// stream. First searches for the query id in the map of in-flight queries. If no
  /// match is found there, the query log is searched. Returns OK if the profile was
  /// found, otherwise a Status object with an error message will be returned. The
  /// output stream will not be modified on error.
  /// On success, if 'format' is BASE64 or STRING then 'output' will be set, or if
  /// 'format' is THRIFT then 'thrift_output' will be set. If 'format' is JSON
  /// then 'json_output' will be set.
  /// If the user asking for this profile is the same user that runs the query
  /// and that user has access to the runtime profile, the profile is written to
  /// the output. Otherwise, nothing is written to output and an error code is
  /// returned to indicate an authorization error.
  Status GetRuntimeProfileOutput(const TUniqueId& query_id, const std::string& user,
      TRuntimeProfileFormat::type format, std::stringstream* output,
      TRuntimeProfileTree* thrift_output,
      rapidjson::Document* json_output) WARN_UNUSED_RESULT;

  /// Returns the exec summary for this query if the user asking for the exec
  /// summary is the same user that run the query and that user has access to the full
  /// query profile. Otherwise, an error status is returned to indicate an
  /// authorization error.
  Status GetExecSummary(const TUniqueId& query_id, const std::string& user,
      TExecSummary* result) WARN_UNUSED_RESULT;

  /// Collect ExecSummary and update it to the profile in request_state
  void UpdateExecSummary(std::shared_ptr<ClientRequestState> request_state) const;

  /// Initialize "default_configs_" to show the default values for ImpalaQueryOptions and
  /// "support_start_over/false" to indicate that Impala does not support start over
  /// in the fetch call.
  void InitializeConfigVariables();

  /// Sets the option level for parameter 'option' based on the mapping stored in
  /// 'query_option_levels_'. The option level is used by the Impala shell when it
  /// displays the options. 'option_key' is the key for the 'query_option_levels_'
  /// to get the level of the query option.
  void AddOptionLevelToConfig(beeswax::ConfigVariable* option,
      const string& option_key) const;

  /// Checks settings for profile logging, including whether the output
  /// directory exists and is writeable, and initialises the first log file.
  /// Returns OK unless there is some problem preventing profile log files
  /// from being written. If an error is returned, the constructor will disable
  /// profile logging.
  Status InitProfileLogging() WARN_UNUSED_RESULT;

  /// Checks settings for audit event logging, including whether the output
  /// directory exists and is writeable, and initialises the first log file.
  /// Returns OK unless there is some problem preventing audit event log files
  /// from being written. If an error is returned, impalad startup will be aborted.
  Status InitAuditEventLogging() WARN_UNUSED_RESULT;

  /// Checks settings for lineage logging, including whether the output
  /// directory exists and is writeable, and initialises the first log file.
  /// Returns OK unless there is some problem preventing lineage log files
  /// from being written. If an error is returned, impalad startup will be aborted.
  Status InitLineageLogging() WARN_UNUSED_RESULT;

  /// Initializes a logging directory, creating the directory if it does not already
  /// exist. If there is any error creating the directory an error will be returned.
  static Status InitLoggingDir(const std::string& log_dir) WARN_UNUSED_RESULT;

  /// Runs once every 5s to flush the profile log file to disk.
  [[noreturn]] void LogFileFlushThread();

  /// Runs once every 5s to flush the audit log file to disk.
  [[noreturn]] void AuditEventLoggerFlushThread();

  /// Runs once every 5s to flush the lineage log file to disk.
  [[noreturn]] void LineageLoggerFlushThread();

  /// Copies a query's state into the query log. Called immediately prior to a
  /// ClientRequestState's deletion. Also writes the query profile to the profile log
  /// on disk.
  void ArchiveQuery(const ClientRequestState& query);

  /// Checks whether the given user is allowed to delegate as the specified do_as_user.
  /// Returns OK if the authorization suceeds, otherwise returns an status with details
  /// on why the failure occurred.
  Status AuthorizeProxyUser(const std::string& user, const std::string& do_as_user)
      WARN_UNUSED_RESULT;

  /// Initializes the backend descriptor in 'be_desc' with the local backend information.
  void BuildLocalBackendDescriptorInternal(TBackendDescriptor* be_desc);

  /// Snapshot of a query's state, archived in the query log.
  struct QueryStateRecord {
    /// Pretty-printed runtime profile. TODO: Copy actual profile object
    std::string profile_str;

    /// Base64 encoded runtime profile
    std::string encoded_profile_str;

    /// JSON based runtime profile
    std::string json_profile_str;

    /// Query id
    TUniqueId id;

    /// Queries are run and authorized on behalf of the effective_user.
    /// If there is no delegated user, this will be the connected user. Otherwise, it
    /// will be set to the delegated user.
    std::string effective_user;

    /// If true, effective_user has access to the runtime profile and execution
    /// summary.
    bool user_has_profile_access;

    /// default db for this query
    std::string default_db;

    /// SQL statement text
    std::string stmt;

    /// Text representation of plan
    std::string plan;

    /// DDL, DML etc.
    TStmtType::type stmt_type;

    /// True if the query required a coordinator fragment
    bool has_coord;

    /// The number of fragments that have completed
    int64_t num_complete_fragments;

    /// The total number of fragments
    int64_t total_fragments;

    /// The number of rows fetched by the client
    int64_t num_rows_fetched;

    /// The state of the query as of this snapshot
    beeswax::QueryState::type query_state;

    /// Start and end time of the query, in Unix microseconds.
    /// A query whose end_time_us is 0 indicates that it is an in-flight query.
    /// These two variables are initialized with the corresponding values from
    /// ClientRequestState.
    int64_t start_time_us, end_time_us;

    /// Summary of execution for this query.
    TExecSummary exec_summary;

    Status query_status;

    /// Timeline of important query events
    TEventSequence event_sequence;

    /// Save the query plan fragments so that the plan tree can be rendered on the debug
    /// webpages.
    vector<TPlanFragment> fragments;

    // If true, this query has no more rows to return
    bool all_rows_returned;

    // The most recent time this query was actively being processed, in Unix milliseconds.
    int64_t last_active_time_ms;

    /// Resource pool to which the request was submitted for admission, or an empty
    /// string if this request doesn't go through admission control.
    std::string resource_pool;

    /// Initialise from an exec_state. If copy_profile is true, print the query
    /// profile to a string and copy that into this.profile (which is expensive),
    /// otherwise leave this.profile empty.
    /// If encoded_str is non-empty, it is the base64 encoded string for
    /// exec_state->profile.
    QueryStateRecord(const ClientRequestState& exec_state, bool copy_profile = false,
        const std::string& encoded_str = "");

    /// Default constructor used only when participating in collections
    QueryStateRecord() { }
  };

  struct QueryStateRecordLessThan {
    /// Comparator that sorts by start time.
    bool operator() (const QueryStateRecord& lhs, const QueryStateRecord& rhs) const;
  };

  /// Beeswax private methods

  /// Helper functions to translate between Beeswax and Impala structs
  Status QueryToTQueryContext(const beeswax::Query& query, TQueryCtx* query_ctx)
      WARN_UNUSED_RESULT;
  void TUniqueIdToQueryHandle(const TUniqueId& query_id, beeswax::QueryHandle* handle);
  void QueryHandleToTUniqueId(const beeswax::QueryHandle& handle, TUniqueId* query_id);

  /// Helper function to raise BeeswaxException
  [[noreturn]] void RaiseBeeswaxException(const std::string& msg, const char* sql_state);

  /// Executes the fetch logic. Doesn't clean up the exec state if an error occurs.
  Status FetchInternal(ClientRequestState* request_state, bool start_over,
      int32_t fetch_size, beeswax::Results* query_results) WARN_UNUSED_RESULT;

  /// Populate dml_result and clean up exec state. If the query
  /// status is an error, dml_result is not populated and the status is returned.
  /// 'session' is RPC client's session, used to check whether the DML can
  /// be closed via that session.
  Status CloseInsertInternal(SessionState* session, const TUniqueId& query_id,
      TDmlResult* dml_result) WARN_UNUSED_RESULT;

  /// HiveServer2 private methods (implemented in impala-hs2-server.cc)

  /// Starts the synchronous execution of a HiverServer2 metadata operation.
  /// If the execution succeeds, an ClientRequestState will be created and registered in
  /// client_request_state_map_. Otherwise, nothing will be registered in
  /// client_request_state_map_ and an error status will be returned. As part of this
  /// call, the TMetadataOpRequest struct will be populated with the requesting user's
  /// session state.
  /// Returns a TOperationHandle and TStatus.
  void ExecuteMetadataOp(
      const apache::hive::service::cli::thrift::THandleIdentifier& session_handle,
      TMetadataOpRequest* request,
      apache::hive::service::cli::thrift::TOperationHandle* handle,
      apache::hive::service::cli::thrift::TStatus* status);

  /// Executes the fetch logic for HiveServer2 FetchResults and stores result size in
  /// 'num_results'. If fetch_first is true, then the query's state should be reset to
  /// fetch from the beginning of the result set. Doesn't clean up 'request_state' if an
  /// error occurs.
  Status FetchInternal(ClientRequestState* request_state, SessionState* session,
      int32_t fetch_size, bool fetch_first,
      apache::hive::service::cli::thrift::TFetchResultsResp* fetch_results,
      int32_t* num_results) WARN_UNUSED_RESULT;

  /// Helper functions to translate between HiveServer2 and Impala structs

  /// Returns !ok() if handle.guid.size() or handle.secret.size() != 16
  static Status THandleIdentifierToTUniqueId(
      const apache::hive::service::cli::thrift::THandleIdentifier& handle,
      TUniqueId* unique_id, TUniqueId* secret) WARN_UNUSED_RESULT;
  static void TUniqueIdToTHandleIdentifier(
      const TUniqueId& unique_id, const TUniqueId& secret,
      apache::hive::service::cli::thrift::THandleIdentifier* handle);
  Status TExecuteStatementReqToTQueryContext(
      const apache::hive::service::cli::thrift::TExecuteStatementReq execute_request,
      TQueryCtx* query_ctx) WARN_UNUSED_RESULT;

  /// Helper method to process cancellations that result from failed backends, called from
  /// the cancellation thread pool. The cancellation_work contains the query id to cancel
  /// and a cause listing the failed backends that led to cancellation. Calls
  /// CancelInternal directly, but has a signature compatible with the thread pool.
  void CancelFromThreadPool(uint32_t thread_id,
      const CancellationWork& cancellation_work);

  /// Helper method to add the pool name and query options to the query_ctx. Must be
  /// called before ExecuteInternal() at which point the TQueryCtx is const and cannot
  /// be mutated. override_options_mask indicates which query options can be overridden
  /// by the pool default query options.
  void AddPoolConfiguration(TQueryCtx* query_ctx,
      const QueryOptionsMask& override_options_mask);

  /// Register timeout value upon opening a new session. This will wake up
  /// session_timeout_thread_.
  void RegisterSessionTimeout(int32_t timeout);

  /// Unregister timeout value.
  void UnregisterSessionTimeout(int32_t timeout);

  /// To be run in a thread which wakes up every second if there are registered sesions
  /// timeouts. This function checks all sessions for:
  /// - Last-idle times. Those that have been idle for longer than their configured
  ///   timeout values are 'expired': they will no longer accept queries.
  /// - Disconnected times. Those that have had no active connections for longer than
  ///   FLAGS_disconnected_session_timeout are closed: they are removed from the session
  ///   state map and can no longer be accessed by clients.
  /// For either case any running queries associated with those sessions are unregistered.
  [[noreturn]] void SessionMaintenance();

  /// Runs forever, walking queries_by_timestamp_ and expiring any queries that have been
  /// idle (i.e. no client input and no time spent processing locally) for
  /// FLAGS_idle_query_timeout seconds.
  [[noreturn]] void ExpireQueries();

  /// Periodically iterates over all queries and cancels any where a backend hasn't sent a
  /// status report in greater than GetMaxReportRetryMs().
  [[noreturn]] void UnresponsiveBackendThread();

  /// Called from ExpireQueries() to check query resource limits for 'crs'. If the query
  /// exceeded a resource limit, returns a non-OK status with information about what
  /// limit was exceeded. Returns OK if the query will continue running and expiration
  /// check should be rescheduled for a later time.
  Status CheckResourceLimits(ClientRequestState* crs);

  /// Expire 'crs' and cancel it with status 'status'.
  void ExpireQuery(ClientRequestState* crs, const Status& status);

  typedef boost::unordered_map<std::string, boost::unordered_set<std::string>>
      AuthorizedProxyMap;
  /// Populates authorized proxy config into the given map.
  /// For example:
  /// - authorized_proxy_config: foo=abc,def;bar=ghi
  /// - authorized_proxy_config_delimiter: ,
  /// - authorized_proxy_map: {foo:[abc, def], bar=s[ghi]}
  static Status PopulateAuthorizedProxyConfig(
      const std::string& authorized_proxy_config,
      const std::string& authorized_proxy_config_delimiter,
      AuthorizedProxyMap* authorized_proxy_map);

  /// Background thread that does the shutdown.
  [[noreturn]] void ShutdownThread();

  /// Random number generator for use in this class, thread safe.
  static ThreadSafeRandom rng_;

  /// Guards query_log_ and query_log_index_
  boost::mutex query_log_lock_;

  /// FIFO list of query records, which are written after the query finishes executing
  typedef std::list<QueryStateRecord> QueryLog;
  QueryLog query_log_;

  /// Index that allows lookup via TUniqueId into the query log
  typedef boost::unordered_map<TUniqueId, QueryLog::iterator> QueryLogIndex;
  QueryLogIndex query_log_index_;

  /// Logger for writing encoded query profiles, one per line with the following format:
  /// <ms-since-epoch> <query-id> <thrift query profile URL encoded and gzipped>
  boost::scoped_ptr<SimpleLogger> profile_logger_;

  /// Logger for writing audit events, one per line with the format:
  /// "<current timestamp>" : { JSON object }
  boost::scoped_ptr<SimpleLogger> audit_event_logger_;

  /// Logger for writing lineage events, one per line with the format:
  /// { JSON object }
  boost::scoped_ptr<SimpleLogger> lineage_logger_;

  /// If profile logging is enabled, wakes once every 5s to flush query profiles to disk
  std::unique_ptr<Thread> profile_log_file_flush_thread_;

  /// If audit event logging is enabled, wakes once every 5s to flush audit events to disk
  std::unique_ptr<Thread> audit_event_logger_flush_thread_;

  /// If lineage logging is enabled, wakes once every 5s to flush lineage events to disk
  std::unique_ptr<Thread> lineage_logger_flush_thread_;

  /// global, per-server state
  ExecEnv* exec_env_;  // not owned

  /// Thread pool to process cancellation requests that come from failed Impala demons to
  /// avoid blocking the statestore callback.
  boost::scoped_ptr<ThreadPool<CancellationWork>> cancellation_thread_pool_;

  /// Thread that runs SessionMaintenance. It will wake up periodically to check for
  /// sessions which are idle for more their timeout values.
  std::unique_ptr<Thread> session_maintenance_thread_;

  /// Contains all the non-zero idle or disconnected session timeout values.
  std::multiset<int32_t> session_timeout_set_;

  /// The lock for protecting the session_timeout_set_.
  boost::mutex session_timeout_lock_;

  /// session_timeout_thread_ relies on the following conditional variable to wake up
  /// when there are sessions that have a timeout.
  ConditionVariable session_timeout_cv_;

  /// Thread that runs UnresponsiveBackendThread().
  std::unique_ptr<Thread> unresponsive_backend_thread_;

  /// A ClientRequestStateMap maps query ids to ClientRequestStates. The
  /// ClientRequestStates are owned by the ImpalaServer and ClientRequestStateMap
  /// references them using shared_ptr to allow asynchronous deletion.
  ClientRequestStateMap client_request_state_map_;

  /// Default query options in the form of TQueryOptions and beeswax::ConfigVariable
  TQueryOptions default_query_options_;
  std::vector<beeswax::ConfigVariable> default_configs_;

  // Container for a secret passed into functions for validation.
  class SecretArg {
   public:
    // This should only be used if the client has not provided a secret but the caller
    // knows that the client in fact is allowed to access the session or operation that
    // they are accessing.
    static SecretArg SkipSecretCheck() {
      return SecretArg(true, true, TUniqueId(), TUniqueId());
    }

    /// Pass a session secret for validation.
    static SecretArg Session(const TUniqueId& secret) {
      return SecretArg(false, true, secret, TUniqueId());
    }

    /// Pass a query secret for validation. The error message will include the 'query_id',
    /// so that must also be passed.
    static SecretArg Operation(const TUniqueId& secret, const TUniqueId& query_id) {
      return SecretArg(false, false, secret, query_id);
    }

    /// Return true iff the check should be skipped or the secret matches 'other'.
    /// All validation should be done via this function to avoid subtle errors.
    bool Validate(const TUniqueId& other) const {
      return skip_validation_ || ConstantTimeCompare(other) == 0;
    }

    bool is_session_secret() const { return is_session_secret_; }
    TUniqueId query_id() const {
      DCHECK(!is_session_secret_);
      return query_id_;
    }

   private:
    SecretArg(bool skip_validation, bool is_session_secret, TUniqueId secret,
        TUniqueId query_id)
      : skip_validation_(skip_validation),
        is_session_secret_(is_session_secret),
        secret_(std::move(secret)),
        query_id_(std::move(query_id)) {}

    /// A comparison function for unique IDs that executes in an amount of time unrelated
    /// to the input values. Returns the number of words that differ between id1 and id2.
    int ConstantTimeCompare(const TUniqueId& other) const;

    /// True if the caller wants to skip validation of the session.
    const bool skip_validation_;

    /// True if this is a session secret, false if this is an operation secret.
    const bool is_session_secret_;

    /// The secret to validate.
    const TUniqueId secret_;

    /// The query id, only provided if this is an operation secret.
    const TUniqueId query_id_;
  };

  /// Class that allows users of SessionState to mark a session as in-use, and therefore
  /// immune to expiration. The marking is done in WithSession() and undone in the
  /// destructor, so this class can be used to 'check-out' a session for the duration of a
  /// scope.
  class ScopedSessionState {
   public:
    ScopedSessionState(ImpalaServer* impala) : impala_(impala) { }

    /// Marks a session as in-use, and saves it so that it can be unmarked when this
    /// object goes out of scope. Returns OK unless there is an error in GetSessionState.
    /// 'secret' must be provided and is validated against the stored secret for the
    /// session. Must only be called once per ScopedSessionState.
    Status WithSession(const TUniqueId& session_id, const SecretArg& secret,
        std::shared_ptr<SessionState>* session = NULL) WARN_UNUSED_RESULT {
      DCHECK(session_.get() == NULL);
      RETURN_IF_ERROR(impala_->GetSessionState(
            session_id, secret, &session_, /* mark_active= */ true));
      if (session != NULL) (*session) = session_;

      // We won't have a connection context in the case of ChildQuery, which calls into
      // hiveserver2 functions directly without going through the Thrift stack.
      if (ThriftServer::HasThreadConnectionContext()) {
        // Check that the session user matches the user authenticated on the connection.
        const ThriftServer::Username& connection_username =
            ThriftServer::GetThreadConnectionContext()->username;
        if (!connection_username.empty()
            && session_->connected_user != connection_username) {
          return Status::Expected(TErrorCode::UNAUTHORIZED_SESSION_USER,
              connection_username, session_->connected_user);
        }

        // Try adding the session id to the connection's set of sessions in case this is
        // the first time this session has been used on this connection.
        impala_->AddSessionToConnection(session_id, session_.get());
      }
      return Status::OK();
    }

    /// Same as WithSession(), except:
    /// * It should only be called from beeswax with a 'connection_id' obtained from
    ///   ThriftServer::GetThreadConnectionId().
    /// * It does not update the session/connection mapping, as beeswax sessions can
    ///   only be used over a single connection.
    Status WithBeeswaxSession(const TUniqueId& connection_id,
        std::shared_ptr<SessionState>* session = NULL) {
      DCHECK(session_.get() == NULL);
      RETURN_IF_ERROR(
          impala_->GetSessionState(connection_id, SecretArg::SkipSecretCheck(), &session_,
              /* mark_active= */ true));
      if (session != NULL) (*session) = session_;
      return Status::OK();
    }

    /// Decrements the reference count so the session can be expired correctly.
    ~ScopedSessionState() {
      if (session_.get() != NULL) {
        impala_->MarkSessionInactive(session_);
      }
    }

   private:
    /// Reference-counted pointer to the session state object.
    std::shared_ptr<SessionState> session_;

    /// Saved so that we can access ImpalaServer methods to get / return session state.
    ImpalaServer* impala_;
  };

  /// For access to GetSessionState() / MarkSessionInactive()
  friend class ScopedSessionState;

  /// Protects session_state_map_. See "Locking" in the class comment for lock
  /// acquisition order.
  boost::mutex session_state_map_lock_;

  /// A map from session identifier to a structure containing per-session information
  typedef boost::unordered_map<TUniqueId, std::shared_ptr<SessionState>> SessionStateMap;
  SessionStateMap session_state_map_;

  /// Protects connection_to_sessions_map_. See "Locking" in the class comment for lock
  /// acquisition order.
  boost::mutex connection_to_sessions_map_lock_;

  /// Map from a connection ID to the associated list of sessions so that all can be
  /// closed when the connection ends. HS2 allows for multiplexing several sessions across
  /// a single connection. If a session has already been closed (only possible via HS2) it
  /// is not removed from this map to avoid the cost of looking it up.
  typedef boost::unordered_map<TUniqueId, std::set<TUniqueId>> ConnectionToSessionMap;
  ConnectionToSessionMap connection_to_sessions_map_;

  /// Returns session state for given session_id.
  /// If not found or validation of 'secret' against the stored secret in the
  /// SessionState fails, session_state will be NULL and an error status will be returned.
  /// If mark_active is true, also checks if the session is expired or closed and
  /// increments the session's reference counter if it is still alive.
  Status GetSessionState(const TUniqueId& session_id, const SecretArg& secret,
      std::shared_ptr<SessionState>* session_state, bool mark_active = false)
      WARN_UNUSED_RESULT;

  /// Decrement the session's reference counter and mark last_accessed_ms so that state
  /// expiration can proceed.
  inline void MarkSessionInactive(std::shared_ptr<SessionState> session) {
    boost::lock_guard<boost::mutex> l(session->lock);
    DCHECK_GT(session->ref_count, 0);
    --session->ref_count;
    session->last_accessed_ms = UnixMillis();
  }

  /// Associate the current connection context with the given session in
  /// 'connection_to_sessions_map_' and 'SessionState::connections'.
  void AddSessionToConnection(const TUniqueId& session_id, SessionState* session);

  /// Protects query_locations_. Not held in conjunction with other locks.
  boost::mutex query_locations_lock_;

  /// A map from backend to the list of queries currently running or expected to run
  /// there.
  typedef std::unordered_map<TNetworkAddress, std::unordered_set<TUniqueId>>
      QueryLocations;
  QueryLocations query_locations_;

  /// The local backend descriptor. Updated in GetLocalBackendDescriptor() and protected
  /// by 'local_backend_descriptor_lock_';
  std::shared_ptr<const TBackendDescriptor> local_backend_descriptor_;
  boost::mutex local_backend_descriptor_lock_;

  /// UUID generator for session IDs and secrets. Uses system random device to get
  /// cryptographically secure random numbers.
  boost::uuids::basic_random_generator<boost::random_device> crypto_uuid_generator_;

  /// Lock to protect uuid_generator
  boost::mutex uuid_lock_;

  /// Lock for catalog_update_version_, min_subscriber_catalog_topic_version_,
  /// and catalog_version_update_cv_
  boost::mutex catalog_version_lock_;

  /// Variable to signal when the catalog version has been modified
  ConditionVariable catalog_version_update_cv_;

  /// Contains details on the version information of a catalog update.
  struct CatalogUpdateVersionInfo {
    CatalogUpdateVersionInfo() :
      catalog_version(0L),
      catalog_topic_version(0L),
      catalog_object_version_lower_bound(0L) {
    }
    /// Update the metrics to store the current version of catalog, current topic and
    /// current service id used by impalad.
    void  UpdateCatalogVersionMetrics();
    /// The last catalog version returned from UpdateCatalog()
    int64_t catalog_version;
    /// The CatalogService ID that this catalog version is from.
    TUniqueId catalog_service_id;
    /// The statestore catalog topic version this update was received in.
    int64_t catalog_topic_version;
    /// Lower bound of catalog object versions after a call to UpdateCatalog()
    int64_t catalog_object_version_lower_bound;
  };

  /// The version information from the last successfull call to UpdateCatalog().
  CatalogUpdateVersionInfo catalog_update_info_;

  /// The current minimum topic version processed across all subscribers of the catalog
  /// topic. Used to determine when other nodes have successfully processed a catalog
  /// update. Updated with each catalog topic heartbeat from the statestore.
  int64_t min_subscriber_catalog_topic_version_;

  /// Map of short usernames of authorized proxy users to the set of users they are
  /// allowed to delegate to. Populated by parsing the --authorized_proxy_users_config
  /// flag.
  AuthorizedProxyMap authorized_proxy_user_config_;
  /// Map of short usernames of authorized proxy users to the set of groups they are
  /// allowed to delegate to. Populated by parsing the --authorized_proxy_groups_config
  /// flag.
  AuthorizedProxyMap authorized_proxy_group_config_;

  /// Guards queries_by_timestamp_. See "Locking" in the class comment for lock
  /// acquisition order.
  boost::mutex query_expiration_lock_;

  enum class ExpirationKind {
    // The query is cancelled if the query has been inactive this long. The event may
    // cancel the query after checking the last active time.
    IDLE_TIMEOUT,
    // A hard time limit on query execution. The query is cancelled if this event occurs
    // before the query finishes.
    EXEC_TIME_LIMIT,
    // A hard limit on cpu and scanned bytes. The query is cancelled if this event occurs
    // before the query finishes.
    RESOURCE_LIMIT,
  };

  // Describes a query expiration event where the query identified by 'query_id' is
  // checked for expiration when UnixMillis() exceeds 'deadline'.
  struct ExpirationEvent {
    int64_t deadline;
    TUniqueId query_id;
    ExpirationKind kind;
  };

  /// Comparator that breaks ties when two queries have identical expiration deadlines.
  struct ExpirationEventComparator {
    bool operator()(const ExpirationEvent& t1, const ExpirationEvent& t2) {
      if (t1.deadline < t2.deadline) return true;
      if (t2.deadline < t1.deadline) return false;
      if (t1.query_id < t2.query_id) return true;
      if (t2.query_id < t1.query_id) return false;
      return t1.kind < t2.kind;
    }
  };

  /// Ordered set of (expiration_time, query_id) pairs. This queue is updated either by
  /// RegisterQuery(), which adds a new query to the set, or by ExpireQueries(), which
  /// updates the entries as it iterates over the set. Therefore, it is not directly
  /// accessed during normal query execution and the benefit of that is there is no
  /// competition for locks when ExpireQueries() walks this list to find expired queries.
  //
  /// In order to make the query expiration algorithm work, the following condition always
  /// holds:
  //
  /// * For any pair (t, q) in the set, t is always a lower bound on the true expiration
  /// time of the query q (in q->idle_time()). Therefore we can bound the expiration error
  /// by sleeping to no more than t + d for some small d (in our implementation, d is 1s).
  //
  /// The reason that the expiration time saved in each entry here may not exactly match
  /// the true expiration time of a query is because we wish to avoid accessing (and
  /// therefore locking) this structure on every query activity. Instead, queries maintain
  /// an accurate expiration time, and this structure guarantees that we will always
  /// (modulo scheduling delays out of our control) read the expiration time before it has
  /// passed.
  typedef std::set<ExpirationEvent, ExpirationEventComparator> ExpirationQueue;
  ExpirationQueue queries_by_timestamp_;

  /// Container for a thread that runs ExpireQueries() if FLAGS_idle_query_timeout is set.
  std::unique_ptr<Thread> query_expiration_thread_;

  /// True if this ImpalaServer can accept client connections and coordinate
  /// queries.
  bool is_coordinator_;

  /// True if this ImpalaServer can execute query fragments.
  bool is_executor_;

  /// Containers for client and internal services. May not be set if the ports passed to
  /// Init() were <= 0.
  /// Note that these hold a shared pointer to 'this', and so need to be reset()
  /// explicitly.
  boost::scoped_ptr<ThriftServer> beeswax_server_;
  boost::scoped_ptr<ThriftServer> hs2_server_;
  boost::scoped_ptr<ThriftServer> hs2_http_server_;
  boost::scoped_ptr<ThriftServer> thrift_be_server_;

  /// Flag that records if backend and/or client services have been started. The flag is
  /// set after all services required for the server have been started.
  std::atomic_bool services_started_;

  /// Whether the Impala server shutdown process started. If 0, shutdown was not started.
  /// Otherwise, this is the MonotonicMillis() value when the shut down was started.
  AtomicInt64 shutting_down_{0};

  /// The MonotonicMillis() value after we should shut down regardless of registered
  /// client requests and running finstances. Set before 'shutting_down_' and updated
  /// atomically if a new shutdown command with a shorter deadline comes in.
  AtomicInt64 shutdown_deadline_{0};
};
}

#endif
