blob: a05bddcef22aea8932390a28e53f3097706797fd [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <atomic>
#include <boost/random/random_device.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/unordered_set.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <unordered_map>
#include "common/status.h"
#include "gen-cpp/Frontend_types.h"
#include "gen-cpp/ImpalaHiveServer2Service.h"
#include "gen-cpp/ImpalaInternalService.h"
#include "gen-cpp/ImpalaService.h"
#include "kudu/util/random.h"
#include "rpc/thrift-server.h"
#include "runtime/timestamp-value.h"
#include "runtime/types.h"
#include "scheduling/query-schedule.h"
#include "service/query-options.h"
#include "statestore/statestore-subscriber.h"
#include "util/condition-variable.h"
#include "util/container-util.h"
#include "util/runtime-profile.h"
#include "util/sharded-query-map-util.h"
#include "util/simple-logger.h"
#include "util/thread-pool.h"
#include "util/time.h"
namespace impala {
using kudu::ThreadSafeRandom;
class ExecEnv;
class DataSink;
class CancellationWork;
class ImpalaHttpHandler;
class RowDescriptor;
class TCatalogUpdate;
class TPlanExecRequest;
class TPlanExecParams;
class TDmlResult;
class TReportExecStatusArgs;
class TReportExecStatusResult;
class TNetworkAddress;
class TClientRequest;
class TExecRequest;
class TSessionState;
class TQueryOptions;
class TGetExecSummaryResp;
class TGetExecSummaryReq;
class ClientRequestState;
class QuerySchedule;
/// An ImpalaServer contains both frontend and backend functionality;
/// it implements ImpalaService (Beeswax), ImpalaHiveServer2Service (HiveServer2)
/// and ImpalaInternalService APIs.
/// ImpalaServer can be started in 1 of 3 roles: executor, coordinator, or both executor
/// and coordinator. All roles start ImpalaInternalService API's. The
/// coordinator role additionally starts client API's (Beeswax and HiveServer2).
/// Startup Sequence
/// ----------------
/// The startup sequence opens and starts all services so that they are ready to be used
/// by clients at the same time. The Impala server is considered 'ready' only when it can
/// process requests with all of its specified roles. Avoiding states where some roles are
/// ready and some are not makes it easier to reason about the state of the server.
/// Main thread (caller code), after instantiating the server, must call Start().
/// Start() does the following:
/// - Registers the ImpalaServer instance with the ExecEnv. This also registers it with
/// the ClusterMembershipMgr, which will register it with the statestore as soon as
/// the local backend becomes available through GetLocalBackendDescriptor(), which is
/// in turn gated by the services_started_.
/// - Start internal services
/// - Wait (indefinitely) for local catalog to be initialized from statestore
/// (if coordinator)
/// - Open ImpalaInternalService ports
/// - Open client ports (if coordinator)
/// - Start ImpalaInternalService API
/// - Start client service API's (if coordinator)
/// - Set services_started_ flag
/// Shutdown
/// --------
/// Impala Server shutdown can be initiated by a remote shutdown command from another
/// Impala daemon or by a local shutdown command from a user session. The shutdown
/// sequence aims to quiesce the Impalad (i.e. drain it of any running finstances or
/// client requests) then exit the process cleanly. The shutdown sequence is as follows:
/// 1. StartShutdown() is called to initiate the process.
/// 2. The startup grace period starts, during which:
/// - no new client requests are accepted. Clients can still interact with registered
/// requests and sessions as normal.
/// - the local backend of the Impala daemon is marked in the statestore as quiescing,
/// so coordinators will not schedule new fragments on it (once the statestore update
/// propagates through the ClusterMembershipMgr).
/// - the Impala daemon continues to start executing any new fragments sent to it by
/// coordinators. This is required because the query may have been submitted before
/// the coordinator learned that the executor was quiescing. Delays occur for several
/// reasons:
/// -> Latency of membership propagation through the statestore
/// -> Latency of query startup work including scheduling, admission control and
/// fragment startup.
/// -> Queuing delay in the admission controller (which may be unbounded).
/// 3. The startup grace period elapses.
/// 4. The background shutdown thread periodically checks to see if the Impala daemon is
/// quiesced (i.e. no client requests are registered and no queries are executing on
/// the backend). If it is quiesced then it cleanly shuts down by exiting the process.
/// The statestore will detect that the process is not responding to heartbeats and
/// remove any entries.
/// 5. The shutdown deadline elapses. The Impala daemon exits regardless of whether
/// it was successfully quiesced or not.
/// If shutdown is initiated again during this process, it does not cancel the existing
/// shutdown but can decrease the deadline, e.g. if an administrator starts shutdown
/// with a deadline of 1 hour, but then wants to shut down the cluster sooner, they can
/// run the shutdown function again to set a shorter deadline. The deadline can't be
/// increased after shutdown is started.
/// Secrets
/// -------
/// The HS2 protocol has a concept of a 'secret' associated with each session and
/// client request that is returned to the client, then must be passed back along with
/// each RPC that interacts with the session or operation. RPCs with non-matching secrets
/// should fail. Beeswax does not have this concept - rather sessions are bound to a
/// connection, and that session and associated requests should (generally) only be
/// accessed from the session's connection.
/// All RPC handler functions are responsible for validating secrets when looking up
/// client sessions or requests, with some exceptions:
/// * Beewax sessions are tied to a connection, so it is always valid to access the
/// session from the original connection.
/// * Cancel() and close() Beeswax operations can cancel any query if the ID is known.
/// Existing tools (impala-shell and administrative tools) depend on this behaviour.
/// HS2 RPC handlers should pass the client-provided secret to WithSession() before
/// taking any action on behalf of the client. Beeswax RPC handlers should call
/// CheckClientRequestSession() to ensure that the client request is accessible from
/// the current session. All other functions assume that the validation was already done.
/// HTTP handlers do not have access to client secrets, so do not validate them.
/// Locking
/// -------
/// This class is partially thread-safe. To ensure freedom from deadlock, if multiple
/// locks are acquired, lower-numbered locks must be acquired before higher-numbered
/// locks:
/// 1. session_state_map_lock_
/// 2. SessionState::lock
/// 3. query_expiration_lock_
/// 4. ClientRequestState::fetch_rows_lock
/// 5. ClientRequestState::lock
/// 6. ClientRequestState::expiration_data_lock_
/// 7. Coordinator::exec_summary_lock
/// The following locks are not held in conjunction with other locks:
/// * query_log_lock_
/// * session_timeout_lock_
/// * query_locations_lock_
/// * uuid_lock_
/// * catalog_version_lock_
/// * connection_to_sessions_map_lock_
/// TODO: The same doesn't apply to the execution state of an individual plan
/// fragment: the originating coordinator might die, but we can get notified of
/// that via the statestore. This still needs to be implemented.
class ImpalaServer : public ImpalaServiceIf,
public ImpalaHiveServer2ServiceIf,
public ThriftServer::ConnectionHandlerIf,
public boost::enable_shared_from_this<ImpalaServer>,
public CacheLineAligned {
ImpalaServer(ExecEnv* exec_env);
/// Initializes and starts RPC services and other subsystems (like audit logging).
/// Returns an error if starting any services failed.
/// Different port values have special behaviour. A port > 0 explicitly specifies
/// the port the server run on. A port value of 0 means to choose an arbitrary
/// ephemeral port in tests and to not start the service in a daemon. A port < 0
/// always means to not start the service. The port values can be obtained after
/// Start() by calling GetThriftBackendPort(), GetBeeswaxPort() or GetHS2Port().
Status Start(int32_t thrift_be_port, int32_t beeswax_port, int32_t hs2_port,
int32_t hs2_http_port);
/// Blocks until the server shuts down.
void Join();
/// ImpalaService rpcs: Beeswax API (implemented in
virtual void query(beeswax::QueryHandle& query_handle, const beeswax::Query& query);
virtual void executeAndWait(beeswax::QueryHandle& query_handle,
const beeswax::Query& query, const beeswax::LogContextId& client_ctx);
virtual void explain(beeswax::QueryExplanation& query_explanation,
const beeswax::Query& query);
virtual void fetch(beeswax::Results& query_results,
const beeswax::QueryHandle& query_handle, const bool start_over,
const int32_t fetch_size);
virtual void get_results_metadata(beeswax::ResultsMetadata& results_metadata,
const beeswax::QueryHandle& handle);
virtual void close(const beeswax::QueryHandle& handle);
virtual beeswax::QueryState::type get_state(const beeswax::QueryHandle& handle);
virtual void echo(std::string& echo_string, const std::string& input_string);
virtual void clean(const beeswax::LogContextId& log_context);
virtual void get_log(std::string& log, const beeswax::LogContextId& context);
/// Return ImpalaQueryOptions default values and "support_start_over/false" to indicate
/// that Impala does not support start over in the fetch call. Hue relies on this not to
/// issue a "start_over" fetch call.
/// "include_hadoop" is not applicable.
virtual void get_default_configuration(
std::vector<beeswax::ConfigVariable>& configurations, const bool include_hadoop);
/// ImpalaService rpcs: unimplemented parts of Beeswax API.
/// These APIs will not be implemented because ODBC driver does not use them.
virtual void dump_config(std::string& config);
/// ImpalaService rpcs: extensions over Beeswax (implemented in
virtual void Cancel(impala::TStatus& status, const beeswax::QueryHandle& query_id);
virtual void CloseInsert(impala::TDmlResult& dml_result,
const beeswax::QueryHandle& query_handle);
/// Pings the Impala service and gets the server version string.
virtual void PingImpalaService(TPingImpalaServiceResp& return_val);
virtual void GetRuntimeProfile(std::string& profile_output,
const beeswax::QueryHandle& query_id);
virtual void GetExecSummary(impala::TExecSummary& result,
const beeswax::QueryHandle& query_id);
/// Performs a full catalog metadata reset, invalidating all table and database
/// metadata.
virtual void ResetCatalog(impala::TStatus& status);
/// Resets the specified table's catalog metadata, forcing a reload on the next access.
/// Returns an error if the table or database was not found in the catalog.
virtual void ResetTable(impala::TStatus& status, const TResetTableReq& request);
/// ImpalaHiveServer2Service rpcs: HiveServer2 API (implemented in
/// TODO: Migrate existing extra ImpalaServer RPCs to ImpalaHiveServer2Service.
virtual void OpenSession(
apache::hive::service::cli::thrift::TOpenSessionResp& return_val,
const apache::hive::service::cli::thrift::TOpenSessionReq& request);
virtual void CloseSession(
apache::hive::service::cli::thrift::TCloseSessionResp& return_val,
const apache::hive::service::cli::thrift::TCloseSessionReq& request);
virtual void GetInfo(
apache::hive::service::cli::thrift::TGetInfoResp& return_val,
const apache::hive::service::cli::thrift::TGetInfoReq& request);
virtual void ExecuteStatement(
apache::hive::service::cli::thrift::TExecuteStatementResp& return_val,
const apache::hive::service::cli::thrift::TExecuteStatementReq& request);
virtual void GetTypeInfo(
apache::hive::service::cli::thrift::TGetTypeInfoResp& return_val,
const apache::hive::service::cli::thrift::TGetTypeInfoReq& request);
virtual void GetCatalogs(
apache::hive::service::cli::thrift::TGetCatalogsResp& return_val,
const apache::hive::service::cli::thrift::TGetCatalogsReq& request);
virtual void GetSchemas(
apache::hive::service::cli::thrift::TGetSchemasResp& return_val,
const apache::hive::service::cli::thrift::TGetSchemasReq& request);
virtual void GetTables(
apache::hive::service::cli::thrift::TGetTablesResp& return_val,
const apache::hive::service::cli::thrift::TGetTablesReq& request);
virtual void GetTableTypes(
apache::hive::service::cli::thrift::TGetTableTypesResp& return_val,
const apache::hive::service::cli::thrift::TGetTableTypesReq& request);
virtual void GetColumns(
apache::hive::service::cli::thrift::TGetColumnsResp& return_val,
const apache::hive::service::cli::thrift::TGetColumnsReq& request);
virtual void GetFunctions(
apache::hive::service::cli::thrift::TGetFunctionsResp& return_val,
const apache::hive::service::cli::thrift::TGetFunctionsReq& request);
virtual void GetPrimaryKeys(
apache::hive::service::cli::thrift::TGetPrimaryKeysResp& return_val,
const apache::hive::service::cli::thrift::TGetPrimaryKeysReq& request);
virtual void GetCrossReference(
apache::hive::service::cli::thrift::TGetCrossReferenceResp& return_val,
const apache::hive::service::cli::thrift::TGetCrossReferenceReq& request);
virtual void GetOperationStatus(
apache::hive::service::cli::thrift::TGetOperationStatusResp& return_val,
const apache::hive::service::cli::thrift::TGetOperationStatusReq& request);
virtual void CancelOperation(
apache::hive::service::cli::thrift::TCancelOperationResp& return_val,
const apache::hive::service::cli::thrift::TCancelOperationReq& request);
virtual void CloseOperation(
apache::hive::service::cli::thrift::TCloseOperationResp& return_val,
const apache::hive::service::cli::thrift::TCloseOperationReq& request);
virtual void GetResultSetMetadata(
apache::hive::service::cli::thrift::TGetResultSetMetadataResp& return_val,
const apache::hive::service::cli::thrift::TGetResultSetMetadataReq& request);
virtual void FetchResults(
apache::hive::service::cli::thrift::TFetchResultsResp& return_val,
const apache::hive::service::cli::thrift::TFetchResultsReq& request);
virtual void GetLog(apache::hive::service::cli::thrift::TGetLogResp& return_val,
const apache::hive::service::cli::thrift::TGetLogReq& request);
virtual void GetExecSummary(TGetExecSummaryResp& return_val,
const TGetExecSummaryReq& request);
virtual void GetRuntimeProfile(TGetRuntimeProfileResp& return_val,
const TGetRuntimeProfileReq& request);
virtual void GetDelegationToken(
apache::hive::service::cli::thrift::TGetDelegationTokenResp& return_val,
const apache::hive::service::cli::thrift::TGetDelegationTokenReq& req);
virtual void CancelDelegationToken(
apache::hive::service::cli::thrift::TCancelDelegationTokenResp& return_val,
const apache::hive::service::cli::thrift::TCancelDelegationTokenReq& req);
virtual void RenewDelegationToken(
apache::hive::service::cli::thrift::TRenewDelegationTokenResp& return_val,
const apache::hive::service::cli::thrift::TRenewDelegationTokenReq& req);
// Extensions to HS2 implemented by ImpalaHiveServer2Service.
/// Pings the Impala service and gets the server version string.
virtual void PingImpalaHS2Service(TPingImpalaHS2ServiceResp& return_val,
const TPingImpalaHS2ServiceReq& req);
/// Closes an Impala operation and returns additional information about the closed
/// operation.
virtual void CloseImpalaOperation(
TCloseImpalaOperationResp& return_val, const TCloseImpalaOperationReq& request);
/// ImpalaInternalService rpcs
void UpdateFilter(TUpdateFilterResult& return_val, const TUpdateFilterParams& params);
/// Generates a unique id for this query and sets it in the given query context.
/// Prepares the given query context by populating fields required for evaluating
/// certain expressions, such as now(), pid(), etc. Should be called before handing
/// the query context to the frontend for query compilation.
void PrepareQueryContext(TQueryCtx* query_ctx);
/// Static helper for PrepareQueryContext() that is used from expr-benchmark.
static void PrepareQueryContext(const TNetworkAddress& backend_addr,
const TNetworkAddress& krpc_addr, TQueryCtx* query_ctx);
/// SessionHandlerIf methods
/// Called when a Beeswax or HS2 connection starts. For Beeswax, registers a new
/// SessionState associated with the new connection. For HS2, this is a no-op (HS2
/// has an explicit CreateSession RPC).
virtual void ConnectionStart(const ThriftServer::ConnectionContext& session_context);
/// Called when a Beeswax or HS2 connection terminates. Unregisters all sessions
/// associated with the closed connection.
virtual void ConnectionEnd(const ThriftServer::ConnectionContext& session_context);
/// Returns true if the connection is considered idle. A connection is considered
/// idle if all the sessions associated with it have expired due to idle timeout.
/// Called when a client has been inactive for --idle_client_poll_period_s seconds.
virtual bool IsIdleConnection(const ThriftServer::ConnectionContext& session_context);
void CatalogUpdateCallback(const StatestoreSubscriber::TopicDeltaMap& topic_deltas,
std::vector<TTopicDelta>* topic_updates);
/// Processes a TCatalogUpdateResult returned from the CatalogServer and ensures
/// the update has been applied to the local impalad's catalog cache. Called from
/// ClientRequestState after executing any statement that modifies the catalog.
/// If TCatalogUpdateResult contains TCatalogObject(s) to add and/or remove, this
/// function will update the local cache by directly calling UpdateCatalog() with the
/// TCatalogObject results.
/// If TCatalogUpdateResult does not contain any TCatalogObjects and this is
/// the result of an INVALIDATE METADATA operation, it waits until the minimum
/// catalog version in the local cache is greater than or equal to the catalog
/// version specified in TCatalogUpdateResult. If it is not an INVALIDATE
/// METADATA operation, it waits until the local impalad's catalog cache has
/// been updated from a statestore heartbeat that includes this catalog
/// update's version.
/// If wait_for_all_subscribers is true, this function also
/// waits for all other catalog topic subscribers to process this update by checking the
/// current min_subscriber_topic_version included in each state store heartbeat.
Status ProcessCatalogUpdateResult(const TCatalogUpdateResult& catalog_update_result,
bool wait_for_all_subscribers) WARN_UNUSED_RESULT;
/// Wait until the catalog update with version 'catalog_update_version' is
/// received and applied in the local catalog cache or until the catalog
/// service id has changed.
void WaitForCatalogUpdate(const int64_t catalog_update_version,
const TUniqueId& catalog_service_id);
/// Wait until the minimum catalog object version in the local cache is
/// greater than 'min_catalog_update_version' or until the catalog
/// service id has changed.
void WaitForMinCatalogUpdate(const int64_t min_catalog_update_version,
const TUniqueId& catalog_service_id);
/// Wait until the last applied catalog update has been broadcast to
/// all coordinators or until the catalog service id has changed.
void WaitForCatalogUpdateTopicPropagation(const TUniqueId& catalog_service_id);
/// Returns true if lineage logging is enabled, false otherwise.
/// DEPRECATED: lineage file logging has been deprecated in favor of
/// query execution hooks (FE)
bool IsLineageLoggingEnabled();
/// Returns true if query execution (FE) hooks are enabled, false otherwise.
bool AreQueryHooksEnabled();
/// Returns true if audit event logging is enabled, false otherwise.
bool IsAuditEventLoggingEnabled();
/// Retuns true if this is a coordinator, false otherwise.
bool IsCoordinator();
/// Returns true if this is an executor, false otherwise.
bool IsExecutor();
/// Returns whether this backend is healthy, i.e. able to accept queries.
bool IsHealthy();
/// Returns the port that the thrift backend server is listening on. Valid to call after
/// the server has started successfully.
int GetThriftBackendPort();
/// Returns the network address that the thrift backend server is listening on. Valid
/// to call after the server has started successfully.
TNetworkAddress GetThriftBackendAddress();
/// Returns the port that the Beeswax server is listening on. Valid to call after
/// the server has started successfully.
int GetBeeswaxPort();
/// Returns the port that the Beeswax server is listening on. Valid to call after
/// the server has started successfully.
int GetHS2Port();
/// Returns a current snapshot of the local backend descriptor.
std::shared_ptr<const TBackendDescriptor> GetLocalBackendDescriptor();
/// Adds the query_id to the map from backend to the list of queries running or expected
/// to run there (query_locations_). After calling this function, the server will cancel
/// a query with an error if one of its backends fail.
void RegisterQueryLocations(
const PerBackendExecParams& per_backend_params, const TUniqueId& query_id);
/// Takes a set of network addresses of active backends and cancels all the queries
/// running on failed ones (that is, addresses not in the active set).
void CancelQueriesOnFailedBackends(
const std::unordered_set<TNetworkAddress>& current_membership);
/// Start the shutdown process. Return an error if it could not be started. Otherwise,
/// if it was successfully started by this or a previous call, return OK along with
/// information about the pending shutdown in 'shutdown_status'. 'relative_deadline_s'
/// is the deadline value in seconds to use, or -1 if we should use the default
/// deadline. See Shutdown class comment for explanation of the shutdown sequence.
Status StartShutdown(int64_t relative_deadline_s, ShutdownStatusPB* shutdown_status);
/// Returns true if a shut down is in progress.
bool IsShuttingDown() const { return shutting_down_.Load() != 0; }
/// Returns an informational error about why a new operation could not be started
/// if the server is shutting down. Must be called before starting execution of a
/// new operation (e.g. a query).
Status CheckNotShuttingDown() const;
/// Return information about the status of a shutdown. Only valid to call if a shutdown
/// is in progress (i.e. IsShuttingDown() is true).
ShutdownStatusPB GetShutdownStatus() const;
/// Convert the shutdown status to a human-readable string.
static std::string ShutdownStatusToString(const ShutdownStatusPB& shutdown_status);
/// Appends the audit_entry to audit_event_logger_.
Status AppendAuditEntry(const std::string& audit_entry);
/// Appends the lineage_entry to lineage_logger_.
Status AppendLineageEntry(const std::string& lineage_entry);
// Mapping between query option names and levels
QueryOptionLevels query_option_levels_;
/// The prefix of audit event log filename.
static const string AUDIT_EVENT_LOG_FILE_PREFIX;
/// The default executor group name for executors that do not explicitly belong to a
/// specific executor group.
static const string DEFAULT_EXECUTOR_GROUP_NAME;
/// Per-session state. This object is reference counted using shared_ptrs. There
/// is one ref count in the SessionStateMap for as long as the session is active.
/// All queries running from this session also have a reference.
struct SessionState {
/// The default hs2_version must be V1 so that child queries (which use HS2, but may
/// run as children of Beeswax sessions) get results back in the expected format -
/// child queries inherit the HS2 version from their parents, and a Beeswax session
/// will never update the HS2 version from the default.
SessionState(ImpalaServer* impala_server, TUniqueId session_id, TUniqueId secret)
: impala_server(impala_server), session_id(std::move(session_id)),
closed(false), expired(false), hs2_version(apache::hive::service::cli::thrift::
TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V1), total_queries(0), ref_count(0) {
DCHECK(this->impala_server != nullptr);
/// Pointer to the Impala server of this session
ImpalaServer* impala_server;
/// The unique session ID. This is also the key for session_state_map_, but
/// we redundantly store it here for convenience.
const TUniqueId session_id;
/// The session secret that client RPCs must pass back in to access the session.
/// This must not be printed to logs or exposed in any other way.
const TUniqueId secret;
TSessionType::type session_type;
/// Time the session was created, in ms since epoch (UTC).
int64_t start_time_ms;
/// Connected user for this session, i.e. the user which originated this session.
std::string connected_user;
/// The user to delegate to. Empty for no delegation.
std::string do_as_user;
/// Client network address.
TNetworkAddress network_address;
/// Protects all fields below. See "Locking" in the class comment for lock
/// acquisition order.
boost::mutex lock;
/// If true, the session has been closed.
bool closed;
/// If true, the session was idle for too long and has been expired. Only set when
/// ref_count == 0, after which point ref_count should never become non-zero (since
/// clients will use ScopedSessionState to access the session, which will prevent use
/// after expiration).
bool expired;
/// The default database (changed as a result of 'use' query execution).
std::string database;
/// Reference to the ImpalaServer's query options
TQueryOptions* server_default_query_options;
/// Query options that have been explicitly set in this session.
TQueryOptions set_query_options;
/// BitSet indicating which query options in set_query_options have been
/// explicitly set in the session. Updated when a query option is specified using a
/// SET command: the bit corresponding to the TImpalaQueryOptions enum is set.
/// If the option is subsequently reset via a SET with an empty value, the bit
/// is cleared.
QueryOptionsMask set_query_options_mask;
/// For HS2 only, the protocol version this session is expecting.
apache::hive::service::cli::thrift::TProtocolVersion::type hs2_version;
/// Inflight queries belonging to this session. It represents the set of queries that
/// are either queued or have started executing. Used primarily to identify queries
/// that need to be closed if the session closes or expires.
boost::unordered_set<TUniqueId> inflight_queries;
/// Total number of queries run as part of this session.
int64_t total_queries;
/// Time the session was last accessed, in ms since epoch (UTC).
int64_t last_accessed_ms;
/// If this session has no open connections, this is the time in UTC when the last
/// connection was closed.
int64_t disconnected_ms;
/// The latest Kudu timestamp observed after DML operations executed within this
/// session.
uint64_t kudu_latest_observed_ts;
/// Number of RPCs concurrently accessing this session state. Used to detect when a
/// session may be correctly expired after a timeout (when ref_count == 0). Typically
/// at most one RPC will be issued against a session at a time, but clients may do
/// something unexpected and, for example, poll in one thread and fetch in another.
uint32_t ref_count;
/// Per-session idle timeout in seconds. Default value is FLAGS_idle_session_timeout.
/// It can be overridden with the query option "idle_session_timeout" when opening a
/// HS2 session, or using the SET command.
int32_t session_timeout = 0;
/// The connection ids of any connections that this session has been used over.
std::set<TUniqueId> connections;
/// Updates the session timeout based on the query option idle_session_timeout.
/// It registers/unregisters the session timeout to the Impala server.
/// The lock must be owned by the caller of this function.
void UpdateTimeout();
/// Builds a Thrift representation of this SessionState for serialisation to
/// the frontend.
void ToThrift(const TUniqueId& session_id, TSessionState* session_state);
/// Builds the overlay of the default server query options and the options
/// explicitly set in this session.
TQueryOptions QueryOptions();
struct ExpirationEvent;
class SecretArg;
friend class ChildQuery;
friend class ControlService;
friend class ImpalaHttpHandler;
friend struct SessionState;
friend class ImpalaServerTest;
boost::scoped_ptr<ImpalaHttpHandler> http_handler_;
/// Relevant ODBC SQL State code; for more info,
/// goto
static const char* SQLSTATE_GENERAL_ERROR;
/// Return exec state for given query_id, or NULL if not found.
std::shared_ptr<ClientRequestState> GetClientRequestState(
const TUniqueId& query_id);
/// Used in situations where the client provides a session ID and a query ID and the
/// caller needs to validate that the query can be accessed from the session. The two
/// arguments are the session obtained by looking up the session ID provided by the
/// RPC client and the effective user from the query. If the username doesn't match,
/// return an error indicating that the query was not found (since we prefer not to
/// leak information about the existence of queries that the user doesn't have
/// access to).
Status CheckClientRequestSession(SessionState* session,
const std::string& client_request_effective_user, const TUniqueId& query_id);
/// Updates a set of Impalad catalog metrics including number tables/databases and
/// some cache metrics applicable to local catalog mode (if configured). Called at
/// the end of statestore update callback (to account for metadata object changes)
/// and at the end of query planning to update local catalog cache access metrics.
Status UpdateCatalogMetrics() WARN_UNUSED_RESULT;
/// Depending on the query type, this either submits the query to the admission
/// controller for performing async admission control or starts asynchronous execution
/// of query. Creates ClientRequestState (returned in exec_state), registers it and
/// calls ClientRequestState::Execute(). If it returns with an error status, exec_state
/// will be NULL and nothing will have been registered in client_request_state_map_.
/// session_state is a ptr to the session running this query and must have been checked
/// out.
Status Execute(TQueryCtx* query_ctx, std::shared_ptr<SessionState> session_state,
std::shared_ptr<ClientRequestState>* exec_state) WARN_UNUSED_RESULT;
/// Implements Execute() logic, but doesn't unregister query on error.
Status ExecuteInternal(const TQueryCtx& query_ctx,
std::shared_ptr<SessionState> session_state, bool* registered_exec_state,
std::shared_ptr<ClientRequestState>* exec_state) WARN_UNUSED_RESULT;
/// Registers the query exec state with client_request_state_map_ using the
/// globally unique query_id.
/// The caller must have checked out the session state.
Status RegisterQuery(std::shared_ptr<SessionState> session_state,
const std::shared_ptr<ClientRequestState>& exec_state) WARN_UNUSED_RESULT;
/// Adds the query to the set of in-flight queries for the session. The query remains
/// in-flight until the query is unregistered. Until a query is in-flight, an attempt
/// to cancel or close the query by the user will return an error status. If the
/// session is closed before a query is in-flight, then the query cancellation is
/// deferred until after the issuing path has completed initializing the query. Once
/// a query is in-flight, it can be cancelled/closed asynchronously by the user
/// (e.g. via an RPC) and the session close path can close (cancel and unregister) it.
/// The query must have already been registered using RegisterQuery(). The caller
/// must have checked out the session state.
Status SetQueryInflight(std::shared_ptr<SessionState> session_state,
const std::shared_ptr<ClientRequestState>& exec_state) WARN_UNUSED_RESULT;
/// Unregister the query by cancelling it, removing exec_state from
/// client_request_state_map_, and removing the query id from session state's
/// in-flight query list. If check_inflight is true, then return an error if the query
/// is not yet in-flight. Otherwise, proceed even if the query isn't yet in-flight (for
/// cleaning up after an error on the query issuing path).
Status UnregisterQuery(const TUniqueId& query_id, bool check_inflight,
const Status* cause = NULL) WARN_UNUSED_RESULT;
/// Initiates query cancellation reporting the given cause as the query status.
/// Assumes deliberate cancellation by the user if the cause is NULL. Returns an
/// error if query_id is not found. If check_inflight is true, then return an error
/// if the query is not yet in-flight. Otherwise, returns OK. Queries still need to
/// be unregistered, after cancellation. Caller should not hold any locks when
/// calling this function.
Status CancelInternal(const TUniqueId& query_id, bool check_inflight,
const Status* cause = NULL) WARN_UNUSED_RESULT;
/// Close the session and release all resource used by this session.
/// Caller should not hold any locks when calling this function.
/// If ignore_if_absent is true, returns OK even if a session with the supplied ID does
/// not exist.
Status CloseSessionInternal(const TUniqueId& session_id, const SecretArg& secret,
bool ignore_if_absent) WARN_UNUSED_RESULT;
/// Gets the runtime profile string for a given query_id and writes it to the output
/// stream. First searches for the query id in the map of in-flight queries. If no
/// match is found there, the query log is searched. Returns OK if the profile was
/// found, otherwise a Status object with an error message will be returned. The
/// output stream will not be modified on error.
/// On success, if 'format' is BASE64 or STRING then 'output' will be set, or if
/// 'format' is THRIFT then 'thrift_output' will be set. If 'format' is JSON
/// then 'json_output' will be set.
/// If the user asking for this profile is the same user that runs the query
/// and that user has access to the runtime profile, the profile is written to
/// the output. Otherwise, nothing is written to output and an error code is
/// returned to indicate an authorization error.
Status GetRuntimeProfileOutput(const TUniqueId& query_id, const std::string& user,
TRuntimeProfileFormat::type format, std::stringstream* output,
TRuntimeProfileTree* thrift_output,
rapidjson::Document* json_output) WARN_UNUSED_RESULT;
/// Returns the exec summary for this query if the user asking for the exec
/// summary is the same user that run the query and that user has access to the full
/// query profile. Otherwise, an error status is returned to indicate an
/// authorization error.
Status GetExecSummary(const TUniqueId& query_id, const std::string& user,
TExecSummary* result) WARN_UNUSED_RESULT;
/// Collect ExecSummary and update it to the profile in request_state
void UpdateExecSummary(std::shared_ptr<ClientRequestState> request_state) const;
/// Initialize "default_configs_" to show the default values for ImpalaQueryOptions and
/// "support_start_over/false" to indicate that Impala does not support start over
/// in the fetch call.
void InitializeConfigVariables();
/// Sets the option level for parameter 'option' based on the mapping stored in
/// 'query_option_levels_'. The option level is used by the Impala shell when it
/// displays the options. 'option_key' is the key for the 'query_option_levels_'
/// to get the level of the query option.
void AddOptionLevelToConfig(beeswax::ConfigVariable* option,
const string& option_key) const;
/// Checks settings for profile logging, including whether the output
/// directory exists and is writeable, and initialises the first log file.
/// Returns OK unless there is some problem preventing profile log files
/// from being written. If an error is returned, the constructor will disable
/// profile logging.
Status InitProfileLogging() WARN_UNUSED_RESULT;
/// Checks settings for audit event logging, including whether the output
/// directory exists and is writeable, and initialises the first log file.
/// Returns OK unless there is some problem preventing audit event log files
/// from being written. If an error is returned, impalad startup will be aborted.
Status InitAuditEventLogging() WARN_UNUSED_RESULT;
/// Checks settings for lineage logging, including whether the output
/// directory exists and is writeable, and initialises the first log file.
/// Returns OK unless there is some problem preventing lineage log files
/// from being written. If an error is returned, impalad startup will be aborted.
Status InitLineageLogging() WARN_UNUSED_RESULT;
/// Initializes a logging directory, creating the directory if it does not already
/// exist. If there is any error creating the directory an error will be returned.
static Status InitLoggingDir(const std::string& log_dir) WARN_UNUSED_RESULT;
/// Runs once every 5s to flush the profile log file to disk.
[[noreturn]] void LogFileFlushThread();
/// Runs once every 5s to flush the audit log file to disk.
[[noreturn]] void AuditEventLoggerFlushThread();
/// Runs once every 5s to flush the lineage log file to disk.
[[noreturn]] void LineageLoggerFlushThread();
/// Copies a query's state into the query log. Called immediately prior to a
/// ClientRequestState's deletion. Also writes the query profile to the profile log
/// on disk.
void ArchiveQuery(const ClientRequestState& query);
/// Checks whether the given user is allowed to delegate as the specified do_as_user.
/// Returns OK if the authorization suceeds, otherwise returns an status with details
/// on why the failure occurred.
Status AuthorizeProxyUser(const std::string& user, const std::string& do_as_user)
/// Initializes the backend descriptor in 'be_desc' with the local backend information.
void BuildLocalBackendDescriptorInternal(TBackendDescriptor* be_desc);
/// Snapshot of a query's state, archived in the query log.
struct QueryStateRecord {
/// Pretty-printed runtime profile. TODO: Copy actual profile object
std::string profile_str;
/// Base64 encoded runtime profile
std::string encoded_profile_str;
/// JSON based runtime profile
std::string json_profile_str;
/// Query id
TUniqueId id;
/// Queries are run and authorized on behalf of the effective_user.
/// If there is no delegated user, this will be the connected user. Otherwise, it
/// will be set to the delegated user.
std::string effective_user;
/// If true, effective_user has access to the runtime profile and execution
/// summary.
bool user_has_profile_access;
/// default db for this query
std::string default_db;
/// SQL statement text
std::string stmt;
/// Text representation of plan
std::string plan;
/// DDL, DML etc.
TStmtType::type stmt_type;
/// True if the query required a coordinator fragment
bool has_coord;
/// The number of fragments that have completed
int64_t num_complete_fragments;
/// The total number of fragments
int64_t total_fragments;
/// The number of rows fetched by the client
int64_t num_rows_fetched;
/// The state of the query as of this snapshot
beeswax::QueryState::type query_state;
/// Start and end time of the query, in Unix microseconds.
/// A query whose end_time_us is 0 indicates that it is an in-flight query.
/// These two variables are initialized with the corresponding values from
/// ClientRequestState.
int64_t start_time_us, end_time_us;
/// Summary of execution for this query.
TExecSummary exec_summary;
Status query_status;
/// Timeline of important query events
TEventSequence event_sequence;
/// Save the query plan fragments so that the plan tree can be rendered on the debug
/// webpages.
vector<TPlanFragment> fragments;
// If true, this query has no more rows to return
bool all_rows_returned;
// The most recent time this query was actively being processed, in Unix milliseconds.
int64_t last_active_time_ms;
/// Resource pool to which the request was submitted for admission, or an empty
/// string if this request doesn't go through admission control.
std::string resource_pool;
/// Initialise from an exec_state. If copy_profile is true, print the query
/// profile to a string and copy that into this.profile (which is expensive),
/// otherwise leave this.profile empty.
/// If encoded_str is non-empty, it is the base64 encoded string for
/// exec_state->profile.
QueryStateRecord(const ClientRequestState& exec_state, bool copy_profile = false,
const std::string& encoded_str = "");
/// Default constructor used only when participating in collections
QueryStateRecord() { }
struct QueryStateRecordLessThan {
/// Comparator that sorts by start time.
bool operator() (const QueryStateRecord& lhs, const QueryStateRecord& rhs) const;
/// Beeswax private methods
/// Helper functions to translate between Beeswax and Impala structs
Status QueryToTQueryContext(const beeswax::Query& query, TQueryCtx* query_ctx)
void TUniqueIdToQueryHandle(const TUniqueId& query_id, beeswax::QueryHandle* handle);
void QueryHandleToTUniqueId(const beeswax::QueryHandle& handle, TUniqueId* query_id);
/// Helper function to raise BeeswaxException
[[noreturn]] void RaiseBeeswaxException(const std::string& msg, const char* sql_state);
/// Executes the fetch logic. Doesn't clean up the exec state if an error occurs.
Status FetchInternal(ClientRequestState* request_state, bool start_over,
int32_t fetch_size, beeswax::Results* query_results) WARN_UNUSED_RESULT;
/// Populate dml_result and clean up exec state. If the query
/// status is an error, dml_result is not populated and the status is returned.
/// 'session' is RPC client's session, used to check whether the DML can
/// be closed via that session.
Status CloseInsertInternal(SessionState* session, const TUniqueId& query_id,
TDmlResult* dml_result) WARN_UNUSED_RESULT;
/// HiveServer2 private methods (implemented in
/// Starts the synchronous execution of a HiverServer2 metadata operation.
/// If the execution succeeds, an ClientRequestState will be created and registered in
/// client_request_state_map_. Otherwise, nothing will be registered in
/// client_request_state_map_ and an error status will be returned. As part of this
/// call, the TMetadataOpRequest struct will be populated with the requesting user's
/// session state.
/// Returns a TOperationHandle and TStatus.
void ExecuteMetadataOp(
const apache::hive::service::cli::thrift::THandleIdentifier& session_handle,
TMetadataOpRequest* request,
apache::hive::service::cli::thrift::TOperationHandle* handle,
apache::hive::service::cli::thrift::TStatus* status);
/// Executes the fetch logic for HiveServer2 FetchResults and stores result size in
/// 'num_results'. If fetch_first is true, then the query's state should be reset to
/// fetch from the beginning of the result set. Doesn't clean up 'request_state' if an
/// error occurs.
Status FetchInternal(ClientRequestState* request_state, SessionState* session,
int32_t fetch_size, bool fetch_first,
apache::hive::service::cli::thrift::TFetchResultsResp* fetch_results,
int32_t* num_results) WARN_UNUSED_RESULT;
/// Helper functions to translate between HiveServer2 and Impala structs
/// Returns !ok() if handle.guid.size() or handle.secret.size() != 16
static Status THandleIdentifierToTUniqueId(
const apache::hive::service::cli::thrift::THandleIdentifier& handle,
TUniqueId* unique_id, TUniqueId* secret) WARN_UNUSED_RESULT;
static void TUniqueIdToTHandleIdentifier(
const TUniqueId& unique_id, const TUniqueId& secret,
apache::hive::service::cli::thrift::THandleIdentifier* handle);
Status TExecuteStatementReqToTQueryContext(
const apache::hive::service::cli::thrift::TExecuteStatementReq execute_request,
TQueryCtx* query_ctx) WARN_UNUSED_RESULT;
/// Helper method to process cancellations that result from failed backends, called from
/// the cancellation thread pool. The cancellation_work contains the query id to cancel
/// and a cause listing the failed backends that led to cancellation. Calls
/// CancelInternal directly, but has a signature compatible with the thread pool.
void CancelFromThreadPool(uint32_t thread_id,
const CancellationWork& cancellation_work);
/// Helper method to add the pool name and query options to the query_ctx. Must be
/// called before ExecuteInternal() at which point the TQueryCtx is const and cannot
/// be mutated. override_options_mask indicates which query options can be overridden
/// by the pool default query options.
void AddPoolConfiguration(TQueryCtx* query_ctx,
const QueryOptionsMask& override_options_mask);
/// Register timeout value upon opening a new session. This will wake up
/// session_timeout_thread_.
void RegisterSessionTimeout(int32_t timeout);
/// Unregister timeout value.
void UnregisterSessionTimeout(int32_t timeout);
/// To be run in a thread which wakes up every second if there are registered sesions
/// timeouts. This function checks all sessions for:
/// - Last-idle times. Those that have been idle for longer than their configured
/// timeout values are 'expired': they will no longer accept queries.
/// - Disconnected times. Those that have had no active connections for longer than
/// FLAGS_disconnected_session_timeout are closed: they are removed from the session
/// state map and can no longer be accessed by clients.
/// For either case any running queries associated with those sessions are unregistered.
[[noreturn]] void SessionMaintenance();
/// Runs forever, walking queries_by_timestamp_ and expiring any queries that have been
/// idle (i.e. no client input and no time spent processing locally) for
/// FLAGS_idle_query_timeout seconds.
[[noreturn]] void ExpireQueries();
/// Periodically iterates over all queries and cancels any where a backend hasn't sent a
/// status report in greater than GetMaxReportRetryMs().
[[noreturn]] void UnresponsiveBackendThread();
/// Called from ExpireQueries() to check query resource limits for 'crs'. If the query
/// exceeded a resource limit, returns a non-OK status with information about what
/// limit was exceeded. Returns OK if the query will continue running and expiration
/// check should be rescheduled for a later time.
Status CheckResourceLimits(ClientRequestState* crs);
/// Expire 'crs' and cancel it with status 'status'.
void ExpireQuery(ClientRequestState* crs, const Status& status);
typedef boost::unordered_map<std::string, boost::unordered_set<std::string>>
/// Populates authorized proxy config into the given map.
/// For example:
/// - authorized_proxy_config: foo=abc,def;bar=ghi
/// - authorized_proxy_config_delimiter: ,
/// - authorized_proxy_map: {foo:[abc, def], bar=s[ghi]}
static Status PopulateAuthorizedProxyConfig(
const std::string& authorized_proxy_config,
const std::string& authorized_proxy_config_delimiter,
AuthorizedProxyMap* authorized_proxy_map);
/// Background thread that does the shutdown.
[[noreturn]] void ShutdownThread();
/// Random number generator for use in this class, thread safe.
static ThreadSafeRandom rng_;
/// Guards query_log_ and query_log_index_
boost::mutex query_log_lock_;
/// FIFO list of query records, which are written after the query finishes executing
typedef std::list<QueryStateRecord> QueryLog;
QueryLog query_log_;
/// Index that allows lookup via TUniqueId into the query log
typedef boost::unordered_map<TUniqueId, QueryLog::iterator> QueryLogIndex;
QueryLogIndex query_log_index_;
/// Logger for writing encoded query profiles, one per line with the following format:
/// <ms-since-epoch> <query-id> <thrift query profile URL encoded and gzipped>
boost::scoped_ptr<SimpleLogger> profile_logger_;
/// Logger for writing audit events, one per line with the format:
/// "<current timestamp>" : { JSON object }
boost::scoped_ptr<SimpleLogger> audit_event_logger_;
/// Logger for writing lineage events, one per line with the format:
/// { JSON object }
boost::scoped_ptr<SimpleLogger> lineage_logger_;
/// If profile logging is enabled, wakes once every 5s to flush query profiles to disk
std::unique_ptr<Thread> profile_log_file_flush_thread_;
/// If audit event logging is enabled, wakes once every 5s to flush audit events to disk
std::unique_ptr<Thread> audit_event_logger_flush_thread_;
/// If lineage logging is enabled, wakes once every 5s to flush lineage events to disk
std::unique_ptr<Thread> lineage_logger_flush_thread_;
/// global, per-server state
ExecEnv* exec_env_; // not owned
/// Thread pool to process cancellation requests that come from failed Impala demons to
/// avoid blocking the statestore callback.
boost::scoped_ptr<ThreadPool<CancellationWork>> cancellation_thread_pool_;
/// Thread that runs SessionMaintenance. It will wake up periodically to check for
/// sessions which are idle for more their timeout values.
std::unique_ptr<Thread> session_maintenance_thread_;
/// Contains all the non-zero idle or disconnected session timeout values.
std::multiset<int32_t> session_timeout_set_;
/// The lock for protecting the session_timeout_set_.
boost::mutex session_timeout_lock_;
/// session_timeout_thread_ relies on the following conditional variable to wake up
/// when there are sessions that have a timeout.
ConditionVariable session_timeout_cv_;
/// Thread that runs UnresponsiveBackendThread().
std::unique_ptr<Thread> unresponsive_backend_thread_;
/// maps from query id to exec state; ClientRequestState is owned by us and referenced
/// as a shared_ptr to allow asynchronous deletion
typedef class ShardedQueryMap<std::shared_ptr<ClientRequestState>>
ClientRequestStateMap client_request_state_map_;
/// Default query options in the form of TQueryOptions and beeswax::ConfigVariable
TQueryOptions default_query_options_;
std::vector<beeswax::ConfigVariable> default_configs_;
// Container for a secret passed into functions for validation.
class SecretArg {
// This should only be used if the client has not provided a secret but the caller
// knows that the client in fact is allowed to access the session or operation that
// they are accessing.
static SecretArg SkipSecretCheck() {
return SecretArg(true, true, TUniqueId(), TUniqueId());
/// Pass a session secret for validation.
static SecretArg Session(const TUniqueId& secret) {
return SecretArg(false, true, secret, TUniqueId());
/// Pass a query secret for validation. The error message will include the 'query_id',
/// so that must also be passed.
static SecretArg Operation(const TUniqueId& secret, const TUniqueId& query_id) {
return SecretArg(false, false, secret, query_id);
/// Return true iff the check should be skipped or the secret matches 'other'.
/// All validation should be done via this function to avoid subtle errors.
bool Validate(const TUniqueId& other) const {
return skip_validation_ || ConstantTimeCompare(other) == 0;
bool is_session_secret() const { return is_session_secret_; }
TUniqueId query_id() const {
return query_id_;
SecretArg(bool skip_validation, bool is_session_secret, TUniqueId secret,
TUniqueId query_id)
: skip_validation_(skip_validation),
query_id_(std::move(query_id)) {}
/// A comparison function for unique IDs that executes in an amount of time unrelated
/// to the input values. Returns the number of words that differ between id1 and id2.
int ConstantTimeCompare(const TUniqueId& other) const;
/// True if the caller wants to skip validation of the session.
const bool skip_validation_;
/// True if this is a session secret, false if this is an operation secret.
const bool is_session_secret_;
/// The secret to validate.
const TUniqueId secret_;
/// The query id, only provided if this is an operation secret.
const TUniqueId query_id_;
/// Class that allows users of SessionState to mark a session as in-use, and therefore
/// immune to expiration. The marking is done in WithSession() and undone in the
/// destructor, so this class can be used to 'check-out' a session for the duration of a
/// scope.
class ScopedSessionState {
ScopedSessionState(ImpalaServer* impala) : impala_(impala) { }
/// Marks a session as in-use, and saves it so that it can be unmarked when this
/// object goes out of scope. Returns OK unless there is an error in GetSessionState.
/// 'secret' must be provided and is validated against the stored secret for the
/// session. Must only be called once per ScopedSessionState.
Status WithSession(const TUniqueId& session_id, const SecretArg& secret,
std::shared_ptr<SessionState>* session = NULL) WARN_UNUSED_RESULT {
DCHECK(session_.get() == NULL);
session_id, secret, &session_, /* mark_active= */ true));
if (session != NULL) (*session) = session_;
// We won't have a connection context in the case of ChildQuery, which calls into
// hiveserver2 functions directly without going through the Thrift stack.
if (ThriftServer::HasThreadConnectionContext()) {
// Check that the session user matches the user authenticated on the connection.
const ThriftServer::Username& connection_username =
if (!connection_username.empty()
&& session_->connected_user != connection_username) {
return Status::Expected(TErrorCode::UNAUTHORIZED_SESSION_USER,
connection_username, session_->connected_user);
// Try adding the session id to the connection's set of sessions in case this is
// the first time this session has been used on this connection.
impala_->AddSessionToConnection(session_id, session_.get());
return Status::OK();
/// Same as WithSession(), except:
/// * It should only be called from beeswax with a 'connection_id' obtained from
/// ThriftServer::GetThreadConnectionId().
/// * It does not update the session/connection mapping, as beeswax sessions can
/// only be used over a single connection.
Status WithBeeswaxSession(const TUniqueId& connection_id,
std::shared_ptr<SessionState>* session = NULL) {
DCHECK(session_.get() == NULL);
impala_->GetSessionState(connection_id, SecretArg::SkipSecretCheck(), &session_,
/* mark_active= */ true));
if (session != NULL) (*session) = session_;
return Status::OK();
/// Decrements the reference count so the session can be expired correctly.
~ScopedSessionState() {
if (session_.get() != NULL) {
/// Reference-counted pointer to the session state object.
std::shared_ptr<SessionState> session_;
/// Saved so that we can access ImpalaServer methods to get / return session state.
ImpalaServer* impala_;
/// For access to GetSessionState() / MarkSessionInactive()
friend class ScopedSessionState;
/// Protects session_state_map_. See "Locking" in the class comment for lock
/// acquisition order.
boost::mutex session_state_map_lock_;
/// A map from session identifier to a structure containing per-session information
typedef boost::unordered_map<TUniqueId, std::shared_ptr<SessionState>> SessionStateMap;
SessionStateMap session_state_map_;
/// Protects connection_to_sessions_map_. See "Locking" in the class comment for lock
/// acquisition order.
boost::mutex connection_to_sessions_map_lock_;
/// Map from a connection ID to the associated list of sessions so that all can be
/// closed when the connection ends. HS2 allows for multiplexing several sessions across
/// a single connection. If a session has already been closed (only possible via HS2) it
/// is not removed from this map to avoid the cost of looking it up.
typedef boost::unordered_map<TUniqueId, std::set<TUniqueId>> ConnectionToSessionMap;
ConnectionToSessionMap connection_to_sessions_map_;
/// Returns session state for given session_id.
/// If not found or validation of 'secret' against the stored secret in the
/// SessionState fails, session_state will be NULL and an error status will be returned.
/// If mark_active is true, also checks if the session is expired or closed and
/// increments the session's reference counter if it is still alive.
Status GetSessionState(const TUniqueId& session_id, const SecretArg& secret,
std::shared_ptr<SessionState>* session_state, bool mark_active = false)
/// Decrement the session's reference counter and mark last_accessed_ms so that state
/// expiration can proceed.
inline void MarkSessionInactive(std::shared_ptr<SessionState> session) {
boost::lock_guard<boost::mutex> l(session->lock);
DCHECK_GT(session->ref_count, 0);
session->last_accessed_ms = UnixMillis();
/// Associate the current connection context with the given session in
/// 'connection_to_sessions_map_' and 'SessionState::connections'.
void AddSessionToConnection(const TUniqueId& session_id, SessionState* session);
/// Protects query_locations_. Not held in conjunction with other locks.
boost::mutex query_locations_lock_;
/// A map from backend to the list of queries currently running or expected to run
/// there.
typedef std::unordered_map<TNetworkAddress, std::unordered_set<TUniqueId>>
QueryLocations query_locations_;
/// The local backend descriptor. Updated in GetLocalBackendDescriptor() and protected
/// by 'local_backend_descriptor_lock_';
std::shared_ptr<const TBackendDescriptor> local_backend_descriptor_;
boost::mutex local_backend_descriptor_lock_;
/// UUID generator for session IDs and secrets. Uses system random device to get
/// cryptographically secure random numbers.
boost::uuids::basic_random_generator<boost::random_device> crypto_uuid_generator_;
/// Lock to protect uuid_generator
boost::mutex uuid_lock_;
/// Lock for catalog_update_version_, min_subscriber_catalog_topic_version_,
/// and catalog_version_update_cv_
boost::mutex catalog_version_lock_;
/// Variable to signal when the catalog version has been modified
ConditionVariable catalog_version_update_cv_;
/// Contains details on the version information of a catalog update.
struct CatalogUpdateVersionInfo {
CatalogUpdateVersionInfo() :
catalog_object_version_lower_bound(0L) {
/// Update the metrics to store the current version of catalog, current topic and
/// current service id used by impalad.
void UpdateCatalogVersionMetrics();
/// The last catalog version returned from UpdateCatalog()
int64_t catalog_version;
/// The CatalogService ID that this catalog version is from.
TUniqueId catalog_service_id;
/// The statestore catalog topic version this update was received in.
int64_t catalog_topic_version;
/// Lower bound of catalog object versions after a call to UpdateCatalog()
int64_t catalog_object_version_lower_bound;
/// The version information from the last successfull call to UpdateCatalog().
CatalogUpdateVersionInfo catalog_update_info_;
/// The current minimum topic version processed across all subscribers of the catalog
/// topic. Used to determine when other nodes have successfully processed a catalog
/// update. Updated with each catalog topic heartbeat from the statestore.
int64_t min_subscriber_catalog_topic_version_;
/// Map of short usernames of authorized proxy users to the set of users they are
/// allowed to delegate to. Populated by parsing the --authorized_proxy_users_config
/// flag.
AuthorizedProxyMap authorized_proxy_user_config_;
/// Map of short usernames of authorized proxy users to the set of groups they are
/// allowed to delegate to. Populated by parsing the --authorized_proxy_groups_config
/// flag.
AuthorizedProxyMap authorized_proxy_group_config_;
/// Guards queries_by_timestamp_. See "Locking" in the class comment for lock
/// acquisition order.
boost::mutex query_expiration_lock_;
enum class ExpirationKind {
// The query is cancelled if the query has been inactive this long. The event may
// cancel the query after checking the last active time.
// A hard time limit on query execution. The query is cancelled if this event occurs
// before the query finishes.
// A hard limit on cpu and scanned bytes. The query is cancelled if this event occurs
// before the query finishes.
// Describes a query expiration event where the query identified by 'query_id' is
// checked for expiration when UnixMillis() exceeds 'deadline'.
struct ExpirationEvent {
int64_t deadline;
TUniqueId query_id;
ExpirationKind kind;
/// Comparator that breaks ties when two queries have identical expiration deadlines.
struct ExpirationEventComparator {
bool operator()(const ExpirationEvent& t1, const ExpirationEvent& t2) {
if (t1.deadline < t2.deadline) return true;
if (t2.deadline < t1.deadline) return false;
if (t1.query_id < t2.query_id) return true;
if (t2.query_id < t1.query_id) return false;
return t1.kind < t2.kind;
/// Ordered set of (expiration_time, query_id) pairs. This queue is updated either by
/// RegisterQuery(), which adds a new query to the set, or by ExpireQueries(), which
/// updates the entries as it iterates over the set. Therefore, it is not directly
/// accessed during normal query execution and the benefit of that is there is no
/// competition for locks when ExpireQueries() walks this list to find expired queries.
/// In order to make the query expiration algorithm work, the following condition always
/// holds:
/// * For any pair (t, q) in the set, t is always a lower bound on the true expiration
/// time of the query q (in q->idle_time()). Therefore we can bound the expiration error
/// by sleeping to no more than t + d for some small d (in our implementation, d is 1s).
/// The reason that the expiration time saved in each entry here may not exactly match
/// the true expiration time of a query is because we wish to avoid accessing (and
/// therefore locking) this structure on every query activity. Instead, queries maintain
/// an accurate expiration time, and this structure guarantees that we will always
/// (modulo scheduling delays out of our control) read the expiration time before it has
/// passed.
typedef std::set<ExpirationEvent, ExpirationEventComparator> ExpirationQueue;
ExpirationQueue queries_by_timestamp_;
/// Container for a thread that runs ExpireQueries() if FLAGS_idle_query_timeout is set.
std::unique_ptr<Thread> query_expiration_thread_;
/// True if this ImpalaServer can accept client connections and coordinate
/// queries.
bool is_coordinator_;
/// True if this ImpalaServer can execute query fragments.
bool is_executor_;
/// Containers for client and internal services. May not be set if the ports passed to
/// Init() were <= 0.
/// Note that these hold a shared pointer to 'this', and so need to be reset()
/// explicitly.
boost::scoped_ptr<ThriftServer> beeswax_server_;
boost::scoped_ptr<ThriftServer> hs2_server_;
boost::scoped_ptr<ThriftServer> hs2_http_server_;
boost::scoped_ptr<ThriftServer> thrift_be_server_;
/// Flag that records if backend and/or client services have been started. The flag is
/// set after all services required for the server have been started.
std::atomic_bool services_started_;
/// Whether the Impala server shutdown process started. If 0, shutdown was not started.
/// Otherwise, this is the MonotonicMillis() value when the shut down was started.
AtomicInt64 shutting_down_{0};
/// The MonotonicMillis() value after we should shut down regardless of registered
/// client requests and running finstances. Set before 'shutting_down_' and updated
/// atomically if a new shutdown command with a shorter deadline comes in.
AtomicInt64 shutdown_deadline_{0};