blob: 029ae61dc0927a7af73fa1fec7d5b426377d1a53 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "service/impala-server.h"
#include <unistd.h>
#include <algorithm>
#include <exception>
#include <sstream>
#ifdef CALLONCEHACK
#include <calloncehack.h>
#endif
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/join.hpp>
#include <boost/algorithm/string/replace.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <boost/bind.hpp>
#include <boost/unordered_set.hpp>
#include <gutil/strings/split.h>
#include <gutil/strings/substitute.h>
#include <gutil/walltime.h>
#include <openssl/err.h>
#include <openssl/evp.h>
#include <rapidjson/rapidjson.h>
#include <rapidjson/stringbuffer.h>
#include <sys/types.h>
#include "catalog/catalog-server.h"
#include "catalog/catalog-util.h"
#include "common/compiler-util.h"
#include "common/logging.h"
#include "common/object-pool.h"
#include "common/thread-debug-info.h"
#include "exec/external-data-source-executor.h"
#include "exprs/timezone_db.h"
#include "gen-cpp/CatalogService_constants.h"
#include "gen-cpp/admission_control_service.proxy.h"
#include "kudu/rpc/rpc_context.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/security/security_flags.h"
#include "kudu/util/random_util.h"
#include "rpc/authentication.h"
#include "rpc/rpc-mgr.h"
#include "rpc/rpc-trace.h"
#include "rpc/thrift-thread.h"
#include "rpc/thrift-util.h"
#include "runtime/coordinator.h"
#include "runtime/exec-env.h"
#include "runtime/lib-cache.h"
#include "runtime/query-driver.h"
#include "runtime/tmp-file-mgr.h"
#include "runtime/io/disk-io-mgr.h"
#include "scheduling/admission-control-service.h"
#include "scheduling/admission-controller.h"
#include "service/cancellation-work.h"
#include "service/client-request-state.h"
#include "service/frontend.h"
#include "service/impala-http-handler.h"
#include "service/query-state-record.h"
#include "util/auth-util.h"
#include "util/coding-util.h"
#include "util/common-metrics.h"
#include "util/debug-util.h"
#include "util/error-util.h"
#include "util/histogram-metric.h"
#include "util/impalad-metrics.h"
#include "util/jwt-util.h"
#include "util/metrics.h"
#include "util/network-util.h"
#include "util/openssl-util.h"
#include "util/pretty-printer.h"
#include "util/redactor.h"
#include "util/runtime-profile-counters.h"
#include "util/runtime-profile.h"
#include "util/simple-logger.h"
#include "util/string-parser.h"
#include "util/summary-util.h"
#include "util/test-info.h"
#include "util/time.h"
#include "util/uid-util.h"
#include "gen-cpp/Types_types.h"
#include "gen-cpp/ImpalaService.h"
#include "gen-cpp/ImpalaService_types.h"
#include "gen-cpp/Frontend_types.h"
#include "common/names.h"
using boost::adopt_lock_t;
using boost::algorithm::is_any_of;
using boost::algorithm::istarts_with;
using boost::algorithm::join;
using boost::algorithm::replace_all_copy;
using boost::algorithm::split;
using boost::algorithm::token_compress_on;
using boost::get_system_time;
using boost::system_time;
using boost::uuids::random_generator;
using boost::uuids::uuid;
using google::protobuf::RepeatedPtrField;
using kudu::GetRandomSeed32;
using kudu::rpc::RpcContext;
using kudu::security::SecurityDefaults;
using namespace apache::hive::service::cli::thrift;
using namespace apache::thrift;
using namespace apache::thrift::transport;
using namespace beeswax;
using namespace boost::posix_time;
using namespace rapidjson;
using namespace strings;
// Flags defined in other translation units that this file reads during startup.
DECLARE_string(nn);
DECLARE_int32(nn_port);
DECLARE_string(authorized_proxy_user_config);
DECLARE_string(authorized_proxy_user_config_delimiter);
DECLARE_string(authorized_proxy_group_config);
DECLARE_string(authorized_proxy_group_config_delimiter);
DECLARE_string(debug_actions);
DECLARE_bool(abort_on_config_error);
DECLARE_bool(disk_spill_encryption);
DECLARE_bool(enable_ldap_auth);
DECLARE_bool(gen_experimental_profile);
DECLARE_bool(use_local_catalog);
// Fix: the adjacent string literals concatenated to "served.If" (missing space) in
// the --help output.
DEFINE_int32(beeswax_port, 21000, "port on which Beeswax client requests are served. "
    "If 0 or less, the Beeswax server is not started. This interface is deprecated and "
    "will be removed in a future version.");
// Fix: the adjacent string literals concatenated to "served.If" (missing space) in
// the --help output.
DEFINE_int32(hs2_port, 21050, "port on which HiveServer2 client requests are served. "
    "If 0 or less, the HiveServer2 server is not started.");
// Client-facing ports, service thread sizing, and query/audit/lineage log retention.
DEFINE_int32(hs2_http_port, 28000, "port on which HiveServer2 HTTP(s) client "
    "requests are served. If 0 or less, the HiveServer2 http server is not started.");
DEFINE_int32(external_fe_port, 0, "port on which External Frontend requests are served. "
    "If 0 or less, the External Frontend server is not started. Careful consideration "
    "must be taken when enabling due to the fact that this port is currently always "
    "unauthenticated.");
DEFINE_bool(enable_external_fe_http, false,
    "if true enables http transport for external_fe_port otherwise binary transport is "
    "used");
DEFINE_int32(fe_service_threads, 64,
    "number of threads available to serve client requests");
DEFINE_string(default_query_options, "", "key=value pair of default query options for"
    " impalad, separated by ','");
DEFINE_int32(query_log_size, 200,
    "Number of queries to retain in the query log. If -1, "
    "the query log has unbounded size. Used in combination with query_log_size_in_bytes, "
    "whichever is less.");
DEFINE_int64(query_log_size_in_bytes, 2L * 1024 * 1024 * 1024,
    "Total maximum bytes of queries to retain in the query log. If -1, "
    "the query log has unbounded size. Used in combination with query_log_size, "
    "whichever is less");
DEFINE_int32(query_stmt_size, 250, "length of the statements in the query log. If <=0, "
    "the full statement is displayed in the query log without trimming.");
DEFINE_bool(log_query_to_file, true, "if true, logs completed query profiles to file.");
DEFINE_int64(max_result_cache_size, 100000L, "Maximum number of query results a client "
    "may request to be cached on a per-query basis to support restarting fetches. This "
    "option guards against unreasonably large result caches requested by clients. "
    "Requests exceeding this maximum will be rejected.");
DEFINE_int32(max_audit_event_log_file_size, 5000, "The maximum size (in queries) of the "
    "audit event log file before a new one is created (if event logging is enabled)");
DEFINE_string(audit_event_log_dir, "", "The directory in which audit event log files are "
    "written. Setting this flag will enable audit event logging.");
DEFINE_bool(abort_on_failed_audit_event, true, "Shutdown Impala if there is a problem "
    "recording an audit event.");
DEFINE_int32(max_audit_event_log_files, 0, "Maximum number of audit event log files "
    "to retain. The most recent audit event log files are retained. If set to 0, "
    "all audit event log files are retained.");
DEFINE_int32(max_lineage_log_file_size, 5000, "The maximum size (in queries) of "
    "the lineage event log file before a new one is created (if lineage logging is "
    "enabled)");
// Fix: help-string typo "Setting this flag with enable" -> "will enable" (matches the
// equivalent wording on --audit_event_log_dir).
DEFINE_string(lineage_event_log_dir, "", "The directory in which lineage event log "
    "files are written. Setting this flag will enable lineage logging.");
// Profile log retention, worker thread-pool sizing, and TLS configuration for the
// client-facing Thrift services.
DEFINE_bool(abort_on_failed_lineage_event, true, "Shutdown Impala if there is a problem "
    "recording a lineage record.");
DEFINE_string(profile_log_dir, "", "The directory in which profile log files are"
    " written. If blank, defaults to <log_file_dir>/profiles");
DEFINE_int32(max_profile_log_file_size, 5000, "The maximum size (in queries) of the "
    "profile log file before a new one is created");
DEFINE_int32(max_profile_log_files, 10, "Maximum number of profile log files to "
    "retain. The most recent log files are retained. If set to 0, all log files "
    "are retained.");
DEFINE_int32(cancellation_thread_pool_size, 5,
    "(Advanced) Size of the thread-pool processing cancellations due to node failure");
DEFINE_int32(unregistration_thread_pool_size, 4,
    "(Advanced) Size of the thread-pool for unregistering queries, including "
    "finalizing runtime profiles");
// Limit the number of queries that can be queued for unregistration to avoid holding
// too many queries in memory unnecessarily. The default is set fairly low so that if
// queries are finishing faster than they can be unregistered, there will be backpressure
// on query execution before too much memory fills up with queries pending unregistration.
DEFINE_int32(unregistration_thread_pool_queue_depth, 16,
    "(Advanced) Max number of queries that can be queued for unregistration.");
DEFINE_string(ssl_server_certificate, "", "The full path to the SSL certificate file used"
    " to authenticate Impala to clients. If set, both Beeswax and HiveServer2 ports will "
    "only accept SSL connections");
DEFINE_string(ssl_private_key, "", "The full path to the private key used as a "
    "counterpart to the public key contained in --ssl_server_certificate. If "
    "--ssl_server_certificate is set, this option must be set as well.");
DEFINE_string(ssl_client_ca_certificate, "", "(Advanced) The full path to a certificate "
    "used by Thrift clients to check the validity of a server certificate. May either be "
    "a certificate for a third-party Certificate Authority, or a copy of the certificate "
    "the client expects to receive from the server.");
DEFINE_string(ssl_private_key_password_cmd, "", "A Unix command whose output returns the "
    "password used to decrypt the certificate private key file specified in "
    "--ssl_private_key. If the .PEM key file is not password-protected, this command "
    "will not be invoked. The output of the command will be truncated to 1024 bytes, and "
    "then all trailing whitespace will be trimmed before it is used to decrypt the "
    "private key");
// This defaults ssl_cipher_list to the same set of ciphers used by Kudu,
// which is based on Mozilla's intermediate compatibility recommendations
// from https://wiki.mozilla.org/Security/Server_Side_TLS
DEFINE_string(ssl_cipher_list, SecurityDefaults::kDefaultTlsCiphers,
    "The cipher suite preferences to use for TLS-secured "
    "Thrift RPC connections. Uses the OpenSSL cipher preference list format. See man (1) "
    "ciphers for more information. If empty, the default cipher list for your platform "
    "is used");
// Fix: help-string typo "ciper suites" -> "cipher suites".
DEFINE_string(tls_ciphersuites,
    kudu::security::SecurityDefaults::kDefaultTlsCipherSuites,
    "The TLSv1.3 cipher suites to use for TLS-secured Thrift RPC and KRPC connections. "
    "TLSv1.3 uses a new way to specify cipher suites that is independent of the older "
    "TLSv1.2 and below cipher lists. See 'man (1) ciphers' for more information. "
    "This flag is only effective if Impala is built with OpenSSL v1.1.1 or newer.");
// Help text for the --ssl_minimum_version flag below, kept as a named constant.
const string SSL_MIN_VERSION_HELP = "The minimum SSL/TLS version that Thrift "
    "services should use for both client and server connections. Supported versions are "
    "TLSv1.0, TLSv1.1 and TLSv1.2 (as long as the system OpenSSL library supports them)";
DEFINE_string(ssl_minimum_version, "tlsv1.2", SSL_MIN_VERSION_HELP.c_str());
// Session/query idle timeouts, status-report cadence, and coordinator/executor
// topology flags.
DEFINE_int32(idle_session_timeout, 0, "The time, in seconds, that a session may be idle"
    " for before it is closed (and all running queries cancelled) by Impala. If 0, idle"
    " sessions are never expired. It can be overridden by the query option"
    " 'idle_session_timeout' for specific sessions");
DEFINE_int32(idle_query_timeout, 0, "The time, in seconds, that a query may be idle for"
    " (i.e. no processing work is done and no updates are received from the client) "
    "before it is cancelled. If 0, idle queries are never expired. The query option "
    "QUERY_TIMEOUT_S overrides this setting, but, if set, --idle_query_timeout represents"
    " the maximum allowable timeout.");
DEFINE_int32(disconnected_session_timeout, 15 * 60, "The time, in seconds, that a "
    "hiveserver2 session will be maintained after the last connection that it has been "
    "used over is disconnected.");
DEFINE_int32(max_hs2_sessions_per_user, -1, "The maximum allowed number of HiveServer2 "
    "sessions that can be opened by any single connected user on a coordinator. "
    "If set to -1 or 0 then this check is not performed. If set to a positive value "
    "then the per-user session count is viewable in the webui under /sessions.");
DEFINE_int32(idle_client_poll_period_s, 30, "The poll period, in seconds, after "
    "no activity from an Impala client which an Impala service thread (beeswax and HS2) "
    "wakes up to check if the connection should be closed. If --idle_session_timeout is "
    "also set, a client connection will be closed if all the sessions associated with it "
    "have become idle. Set this to 0 to disable the polling behavior and clients' "
    "connection will remain opened until they are explicitly closed.");
DEFINE_int32(status_report_interval_ms, 5000, "(Advanced) Interval between profile "
    "reports in milliseconds. If set to <= 0, periodic reporting is disabled and only "
    "the final report is sent.");
DEFINE_int32(status_report_max_retry_s, 600, "(Advanced) Max amount of time in seconds "
    "for a backend to attempt to send a status report before cancelling. This must be > "
    "--status_report_interval_ms. Effective only if --status_report_interval_ms > 0.");
DEFINE_int32(status_report_cancellation_padding, 20, "(Advanced) The coordinator will "
    "wait --status_report_max_retry_s * (1 + --status_report_cancellation_padding / 100) "
    "without receiving a status report before deciding that a backend is unresponsive "
    "and the query should be cancelled. This must be > 0.");
DEFINE_bool(is_coordinator, true, "If true, this Impala daemon can accept and coordinate "
    "queries from clients. If false, it will refuse client connections.");
DEFINE_bool(is_executor, true, "If true, this Impala daemon will execute query "
    "fragments.");
DEFINE_string(executor_groups, "",
    "List of executor groups, separated by comma. Each executor group specification can "
    "optionally contain a minimum size, separated by a ':', e.g. --executor_groups "
    "default-pool-1:3. Default minimum size is 1. Only when the cluster membership "
    "contains at least that number of executors for the group will it be considered "
    "healthy for admission. Currently only a single group may be specified.");
DEFINE_int32(num_expected_executors, 20,
    "The number of executors that are expected to "
    "be available for the execution of a single query. This value is used during "
    "planning if no executors have started yet. Once a healthy executor group has "
    "started, its size is used instead. NOTE: This flag is overridden by "
    "'expected_executor_group_sets' which is a more expressive way of specifying "
    "multiple executor group sets");
// Fix: the help text contained non-ASCII curly quotes around the example
// ("prefix1:10"), which render inconsistently in --help output; use plain ASCII
// single quotes instead.
DEFINE_string(expected_executor_group_sets, "",
    "Only used by the coordinator. List of expected executor group sets, separated by "
    "comma in the following format: <executor_group_name_prefix>:<expected_group_size> . "
    "For eg. 'prefix1:10', this set will include executor groups named like "
    "prefix1-group1, prefix1-group2, etc. The expected group size (number of executors "
    "in each group) is used during planning when no healthy executor group is available. "
    "If this flag is used then any executor groups that do not map to the specified group"
    " sets will never be used to schedule queries.");
// Graceful-shutdown timing flags.
// TODO: can we automatically choose a startup grace period based on the max admission
// control queue timeout + some margin for error?
DEFINE_int64(shutdown_grace_period_s, 120, "Shutdown startup grace period in seconds. "
    "When the shutdown process is started for this daemon, it will wait for at least the "
    "startup grace period before shutting down. This gives time for updated cluster "
    "membership information to propagate to all coordinators and for fragment instances "
    "that were scheduled based on old cluster membership to start executing (and "
    "therefore be reflected in the metrics used to detect quiescence).");
DEFINE_int64(shutdown_deadline_s, 60 * 60, "Default time limit in seconds for the shut "
    "down process. If this duration elapses after the shut down process is started, "
    "the daemon shuts down regardless of any running queries.");
// Debug-build-only fault-injection flag.
// Fix: the adjacent string literals concatenated to "loadingfor" (missing space) in
// the --help output.
#ifndef NDEBUG
DEFINE_int64(stress_metadata_loading_pause_injection_ms, 0, "Simulates metadata loading "
    "for a given query by injecting a sleep equivalent to this configuration in "
    "milliseconds. Only used for testing.");
#endif
// Connection-setup timeout and query event hook configuration.
DEFINE_int64(accepted_client_cnxn_timeout, 300000,
    "(Advanced) The amount of time in milliseconds an accepted connection will wait in "
    "the post-accept, pre-setup connection queue before it is timed out and the "
    "connection request is rejected. A value of 0 means there is no timeout.");
DEFINE_string(query_event_hook_classes, "", "Comma-separated list of java QueryEventHook "
    "implementation classes to load and register at Impala startup. Class names should "
    "be fully-qualified and on the classpath. Whitespace acceptable around delimiters.");
DEFINE_int32(query_event_hook_nthreads, 1, "Number of threads to use for "
    "QueryEventHook execution. If this number is >1 then hooks will execute "
    "concurrently.");
DECLARE_bool(compact_catalog_topic);
// Fix: help-string typo "overriden" -> "overridden".
DEFINE_bool(use_local_tz_for_unix_timestamp_conversions, false,
    "When true, TIMESTAMPs are interpreted in the local time zone when converting to "
    "and from Unix times. When false, TIMESTAMPs are interpreted in the UTC time zone. "
    "Set to true for Hive compatibility. "
    "Can be overridden with the query option with the same name.");
// Provide a workaround for IMPALA-1658.
// Fix: help-string typo "overriden" -> "overridden".
DEFINE_bool(convert_legacy_hive_parquet_utc_timestamps, false,
    "When true, TIMESTAMPs read from files written by Parquet-MR (used by Hive) will "
    "be converted from UTC to local time. Writes are unaffected. "
    "Can be overridden with the query option with the same name.");
// Admission-service heartbeating and local-catalog compaction checking.
DEFINE_int32(admission_heartbeat_frequency_ms, 1000,
    "(Advanced) The time in milliseconds to wait between sending heartbeats to the "
    "admission service, if enabled. Heartbeats are used to ensure resources are properly "
    "accounted for even if rpcs to the admission service occasionally fail.");
DEFINE_bool(auto_check_compaction, false,
    "When true, compaction checking will be conducted for each query in local catalog "
    "mode. Note that this checking introduces additional overhead because Impala makes "
    "additional RPCs to hive metastore for each table in a query during the query "
    "compilation.");
// Fixes: adjacent string literals concatenated to "ID thatthe" (missing space), and a
// stray comma inside the quoted flag name at the end of the help text.
DEFINE_int32(wait_for_new_catalog_service_id_timeout_sec, 5 * 60,
    "During DDL/DML queries, if there is a mismatch between the catalog service ID that "
    "the coordinator knows of and the one in the RPC response from the catalogd, the "
    "coordinator waits for a statestore update with a new catalog service ID in order to "
    "catch up with the one in the RPC response. However, in rare cases the service ID "
    "the coordinator knows of is the more recent one, in which case it could wait "
    "infinitely - to avoid this, this flag can be set to a positive value (in seconds) "
    "to limit the waiting time. Negative values and zero have no effect. See also "
    "'--wait_for_new_catalog_service_id_max_iterations'.");
DEFINE_int32(wait_for_new_catalog_service_id_max_iterations, 10,
    "This flag is used in the same situation as described at the "
    "'--wait_for_new_catalog_service_id_timeout_sec' flag. Instead of limiting the "
    "waiting time, the effect of this flag is that the coordinator gives up waiting "
    "after receiving the set number of valid catalog updates that do not change the "
    "catalog service ID. Negative values and zero have no effect. If both this flag and "
    "'--wait_for_new_catalog_service_id_timeout_sec' are set, the coordinator stops "
    "waiting when the stop condition of either of them is met. Note that it is possible "
    "that the coordinator does not receive any catalog update from the statestore and in "
    "this case it will wait indefinitely if "
    "'--wait_for_new_catalog_service_id_timeout_sec' is not set.");
// Flags for JWT token based authentication, defined in the authentication code.
DECLARE_bool(jwt_token_auth);
DECLARE_bool(jwt_validate_signature);
DECLARE_string(jwks_file_path);
DECLARE_string(jwks_url);
DECLARE_bool(jwks_verify_server_certificate);
DECLARE_string(jwks_ca_certificate);
namespace {
using namespace impala;

// Parses the --executor_groups flag value 'flag' and appends the corresponding executor
// group descriptors to 'be_desc'. Entries are comma-separated; each entry is either
// "<name>" or "<name>:<min size>". If 'flag' is empty, the backend is placed in the
// default executor group with a minimum size of 1.
void SetExecutorGroups(const string& flag, BackendDescriptorPB* be_desc) {
  vector<StringPiece> groups;
  groups = Split(flag, ",", SkipEmpty());
  if (groups.empty()) groups.push_back(ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME);
  // Only a single executor group per backend is currently supported (see the
  // --executor_groups help text).
  DCHECK_EQ(1, groups.size());
  // Name and optional minimum group size are separated by ':'.
  for (const StringPiece& group : groups) {
    int colon_idx = group.find_first_of(':');
    ExecutorGroupDescPB* group_desc = be_desc->add_executor_groups();
    // If there is no ':', find_first_of() returned npos and substr() takes the whole
    // entry as the group name.
    group_desc->set_name(group.substr(0, colon_idx).as_string());
    if (colon_idx != StringPiece::npos) {
      // Parse the minimum group size from the characters after the ':'.
      StringParser::ParseResult result;
      group_desc->set_min_size(StringParser::StringToInt<int64_t>(
          group.data() + colon_idx + 1, group.length() - colon_idx - 1, &result));
      // A malformed minimum size is a fatal startup configuration error.
      if (result != StringParser::PARSE_SUCCESS) {
        LOG(FATAL) << "Failed to parse minimum executor group size from group: "
            << group.ToString();
      }
    } else {
      // No explicit minimum size: default to 1.
      group_desc->set_min_size(1);
    }
  }
}
} // end anonymous namespace
namespace impala {
// Prefix of profile, event and lineage log filenames. The version number is
// internal, and does not correspond to an Impala release - it should
// be changed only when the file format changes.
//
// In the 1.0 version of the profile log, the timestamp at the beginning of each entry
// was relative to the local time zone. In log version 1.1, this was changed to be
// relative to UTC. The same time zone change was made for the audit log, but the
// version was kept at 1.0 because there is no known consumer of the timestamp.
const string PROFILE_LOG_FILE_PREFIX = "impala_profile_log_1.1-";
const string AUDIT_EVENT_LOG_FILE_PREFIX = "impala_audit_event_log_1.0-";
const string LINEAGE_LOG_FILE_PREFIX = "impala_lineage_log_1.0-";

// Upper bound on the cancellation thread pool's queue. Deliberately large so that it
// should never fill in practice; see the constructor for what happens if it does.
const uint32_t MAX_CANCELLATION_QUEUE_SIZE = 65536;

// Names under which the client-facing services are registered.
const string ImpalaServer::BEESWAX_SERVER_NAME = "beeswax-frontend";
const string ImpalaServer::HS2_SERVER_NAME = "hiveserver2-frontend";
const string ImpalaServer::HS2_HTTP_SERVER_NAME = "hiveserver2-http-frontend";
const string ImpalaServer::INTERNAL_SERVER_NAME = "internal-server";
const string ImpalaServer::EXTERNAL_FRONTEND_SERVER_NAME = "external-frontend";
const string ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME = "default";

// SQLSTATE codes returned to clients (ODBC/JDBC conventions).
const char* ImpalaServer::SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION = "42000";
const char* ImpalaServer::SQLSTATE_GENERAL_ERROR = "HY000";
const char* ImpalaServer::SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED = "HYC00";

// Substitute() template ($0 = original error, $1 = retried query id) used by GetLog()
// when a query has been transparently retried.
const char* ImpalaServer::GET_LOG_QUERY_RETRY_INFO_FORMAT =
    "Original query failed:\n$0\nQuery has been retried using query id: $1\n";

// Interval between checks for query expiration.
const int64_t EXPIRATION_CHECK_INTERVAL_MS = 1000L;

// Template to return error messages for client requests that could not be found, belonged
// to the wrong session, or had a mismatched secret. We need to use this particular string
// in some places because the shell has a regex for it.
// TODO: Make consistent "Invalid or unknown query handle: $0" template used elsewhere.
// TODO: this should be turned into a proper error code and used throughout ImpalaServer.
static const char* LEGACY_INVALID_QUERY_HANDLE_TEMPLATE = "Query id $0 not found.";

// Shared RNG, seeded once at static-initialization time.
ThreadSafeRandom ImpalaServer::rng_(GetRandomSeed32());
// Constructs the server and performs all startup-time initialization: config
// validation, scratch-dir setup, profile/audit/lineage logging, proxy-user/-group
// parsing, spill encryption setup, catalog topic subscription, and the background
// worker pools/threads. Aborts the process (CLEAN_EXIT_WITH_ERROR / ABORT_IF_ERROR)
// on unrecoverable configuration or initialization failures. Note that client
// services are started separately (see services_started_).
ImpalaServer::ImpalaServer(ExecEnv* exec_env)
  : exec_env_(exec_env),
    services_started_(false) {
  // Initialize default config
  InitializeConfigVariables();

  // Validate frontend (JVM-side) settings; only fatal if --abort_on_config_error.
  Status status = exec_env_->frontend()->ValidateSettings();
  if (!status.ok()) {
    LOG(ERROR) << status.GetDetail();
    if (FLAGS_abort_on_config_error) {
      CLEAN_EXIT_WITH_ERROR(
          "Aborting Impala Server startup due to improper configuration");
    }
  }

  // Set up scratch directories used for spilling.
  status = exec_env_->tmp_file_mgr()->Init(exec_env_->metrics());
  if (!status.ok()) {
    LOG(ERROR) << status.GetDetail();
    if (FLAGS_abort_on_config_error) {
      CLEAN_EXIT_WITH_ERROR("Aborting Impala Server startup due to improperly "
          "configured scratch directories.");
    }
  }

  // Profile logging failure is non-fatal: archival is simply disabled.
  if (!InitProfileLogging().ok()) {
    LOG(ERROR) << "Query profile archival is disabled";
    FLAGS_log_query_to_file = false;
  }

  // Audit/lineage logging failures are fatal when the corresponding flags are set.
  if (!InitAuditEventLogging().ok()) {
    CLEAN_EXIT_WITH_ERROR("Aborting Impala Server startup due to failure initializing "
        "audit event logging");
  }
  if (!InitLineageLogging().ok()) {
    CLEAN_EXIT_WITH_ERROR("Aborting Impala Server startup due to failure initializing "
        "lineage logging");
  }

  // Parse delegation (proxy) configuration for users and groups, if configured.
  if (!FLAGS_authorized_proxy_user_config.empty()) {
    Status status = PopulateAuthorizedProxyConfig(FLAGS_authorized_proxy_user_config,
        FLAGS_authorized_proxy_user_config_delimiter, &authorized_proxy_user_config_);
    if (!status.ok()) {
      CLEAN_EXIT_WITH_ERROR(Substitute("Invalid proxy user configuration."
          "No mapping value specified for the proxy user. For more information review "
          "usage of the --authorized_proxy_user_config flag: $0", status.GetDetail()));
    }
  }
  if (!FLAGS_authorized_proxy_group_config.empty()) {
    Status status = PopulateAuthorizedProxyConfig(FLAGS_authorized_proxy_group_config,
        FLAGS_authorized_proxy_group_config_delimiter, &authorized_proxy_group_config_);
    if (!status.ok()) {
      CLEAN_EXIT_WITH_ERROR(Substitute("Invalid proxy group configuration. "
          "No mapping value specified for the proxy group. For more information review "
          "usage of the --authorized_proxy_group_config flag: $0", status.GetDetail()));
    }
  }

  if (FLAGS_disk_spill_encryption) {
    // Initialize OpenSSL for spilling encryption. This is not thread-safe so we
    // initialize it once on startup.
    // TODO: Set OpenSSL callbacks to provide locking to make the library thread-safe.
    OpenSSL_add_all_algorithms();
    ERR_load_crypto_strings();
  }

  ABORT_IF_ERROR(ExternalDataSourceExecutor::InitJNI(exec_env_->metrics()));

  // Register the catalog update callback if running in a real cluster as a coordinator.
  if ((!TestInfo::is_test() || TestInfo::is_be_cluster_test()) && FLAGS_is_coordinator) {
    auto catalog_cb = [this] (const StatestoreSubscriber::TopicDeltaMap& state,
        vector<TTopicDelta>* topic_updates) {
      CatalogUpdateCallback(state, topic_updates);
    };
    // The 'local-catalog' implementation only needs minimal metadata to
    // trigger cache invalidations.
    // The legacy implementation needs full metadata objects.
    string filter_prefix = FLAGS_use_local_catalog ?
        g_CatalogService_constants.CATALOG_TOPIC_V2_PREFIX :
        g_CatalogService_constants.CATALOG_TOPIC_V1_PREFIX;
    ABORT_IF_ERROR(exec_env->subscriber()->AddTopic(
        CatalogServer::IMPALA_CATALOG_TOPIC, /* is_transient=*/ true,
        /* populate_min_subscriber_topic_version=*/ true,
        filter_prefix, catalog_cb));
  }

  // Initialise the cancellation thread pool with 5 (by default) threads. The max queue
  // size is deliberately set so high that it should never fill; if it does the
  // cancellations will get ignored and retried on the next statestore heartbeat.
  cancellation_thread_pool_.reset(new ThreadPool<CancellationWork>(
      "impala-server", "cancellation-worker",
      FLAGS_cancellation_thread_pool_size, MAX_CANCELLATION_QUEUE_SIZE,
      bind<void>(&ImpalaServer::CancelFromThreadPool, this, _2)));
  ABORT_IF_ERROR(cancellation_thread_pool_->Init());

  // Bounded pool for unregistering queries; the shallow queue provides backpressure
  // (see the --unregistration_thread_pool_queue_depth flag comment).
  unreg_thread_pool_.reset(new ThreadPool<QueryHandle>("impala-server",
      "unregistration-worker", FLAGS_unregistration_thread_pool_size,
      FLAGS_unregistration_thread_pool_queue_depth,
      bind<void>(&ImpalaServer::FinishUnregisterQuery, this, _2)));
  ABORT_IF_ERROR(unreg_thread_pool_->Init());

  // Initialize a session expiry thread which blocks indefinitely until the first session
  // with non-zero timeout value is opened. Note that a session which doesn't specify any
  // idle session timeout value will use the default value FLAGS_idle_session_timeout.
  ABORT_IF_ERROR(Thread::Create("impala-server", "session-maintenance",
      bind<void>(&ImpalaServer::SessionMaintenance, this), &session_maintenance_thread_));
  ABORT_IF_ERROR(Thread::Create("impala-server", "query-expirer",
      bind<void>(&ImpalaServer::ExpireQueries, this), &query_expiration_thread_));

  // Only enable the unresponsive backend thread if periodic status reporting is enabled.
  if (FLAGS_status_report_interval_ms > 0) {
    // Sanity-check the report/retry flag relationship; only fatal if
    // --abort_on_config_error is set.
    if (FLAGS_status_report_max_retry_s * 1000 <= FLAGS_status_report_interval_ms) {
      const string& err = "Since --status_report_max_retry_s <= "
          "--status_report_interval_ms, most queries will likely be cancelled.";
      LOG(ERROR) << err;
      if (FLAGS_abort_on_config_error) {
        CLEAN_EXIT_WITH_ERROR(Substitute("Aborting Impala Server startup: $0", err));
      }
    }
    if (FLAGS_status_report_cancellation_padding <= 0) {
      const string& err = "--status_report_cancellation_padding should be > 0.";
      LOG(ERROR) << err;
      if (FLAGS_abort_on_config_error) {
        CLEAN_EXIT_WITH_ERROR(Substitute("Aborting Impala Server startup: $0", err));
      }
    }
    ABORT_IF_ERROR(Thread::Create("impala-server", "unresponsive-backend-thread",
        bind<void>(&ImpalaServer::UnresponsiveBackendThread, this),
        &unresponsive_backend_thread_));
  }

  // Heartbeat thread keeps the (optional) admission control service's resource
  // accounting up to date.
  if (exec_env_->AdmissionServiceEnabled()) {
    ABORT_IF_ERROR(Thread::Create("impala-server", "admission-heartbeat-thread",
        bind<void>(&ImpalaServer::AdmissionHeartbeatThread, this),
        &admission_heartbeat_thread_));
  }

  is_coordinator_ = FLAGS_is_coordinator;
  is_executor_ = FLAGS_is_executor;
}
// Parses 'authorized_proxy_config' into 'authorized_proxy_config_map'. The input is a
// ';'-separated list of entries of the form:
//   <proxy user>=<delimiter-separated list of users/groups allowed to delegate>
// where the list delimiter is 'authorized_proxy_config_delimiter'. See
// FLAGS_authorized_proxy_user_config / FLAGS_authorized_proxy_group_config for details.
// Returns an error status carrying the offending entry if an entry has no '='.
Status ImpalaServer::PopulateAuthorizedProxyConfig(
    const string& authorized_proxy_config,
    const string& authorized_proxy_config_delimiter,
    AuthorizedProxyMap* authorized_proxy_config_map) {
  vector<string> entries;
  split(entries, authorized_proxy_config, is_any_of(";"), token_compress_on);
  for (const string& entry : entries) {
    size_t eq_pos = entry.find('=');
    // An entry without '=' specifies no mapping value; report it to the caller.
    if (eq_pos == string::npos) return Status(entry);
    string proxy_principal = entry.substr(0, eq_pos);
    boost::trim(proxy_principal);
    string allowed_list = entry.substr(eq_pos + 1);
    boost::trim(allowed_list);
    vector<string> allowed_tokens;
    split(allowed_tokens, allowed_list, is_any_of(authorized_proxy_config_delimiter),
        token_compress_on);
    authorized_proxy_config_map->insert({proxy_principal,
        unordered_set<string>(allowed_tokens.begin(), allowed_tokens.end())});
  }
  return Status::OK();
}
// Whether this daemon was started as a coordinator (--is_coordinator).
bool ImpalaServer::IsCoordinator() { return is_coordinator_; }
// Whether this daemon was started as an executor (--is_executor).
bool ImpalaServer::IsExecutor() { return is_executor_; }
// The server is reported healthy once all its services have been started.
bool ImpalaServer::IsHealthy() { return AreServicesReady(); }
// Returns the port the Beeswax service is bound to. Must only be called after the
// Beeswax server has been created.
int ImpalaServer::GetBeeswaxPort() {
  DCHECK(beeswax_server_ != nullptr);
  return beeswax_server_->port();
}

// Returns the port the HiveServer2 service is bound to. Must only be called after the
// HiveServer2 server has been created.
int ImpalaServer::GetHS2Port() {
  DCHECK(hs2_server_ != nullptr);
  return hs2_server_->port();
}
// Lineage logging is enabled iff --lineage_event_log_dir is set.
bool ImpalaServer::IsLineageLoggingEnabled() {
  return !FLAGS_lineage_event_log_dir.empty();
}

// Query event hooks are enabled iff --query_event_hook_classes is set.
bool ImpalaServer::AreQueryHooksEnabled() {
  return !FLAGS_query_event_hook_classes.empty();
}
// Sets up lineage logging when enabled: creates the rolling file logger and starts the
// background flush thread. Returns OK immediately (after logging) when disabled.
Status ImpalaServer::InitLineageLogging() {
  if (!IsLineageLoggingEnabled()) {
    LOG(INFO) << "Lineage logging is disabled";
    return Status::OK();
  }
  lineage_logger_.reset(new SimpleLogger(FLAGS_lineage_event_log_dir,
      LINEAGE_LOG_FILE_PREFIX, FLAGS_max_lineage_log_file_size));
  RETURN_IF_ERROR(lineage_logger_->Init());
  // Background thread periodically flushes buffered lineage entries to disk.
  RETURN_IF_ERROR(Thread::Create("impala-server", "lineage-log-flush",
      &ImpalaServer::LineageLoggerFlushThread, this, &lineage_logger_flush_thread_));
  return Status::OK();
}
// Audit event logging is enabled iff --audit_event_log_dir was set to a non-empty
// path.
bool ImpalaServer::IsAuditEventLoggingEnabled() {
  const bool no_log_dir = FLAGS_audit_event_log_dir.empty();
  return !no_log_dir;
}
// Sets up the audit event logger and starts its background flush thread. A no-op
// (with an informational log line) when --audit_event_log_dir is unset. Returns an
// error if the logger cannot be initialized or the flush thread cannot be created.
Status ImpalaServer::InitAuditEventLogging() {
  if (!IsAuditEventLoggingEnabled()) {
    LOG(INFO) << "Event logging is disabled";
    return Status::OK();
  }
  audit_event_logger_.reset(new SimpleLogger(FLAGS_audit_event_log_dir,
      AUDIT_EVENT_LOG_FILE_PREFIX, FLAGS_max_audit_event_log_file_size,
      FLAGS_max_audit_event_log_files));
  RETURN_IF_ERROR(audit_event_logger_->Init());
  // AuditEventLoggerFlushThread() periodically flushes buffered entries to disk.
  RETURN_IF_ERROR(Thread::Create("impala-server", "audit-event-log-flush",
      &ImpalaServer::AuditEventLoggerFlushThread, this,
      &audit_event_logger_flush_thread_));
  return Status::OK();
}
// Sets up the query profile logger and starts its background flush thread. A no-op
// when --log_query_to_file is false. If --profile_log_dir is unset, profiles default
// to the "profiles/" subdirectory of --log_dir.
Status ImpalaServer::InitProfileLogging() {
  if (!FLAGS_log_query_to_file) return Status::OK();
  if (FLAGS_profile_log_dir.empty()) {
    stringstream ss;
    ss << FLAGS_log_dir << "/profiles/";
    FLAGS_profile_log_dir = ss.str();
  }
  profile_logger_.reset(new SimpleLogger(FLAGS_profile_log_dir,
      PROFILE_LOG_FILE_PREFIX, FLAGS_max_profile_log_file_size,
      FLAGS_max_profile_log_files));
  RETURN_IF_ERROR(profile_logger_->Init());
  // LogFileFlushThread() periodically flushes buffered profile entries to disk.
  RETURN_IF_ERROR(Thread::Create("impala-server", "log-flush-thread",
      &ImpalaServer::LogFileFlushThread, this, &profile_log_file_flush_thread_));
  return Status::OK();
}
// Appends a single entry to the audit event log. Only legal to call when audit event
// logging is enabled.
Status ImpalaServer::AppendAuditEntry(const string& entry) {
  DCHECK(IsAuditEventLoggingEnabled());
  return audit_event_logger_->AppendEntry(entry);
}
// Appends a single entry to the lineage event log. Only legal to call when lineage
// logging is enabled.
Status ImpalaServer::AppendLineageEntry(const string& entry) {
  DCHECK(IsLineageLoggingEnabled());
  return lineage_logger_->AppendEntry(entry);
}
// Renders the profile of an in-flight query in the requested 'format' into
// '*profile', after verifying that 'user' is allowed to view it.
Status ImpalaServer::GetRuntimeProfileOutput(const string& user,
    const QueryHandle& query_handle, TRuntimeProfileFormat::type format,
    RuntimeProfileOutput* profile) {
  // For queries in INITIALIZED state, the profile information isn't populated yet.
  if (query_handle->exec_state() == ClientRequestState::ExecState::INITIALIZED) {
    return Status::Expected("Query plan is not ready.");
  }
  lock_guard<mutex> l(*query_handle->lock());
  RETURN_IF_ERROR(CheckProfileAccess(
      user, query_handle->effective_user(), query_handle->user_has_profile_access()));
  // Refresh the exec summary from the coordinator, if one exists, so the rendered
  // profile is current.
  if (query_handle->GetCoordinator() != nullptr) {
    UpdateExecSummary(query_handle);
  }
  switch (format) {
    case TRuntimeProfileFormat::BASE64:
      RETURN_IF_ERROR(
          query_handle->profile()->SerializeToArchiveString(profile->string_output));
      break;
    case TRuntimeProfileFormat::THRIFT:
      query_handle->profile()->ToThrift(profile->thrift_output);
      break;
    case TRuntimeProfileFormat::JSON:
      query_handle->profile()->ToJson(profile->json_output);
      break;
    default:
      DCHECK_EQ(format, TRuntimeProfileFormat::STRING);
      query_handle->profile()->PrettyPrint(profile->string_output);
      break;
  }
  return Status::OK();
}
// Looks up the completed-query-log record for 'query_id'. If 'retried_query_record'
// is non-null and the query was retried, also returns the record of the retried
// attempt. Returns an Expected (stack-trace-free) error if the query is not in the
// log.
Status ImpalaServer::GetQueryRecord(const TUniqueId& query_id,
    shared_ptr<QueryStateRecord>* query_record,
    shared_ptr<QueryStateRecord>* retried_query_record) {
  lock_guard<mutex> l(query_log_lock_);
  auto it = query_log_index_.find(query_id);
  if (it == query_log_index_.end()) {
    // Looking up a query that has aged out of the log is common, so log at a low
    // verbosity and elide Status's stack trace.
    const string msg =
        strings::Substitute(LEGACY_INVALID_QUERY_HANDLE_TEMPLATE, PrintId(query_id));
    VLOG(1) << msg;
    return Status::Expected(msg);
  }
  *query_record = *(it->second);
  if (retried_query_record == nullptr || !(*query_record)->was_retried) {
    return Status::OK();
  }
  DCHECK((*query_record)->retried_query_id != nullptr);
  it = query_log_index_.find(*(*query_record)->retried_query_id);
  // The retried attempt is always enqueued later than the original one; since the
  // query log is a FIFO queue, the original being present implies the retried record
  // is present too.
  DCHECK(it != query_log_index_.end());
  *retried_query_record = *(it->second);
  return Status::OK();
}
// Renders the profile(s) for 'query_id' in the requested 'format'. If the query was
// retried, '*retried_profile' receives the most recent attempt's profile and
// '*original_profile' the first attempt's; '*was_retried' is set accordingly. The
// active query map is consulted first, then the completed query log.
Status ImpalaServer::GetRuntimeProfileOutput(const TUniqueId& query_id,
    const string& user, TRuntimeProfileFormat::type format,
    RuntimeProfileOutput* original_profile, RuntimeProfileOutput* retried_profile,
    bool* was_retried) {
  DCHECK(original_profile != nullptr);
  DCHECK(original_profile->string_output != nullptr);
  DCHECK(retried_profile != nullptr);
  DCHECK(retried_profile->string_output != nullptr);
  // Search for the query id in the active query map
  {
    // QueryHandle of the active query and original query. If the query was retried the
    // active handle points to the most recent query attempt and the original handle
    // points to the original query attempt (the one that failed). If the query was not
    // retried the active handle == the original handle.
    QueryHandle active_query_handle;
    QueryHandle original_query_handle;
    Status status =
        GetAllQueryHandles(query_id, &active_query_handle, &original_query_handle,
            /*return_unregistered=*/ true);
    if (status.ok()) {
      // If the query was retried, then set the retried profile using the active query
      // handle. The active query handle corresponds to the most recent query attempt,
      // so it should be used to set the retried profile.
      if (original_query_handle->WasRetried()) {
        *was_retried = true;
        RETURN_IF_ERROR(
            GetRuntimeProfileOutput(user, active_query_handle, format, retried_profile));
      }
      // Set the profile for the original query.
      RETURN_IF_ERROR(
          GetRuntimeProfileOutput(user, original_query_handle, format, original_profile));
      return Status::OK();
    }
  }
  // The query was not found in the active query map, search the query log.
  {
    // Set the profile for the original query.
    shared_ptr<QueryStateRecord> query_record;
    shared_ptr<QueryStateRecord> retried_query_record;
    RETURN_IF_ERROR(GetQueryRecord(query_id, &query_record, &retried_query_record));
    RETURN_IF_ERROR(CheckProfileAccess(user, query_record->effective_user,
        query_record->user_has_profile_access));
    RETURN_IF_ERROR(DecompressToProfile(format, query_record, original_profile));
    // Set the profile for the retried query.
    if (query_record->was_retried) {
      *was_retried = true;
      DCHECK(retried_query_record != nullptr);
      // If the original profile was accessible by the user, then the retried profile
      // must be accessible by the user as well.
      Status status = CheckProfileAccess(user, retried_query_record->effective_user,
          retried_query_record->user_has_profile_access);
      DCHECK(status.ok());
      RETURN_IF_ERROR(status);
      RETURN_IF_ERROR(DecompressToProfile(format, retried_query_record, retried_profile));
    }
  }
  return Status::OK();
}
// Renders the profile for 'query_id' in the requested 'format' into '*profile'.
// Consults the active query map first and falls back to the completed query log.
Status ImpalaServer::GetRuntimeProfileOutput(const TUniqueId& query_id,
    const string& user, TRuntimeProfileFormat::type format,
    RuntimeProfileOutput* profile) {
  DCHECK(profile != nullptr);
  // Only the JSON format writes somewhere other than 'string_output'.
  DCHECK(format == TRuntimeProfileFormat::JSON || profile->string_output != nullptr);
  // First, try the active query map.
  QueryHandle query_handle;
  Status lookup_status = GetQueryHandle(query_id, &query_handle,
      /*return_unregistered=*/ true);
  if (lookup_status.ok()) {
    return GetRuntimeProfileOutput(user, query_handle, format, profile);
  }
  // Not active: fall back to the completed query log.
  shared_ptr<QueryStateRecord> query_record;
  RETURN_IF_ERROR(GetQueryRecord(query_id, &query_record));
  RETURN_IF_ERROR(CheckProfileAccess(user, query_record->effective_user,
      query_record->user_has_profile_access));
  return DecompressToProfile(format, query_record, profile);
}
// Converts the compressed profile stored in 'query_record' into the requested
// 'format', writing the result into the matching field of '*profile'.
Status ImpalaServer::DecompressToProfile(TRuntimeProfileFormat::type format,
    shared_ptr<QueryStateRecord> query_record, RuntimeProfileOutput* profile) {
  switch (format) {
    case TRuntimeProfileFormat::BASE64:
      // No decompression needed: the archive string is the base64 of the compressed
      // bytes.
      Base64Encode(query_record->compressed_profile, profile->string_output);
      break;
    case TRuntimeProfileFormat::THRIFT:
      RETURN_IF_ERROR(RuntimeProfile::DecompressToThrift(
          query_record->compressed_profile, profile->thrift_output));
      break;
    case TRuntimeProfileFormat::JSON: {
      // Materialize a transient profile tree just long enough to serialize it.
      ObjectPool scratch_pool;
      RuntimeProfile* decompressed;
      RETURN_IF_ERROR(RuntimeProfile::DecompressToProfile(
          query_record->compressed_profile, &scratch_pool, &decompressed));
      decompressed->ToJson(profile->json_output);
      break;
    }
    default: {
      DCHECK_EQ(format, TRuntimeProfileFormat::STRING);
      ObjectPool scratch_pool;
      RuntimeProfile* decompressed;
      RETURN_IF_ERROR(RuntimeProfile::DecompressToProfile(
          query_record->compressed_profile, &scratch_pool, &decompressed));
      decompressed->PrettyPrint(profile->string_output);
      break;
    }
  }
  return Status::OK();
}
// Blocks on 'catalog_version_update_cv_' until 'catalog_update_info_' carries a
// catalog service ID different from 'cur_service_id', or until one of the configured
// give-up conditions triggers. 'ver_lock' must be the (held) lock the condition
// variable waits on; it is released while waiting and re-acquired before returning.
void ImpalaServer::WaitForNewCatalogServiceId(TUniqueId cur_service_id,
    unique_lock<mutex>* ver_lock) {
  DCHECK(ver_lock != nullptr);
  // The catalog service ID of 'catalog_update_result' does not match the current catalog
  // service ID. It is possible that catalogd has been restarted and
  // 'catalog_update_result' contains the new service ID but we haven't received the
  // statestore update about the new catalogd yet. We'll wait until we receive an update
  // with a new catalog service ID or we give up (if
  // --wait_for_new_catalog_service_id_timeout_sec is set and we time out OR if
  // --wait_for_new_catalog_service_id_max_iterations is set and we reach the max number
  // of updates without a new service ID). The timeout is useful in case the service ID of
  // 'catalog_update_result' is actually older than the current catalog service ID. This
  // is possible if the RPC response came from the old catalogd and we have already
  // received the statestore update about the new one (see IMPALA-12267).
  const bool timeout_set = FLAGS_wait_for_new_catalog_service_id_timeout_sec > 0;
  const int64_t timeout_ms =
      FLAGS_wait_for_new_catalog_service_id_timeout_sec * MILLIS_PER_SEC;
  timespec wait_end_time;
  // Absolute deadline shared by all WaitUntil() calls below, so repeated wakeups do
  // not extend the total wait.
  if (timeout_set) TimeFromNowMillis(timeout_ms, &wait_end_time);
  const bool max_statestore_updates_set =
      FLAGS_wait_for_new_catalog_service_id_max_iterations > 0;
  bool timed_out = false;
  int num_statestore_updates = 0;
  int64_t old_catalog_version = catalog_update_info_.catalog_version;
  while (catalog_update_info_.catalog_service_id == cur_service_id) {
    // A changed catalog version with an unchanged service ID counts as one non-empty
    // statestore update toward the iteration limit.
    if (max_statestore_updates_set
        && catalog_update_info_.catalog_version != old_catalog_version) {
      old_catalog_version = catalog_update_info_.catalog_version;
      ++num_statestore_updates;
      if (num_statestore_updates < FLAGS_wait_for_new_catalog_service_id_max_iterations) {
        LOG(INFO) << "Received " << num_statestore_updates << " non-empty catalog "
            << "updates from the statestore while waiting for an update with a new "
            << "catalog service ID but the catalog service ID has not changed. Going to "
            << "give up waiting after "
            << FLAGS_wait_for_new_catalog_service_id_max_iterations
            << " such updates in total.";
      } else {
        LOG(WARNING) << "Received " << num_statestore_updates << " non-empty catalog "
            << "updates from the statestore while waiting for an update with a new "
            << "catalog service ID but the catalog service ID has not changed. "
            << "Giving up waiting.";
        break;
      }
    }
    if (timeout_set) {
      timed_out = !catalog_version_update_cv_.WaitUntil(*ver_lock, wait_end_time);
      if (timed_out) {
        LOG(WARNING) << "Waiting for catalog update with a new "
            << "catalog service ID timed out.";
        break;
      }
    } else {
      // No timeout configured: wait indefinitely for the next update notification.
      catalog_version_update_cv_.Wait(*ver_lock);
    }
  }
}
// Fills '*result' with the TExecSummary of 'query_id' after verifying that 'user' may
// view it. For retried queries, '*result' receives the most recent attempt's summary
// and '*original_result' (required non-null when 'was_retried' is non-null and set)
// the original attempt's. Consults the active query map first, then the completed
// query log.
Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, const string& user,
    TExecSummary* result, TExecSummary* original_result, bool* was_retried) {
  if (was_retried != nullptr) *was_retried = false;
  // Search for the query id in the active query map.
  {
    // QueryHandle of the current query.
    QueryHandle query_handle;
    // QueryHandle or the original query if the query is retried.
    QueryHandle original_query_handle;
    Status status = GetAllQueryHandles(query_id, &query_handle, &original_query_handle,
        /*return_unregistered=*/ true);
    if (status.ok()) {
      lock_guard<mutex> l(*query_handle->lock());
      RETURN_IF_ERROR(CheckProfileAccess(user, query_handle->effective_user(),
          query_handle->user_has_profile_access()));
      // A PENDING query has no coordinator yet; surface admission-queue state
      // (is_queued/queued_reason) instead of an exec summary.
      if (query_handle->exec_state() == ClientRequestState::ExecState::PENDING) {
        const string* admission_result = query_handle->summary_profile()->GetInfoString(
            AdmissionController::PROFILE_INFO_KEY_ADMISSION_RESULT);
        if (admission_result != nullptr) {
          if (*admission_result == AdmissionController::PROFILE_INFO_VAL_QUEUED) {
            result->__set_is_queued(true);
            const string* queued_reason = query_handle->summary_profile()->GetInfoString(
                AdmissionController::PROFILE_INFO_KEY_LAST_QUEUED_REASON);
            if (queued_reason != nullptr) {
              result->__set_queued_reason(*queued_reason);
            }
          }
        }
      } else if (query_handle->GetCoordinator() != nullptr) {
        // Running (or finished) with a coordinator: report the summary plus scan and
        // fragment-instance progress counters.
        query_handle->GetCoordinator()->GetTExecSummary(result);
        TExecProgress progress;
        progress.__set_num_completed_scan_ranges(
            query_handle->GetCoordinator()->scan_progress().num_complete());
        progress.__set_total_scan_ranges(
            query_handle->GetCoordinator()->scan_progress().total());
        progress.__set_num_completed_fragment_instances(
            query_handle->GetCoordinator()->query_progress().num_complete());
        progress.__set_total_fragment_instances(
            query_handle->GetCoordinator()->query_progress().total());
        // TODO: does this not need to be synchronized?
        result->__set_progress(progress);
      } else {
        // No coordinator and not pending: return an empty summary.
        *result = TExecSummary();
      }
      if (query_handle->IsRetriedQuery()) {
        // Don't need to acquire lock on original_query_handle since the query is
        // finished. There are no concurrent updates on its status.
        result->error_logs.push_back(original_query_handle->query_status().GetDetail());
        result->error_logs.push_back(Substitute("Retrying query using query id: $0",
            PrintId(query_handle->query_id())));
        result->__isset.error_logs = true;
        if (was_retried != nullptr) {
          *was_retried = true;
          DCHECK(original_result != nullptr);
          // The original query could not in PENDING state because it already fails.
          // Handle the other two cases as above.
          if (original_query_handle->GetCoordinator() != nullptr) {
            original_query_handle->GetCoordinator()->GetTExecSummary(original_result);
          } else {
            *original_result = TExecSummary();
          }
        }
      }
      return Status::OK();
    }
  }
  // Look for the query in completed query log.
  {
    string effective_user;
    bool user_has_profile_access = false;
    bool is_query_missing = false;
    TExecSummary exec_summary;
    TExecSummary retried_exec_summary;
    {
      // Copy what is needed out of the records in this inner scope; GetQueryRecord()
      // holds query_log_lock_, and the access check/assignment below run without it.
      shared_ptr<QueryStateRecord> query_record;
      shared_ptr<QueryStateRecord> retried_query_record;
      is_query_missing =
          !GetQueryRecord(query_id, &query_record, &retried_query_record).ok();
      if (!is_query_missing) {
        effective_user = query_record->effective_user;
        user_has_profile_access = query_record->user_has_profile_access;
        exec_summary = query_record->exec_summary;
        if (query_record->was_retried) {
          if (was_retried != nullptr) *was_retried = true;
          DCHECK(retried_query_record != nullptr);
          retried_exec_summary = retried_query_record->exec_summary;
        }
      }
    }
    if (is_query_missing) {
      // Common error, so logging explicitly and eliding Status's stack trace.
      string err =
          strings::Substitute(LEGACY_INVALID_QUERY_HANDLE_TEMPLATE, PrintId(query_id));
      VLOG(1) << err;
      return Status::Expected(err);
    }
    RETURN_IF_ERROR(CheckProfileAccess(user, effective_user, user_has_profile_access));
    if (was_retried != nullptr && *was_retried) {
      DCHECK(original_result != nullptr);
      // 'result' returns the latest summary so it's the retried one.
      *result = retried_exec_summary;
      *original_result = exec_summary;
    } else {
      *result = exec_summary;
    }
  }
  return Status::OK();
}
[[noreturn]] void ImpalaServer::LogFileFlushThread() {
while (true) {
sleep(5);
const Status status = profile_logger_->Flush();
if (!status.ok()) {
LOG(WARNING) << "Error flushing profile log: " << status.GetDetail();
}
}
}
[[noreturn]] void ImpalaServer::AuditEventLoggerFlushThread() {
while (true) {
sleep(5);
Status status = audit_event_logger_->Flush();
if (!status.ok()) {
LOG(ERROR) << "Error flushing audit event log: " << status.GetDetail();
if (FLAGS_abort_on_failed_audit_event) {
CLEAN_EXIT_WITH_ERROR("Shutting down Impala Server due to "
"abort_on_failed_audit_event=true");
}
}
}
}
[[noreturn]] void ImpalaServer::LineageLoggerFlushThread() {
while (true) {
sleep(5);
Status status = lineage_logger_->Flush();
if (!status.ok()) {
LOG(ERROR) << "Error flushing lineage event log: " << status.GetDetail();
if (FLAGS_abort_on_failed_lineage_event) {
CLEAN_EXIT_WITH_ERROR("Shutting down Impala Server due to "
"abort_on_failed_lineage_event=true");
}
}
}
}
// Archives a closed query: writes its compressed profile to the profile log (when
// enabled) and inserts a QueryStateRecord into the in-memory completed query log,
// evicting the oldest records if the log exceeds its count or byte limits. Failures
// are logged (rate-limited) and do not propagate.
void ImpalaServer::ArchiveQuery(const QueryHandle& query_handle) {
  vector<uint8_t> compressed_profile;
  Status status = query_handle->profile()->Compress(&compressed_profile);
  if (!status.ok()) {
    // Didn't serialize the string. Continue with empty string.
    LOG_EVERY_N(WARNING, 1000) << "Could not serialize profile to archive string "
        << status.GetDetail();
    return;
  }
  // If there was an error initialising archival (e.g. directory is not writeable),
  // FLAGS_log_query_to_file will have been set to false
  if (FLAGS_log_query_to_file) {
    // Log line format: "<unix millis> <query id> <base64 compressed profile>".
    stringstream ss;
    ss << UnixMillis() << " " << PrintId(query_handle->query_id()) << " ";
    Base64Encode(compressed_profile, &ss);
    status = profile_logger_->AppendEntry(ss.str());
    if (!status.ok()) {
      LOG_EVERY_N(WARNING, 1000) << "Could not write to profile log file file ("
          << google::COUNTER << " attempts failed): "
          << status.GetDetail();
      LOG_EVERY_N(WARNING, 1000)
          << "Disable query logging with --log_query_to_file=false";
    }
  }
  // 'fetch_rows_lock()' protects several fields in ClientRequestState that are read
  // during QueryStateRecord creation. There should be no contention on this lock because
  // the query has already been closed (e.g. no more results can be fetched).
  shared_ptr<QueryStateRecord> record = nullptr;
  {
    lock_guard<mutex> l(*query_handle->fetch_rows_lock());
    // 'compressed_profile' is moved into the record; it must not be used below.
    record = make_shared<QueryStateRecord>(*query_handle, move(compressed_profile));
  }
  if (query_handle->GetCoordinator() != nullptr) {
    query_handle->GetCoordinator()->GetTExecSummary(&record->exec_summary);
  }
  EnqueueCompletedQuery(query_handle, record);
  // A value of 0 for either limit disables the in-memory query log entirely.
  if (FLAGS_query_log_size != 0 && FLAGS_query_log_size_in_bytes != 0) {
    int64_t record_size = EstimateSize(record.get());
    VLOG(3) << "QueryStateRecord of " << PrintId(query_handle->query_id()) << " is "
        << record_size << " bytes";
    lock_guard<mutex> l(query_log_lock_);
    // Add record to the beginning of the log, and to the lookup index.
    ImpaladMetrics::QUERY_LOG_EST_TOTAL_BYTES->Increment(record_size);
    query_log_est_sizes_.push_front(record_size);
    query_log_.push_front(move(record));
    query_log_index_[query_handle->query_id()] = &query_log_.front();
    // Evict from the back (oldest) until both the count limit and the byte limit are
    // satisfied; a negative limit means "unlimited" for that dimension.
    while (!query_log_.empty()
        && ((FLAGS_query_log_size > -1 && FLAGS_query_log_size < query_log_.size())
            || (FLAGS_query_log_size_in_bytes > -1
                && FLAGS_query_log_size_in_bytes
                    < ImpaladMetrics::QUERY_LOG_EST_TOTAL_BYTES->GetValue()))) {
      query_log_index_.erase(query_log_.back()->id);
      ImpaladMetrics::QUERY_LOG_EST_TOTAL_BYTES->Increment(-query_log_est_sizes_.back());
      query_log_est_sizes_.pop_back();
      query_log_.pop_back();
      DCHECK_GE(ImpaladMetrics::QUERY_LOG_EST_TOTAL_BYTES->GetValue(), 0);
      DCHECK_EQ(query_log_.size(), query_log_est_sizes_.size());
    }
  }
}
// Out-of-line destructor with no explicit teardown of its own.
ImpalaServer::~ImpalaServer() = default;
// Resolves the request pool for 'ctx' and overlays the pool's default query options
// onto the request, except where 'override_options_mask' says the client already set
// the option explicitly. Also enforces the pool's max mt_dop cap. Best-effort: every
// failure is only logged and the function returns without modifying 'ctx' further.
void ImpalaServer::AddPoolConfiguration(TQueryCtx* ctx,
    const QueryOptionsMask& override_options_mask) {
  // Errors are not returned and are only logged (at level 2) because some incoming
  // requests are not expected to be mapped to a pool and will not have query options,
  // e.g. 'use [db];'. For requests that do need to be mapped to a pool successfully, the
  // pool is resolved again during scheduling and errors are handled at that point.
  string resolved_pool;
  Status status = exec_env_->request_pool_service()->ResolveRequestPool(*ctx,
      &resolved_pool);
  if (!status.ok()) {
    VLOG_RPC << "Not adding pool query options for query=" << PrintId(ctx->query_id)
        << " ResolveRequestPool status: " << status.GetDetail();
    return;
  }
  ctx->__set_request_pool(resolved_pool);
  TPoolConfig config;
  status = exec_env_->request_pool_service()->GetPoolConfig(resolved_pool, &config);
  if (!status.ok()) {
    VLOG_RPC << "Not adding pool query options for query=" << PrintId(ctx->query_id)
        << " GetConfigPool status: " << status.GetDetail();
    return;
  }
  TQueryOptions pool_options;
  QueryOptionsMask set_pool_options_mask;
  status = ParseQueryOptions(config.default_query_options, &pool_options,
      &set_pool_options_mask);
  if (!status.ok()) {
    VLOG_QUERY << "Ignoring errors while parsing default query options for pool="
        << resolved_pool << ", message: " << status.GetDetail();
  }
  // Apply a pool default only where the pool actually set it AND the client did not
  // override it.
  QueryOptionsMask overlay_mask = override_options_mask & set_pool_options_mask;
  VLOG_RPC << "Parsed pool options: " << DebugQueryOptions(pool_options)
      << " override_options_mask=" << override_options_mask.to_string()
      << " set_pool_mask=" << set_pool_options_mask.to_string()
      << " overlay_mask=" << overlay_mask.to_string();
  OverlayQueryOptions(pool_options, overlay_mask, &ctx->client_request.query_options);
  // Enforce the max mt_dop after the defaults and overlays have already been done.
  EnforceMaxMtDop(ctx, config.max_mt_dop);
  // NOTE(review): this validates 'pool_options' (the pool defaults) after they were
  // already overlaid onto the request; failures here are only logged, never enforced.
  status = ValidateQueryOptions(&pool_options);
  if (!status.ok()) {
    VLOG_QUERY << "Ignoring errors while validating default query options for pool="
        << resolved_pool << ", message: " << status.GetDetail();
  }
}
// Caps the query's MT_DOP option at 'max_mt_dop'. The original client-requested value
// is preserved in the query context's 'overridden_mt_dop_value' field.
void ImpalaServer::EnforceMaxMtDop(TQueryCtx* query_ctx, int64_t max_mt_dop) {
  TQueryOptions& opts = query_ctx->client_request.query_options;
  // The cap applies only if all three hold: a non-negative cap is configured, the
  // mt_dop option was explicitly set, and the requested value exceeds the cap.
  const bool cap_configured = max_mt_dop >= 0;
  if (!cap_configured || !opts.__isset.mt_dop || opts.mt_dop <= max_mt_dop) return;
  query_ctx->__set_overridden_mt_dop_value(opts.mt_dop);
  opts.__set_mt_dop(max_mt_dop);
}
// Runs 'query_ctx' through the full submission path: prepares the query context,
// registers and plans the query via ExecuteInternal(), and starts execution. If
// 'external_exec_request' is non-null it is used in place of frontend planning.
// On failure after registration, the query is unregistered before returning.
Status ImpalaServer::Execute(TQueryCtx* query_ctx, shared_ptr<SessionState> session_state,
    QueryHandle* query_handle, const TExecRequest* external_exec_request,
    const bool include_in_query_log) {
  PrepareQueryContext(query_ctx);
  ScopedThreadContext debug_ctx(GetThreadDebugInfo(), query_ctx->query_id);
  ImpaladMetrics::IMPALA_SERVER_NUM_QUERIES->Increment(1L);
  // Redact the SQL stmt and update the query context. Newlines are flattened first
  // so redaction rules see the statement as a single line.
  string stmt = replace_all_copy(query_ctx->client_request.stmt, "\n", " ");
  Redact(&stmt);
  // Fix: pass 'stmt' directly; the previous '(const string)' C-style cast was a no-op
  // that only materialized a redundant temporary copy of the statement.
  query_ctx->client_request.__set_redacted_stmt(stmt);
  bool registered_query = false;
  Status status = ExecuteInternal(*query_ctx, external_exec_request, session_state,
      &registered_query, query_handle);
  query_handle->query_driver()->IncludeInQueryLog(include_in_query_log);
  if (!status.ok() && registered_query) {
    // Roll back the registration so a failed query does not linger in the registry.
    UnregisterQueryDiscardResult((*query_handle)->query_id(), false, &status);
  }
  return status;
}
// Creates the QueryDriver/ClientRequestState for 'query_ctx', registers the query,
// plans it (either via the frontend or using the caller-supplied
// 'external_exec_request'), and starts execution. '*registered_query' tells the
// caller whether registration succeeded so it can unregister on failure.
Status ImpalaServer::ExecuteInternal(const TQueryCtx& query_ctx,
    const TExecRequest* external_exec_request, shared_ptr<SessionState> session_state,
    bool* registered_query, QueryHandle* query_handle) {
  DCHECK(session_state != nullptr);
  DCHECK(query_handle != nullptr);
  DCHECK(registered_query != nullptr);
  *registered_query = false;
  // Create the QueryDriver for this query. CreateNewDriver creates the associated
  // ClientRequestState as well.
  QueryDriver::CreateNewDriver(this, query_handle, query_ctx, session_state);
  bool is_external_req = external_exec_request != nullptr;
  if (is_external_req && external_exec_request->remote_submit_time) {
    // Preserve the submit timestamp recorded by the remote submitter.
    (*query_handle)->SetRemoteSubmitTime(external_exec_request->remote_submit_time);
  }
  (*query_handle)->query_events()->MarkEvent("Query submitted");
  {
    // Keep a lock on query_handle so that registration and setting
    // result_metadata are atomic.
    lock_guard<mutex> l(*(*query_handle)->lock());
    // register exec state as early as possible so that queries that
    // take a long time to plan show up, and to handle incoming status
    // reports before execution starts.
    RETURN_IF_ERROR(RegisterQuery(query_ctx.query_id, session_state, query_handle));
    *registered_query = true;
#ifndef NDEBUG
    // Inject a sleep to simulate metadata loading pauses for tables. This
    // is only used for testing.
    if (FLAGS_stress_metadata_loading_pause_injection_ms > 0) {
      SleepForMs(FLAGS_stress_metadata_loading_pause_injection_ms);
    }
#endif
    // Reject over-long statements. The signed/unsigned comparison is safe here because
    // max_statement_length is checked to be positive first.
    size_t statement_length = query_ctx.client_request.stmt.length();
    int32_t max_statement_length =
        query_ctx.client_request.query_options.max_statement_length_bytes;
    if (max_statement_length > 0 && statement_length > max_statement_length) {
      return Status(ErrorMsg(TErrorCode::MAX_STATEMENT_LENGTH_EXCEEDED,
          statement_length, max_statement_length));
    }
    if (is_external_req) {
      // Use passed in exec_request
      RETURN_IF_ERROR(query_handle->query_driver()->SetExternalPlan(
          query_ctx, *external_exec_request));
      if(external_exec_request->query_exec_request.query_ctx.transaction_id > 0) {
        // Register the externally started transaction with this coordinator's
        // frontend.
        RETURN_IF_ERROR(exec_env_->frontend()->addTransaction(
            external_exec_request->query_exec_request.query_ctx));
      }
    } else {
      // Generate TExecRequest here if one was not passed in
      RETURN_IF_ERROR(query_handle->query_driver()->RunFrontendPlanner(query_ctx));
    }
    const TExecRequest& result = (*query_handle)->exec_request();
    // If this is an external request, planning was done by the external frontend
    if (!is_external_req) {
      (*query_handle)->query_events()->MarkEvent("Planning finished");
    }
    (*query_handle)->SetPlanningDone();
    (*query_handle)->set_user_profile_access(result.user_has_profile_access);
    (*query_handle)->summary_profile()->AddEventSequence(
        result.timeline.name, result.timeline);
    (*query_handle)->SetFrontendProfile(result);
    if (result.__isset.result_set_metadata) {
      (*query_handle)->set_result_metadata(result.result_set_metadata);
    }
  }
  VLOG(2) << "Execution request: "
      << ThriftDebugString((*query_handle)->exec_request());
  // start execution of query; also starts fragment status reports
  RETURN_IF_ERROR((*query_handle)->Exec());
  // Best-effort metrics refresh; failure does not fail the query.
  Status status = UpdateCatalogMetrics();
  if (!status.ok()) {
    VLOG_QUERY << "Couldn't update catalog metrics: " << status.GetDetail();
  }
  return Status::OK();
}
// Convenience overload: fills in this server's configured backend hostname and KRPC
// address, then delegates to the main implementation.
void ImpalaServer::PrepareQueryContext(TQueryCtx* query_ctx) {
  const auto& backend_address = exec_env_->configured_backend_address();
  PrepareQueryContext(backend_address.hostname, exec_env_->krpc_address(), query_ctx);
}
// Populates the per-query fields of 'query_ctx' before execution: process id, a newly
// generated query id, coordinator address/backend id, timezone and "now" timestamps,
// status-report settings, and (probabilistically) resource-usage tracing.
void ImpalaServer::PrepareQueryContext(const std::string& hostname,
    const NetworkAddressPB& krpc_addr, TQueryCtx* query_ctx) {
  query_ctx->__set_pid(getpid());
  int64_t now_us = UnixMicros();
  const Timezone& utc_tz = TimezoneDatabase::GetUtcTimezone();
  // Fill in query options with default timezone so it is visible in "SET" command,
  // profiles, etc.
  if (query_ctx->client_request.query_options.timezone.empty()) {
    query_ctx->client_request.query_options.timezone = TimezoneDatabase::LocalZoneName();
  }
  string local_tz_name = query_ctx->client_request.query_options.timezone;
  const Timezone* local_tz = TimezoneDatabase::FindTimezone(local_tz_name);
  if (local_tz != nullptr) {
    LOG(INFO) << "Found local timezone \"" << local_tz_name << "\".";
  } else {
    // Unknown timezone name: fall back to UTC rather than failing the query.
    LOG(ERROR) << "Failed to find local timezone \"" << local_tz_name
        << "\". Falling back to UTC";
    local_tz_name = "UTC";
    local_tz = &utc_tz;
  }
  query_ctx->__set_utc_timestamp_string(ToStringFromUnixMicros(now_us, utc_tz));
  if (query_ctx->client_request.query_options.now_string.empty()) {
    query_ctx->__set_now_string(ToStringFromUnixMicros(now_us, *local_tz));
  } else {
    // For testing purposes
    query_ctx->__set_now_string(query_ctx->client_request.query_options.now_string);
  }
  query_ctx->__set_start_unix_millis(now_us / MICROS_PER_MILLI);
  query_ctx->__set_coord_hostname(hostname);
  query_ctx->__set_coord_ip_address(FromNetworkAddressPB(krpc_addr));
  TUniqueId backend_id;
  UniqueIdPBToTUniqueId(ExecEnv::GetInstance()->backend_id(), &backend_id);
  query_ctx->__set_coord_backend_id(backend_id);
  query_ctx->__set_local_time_zone(local_tz_name);
  query_ctx->__set_status_report_interval_ms(FLAGS_status_report_interval_ms);
  query_ctx->__set_status_report_max_retry_s(FLAGS_status_report_max_retry_s);
  query_ctx->__set_gen_aggregated_profile(FLAGS_gen_experimental_profile);
  // Creating a random_generator every time is not free, but
  // benchmarks show it to be slightly cheaper than contending for a
  // single generator under a lock (since random_generator is not
  // thread-safe).
  // TODO: as cleanup we should consolidate this with uuid_generator_ - there's no reason
  // to have two different methods to achieve the same end. To address the scalability
  // concern we could shard the RNG or similar.
  query_ctx->query_id = UuidToQueryId(random_generator()());
  GetThreadDebugInfo()->SetQueryId(query_ctx->query_id);
  // Sample a fraction of queries for resource-usage tracing, per the
  // RESOURCE_TRACE_RATIO query option.
  const double trace_ratio = query_ctx->client_request.query_options.resource_trace_ratio;
  if (trace_ratio > 0 && rng_.NextDoubleFraction() < trace_ratio) {
    query_ctx->__set_trace_resource_usage(true);
  }
}
// Adds the query's driver to the global query driver map so the query becomes
// discoverable (e.g. for incoming status reports) as early as possible. The caller
// must have checked out 'session_state'; returns an Expected error if the session was
// closed after checkout.
Status ImpalaServer::RegisterQuery(const TUniqueId& query_id,
    shared_ptr<SessionState> session_state, QueryHandle* query_handle) {
  lock_guard<mutex> l2(session_state->lock);
  // The session wasn't expired at the time it was checked out and it isn't allowed to
  // expire while checked out, so it must not be expired.
  DCHECK_GT(session_state->ref_count, 0);
  DCHECK(!session_state->expired);
  // The session may have been closed after it was checked out.
  if (session_state->closed) {
    VLOG(1) << "RegisterQuery(): session has been closed, ignoring query.";
    return Status::Expected("Session has been closed, ignoring query.");
  }
  DCHECK_EQ(this, ExecEnv::GetInstance()->impala_server());
  RETURN_IF_ERROR(query_driver_map_.Add(query_id, query_handle->query_driver()));
  // Metric is decremented in UnregisterQuery().
  ImpaladMetrics::NUM_QUERIES_REGISTERED->Increment(1L);
  VLOG_QUERY << "Registered query query_id=" << PrintId(query_id) << " session_id="
      << PrintId(session_state->session_id);
  return Status::OK();
}
// Computes the effective idle timeout for a query from the process-wide
// --idle_query_timeout flag and the per-query QUERY_TIMEOUT_S option: the smaller of
// the two when both are set, otherwise whichever one is non-zero (0 = no timeout).
static inline int32_t GetIdleTimeout(const TQueryOptions& query_options) {
  const int32_t per_query_timeout_s = query_options.query_timeout_s;
  const bool both_set = FLAGS_idle_query_timeout > 0 && per_query_timeout_s > 0;
  return both_set ? min(FLAGS_idle_query_timeout, per_query_timeout_s)
                  : max(FLAGS_idle_query_timeout, per_query_timeout_s);
}
// Marks the query as in-flight on its session so that closing the session
// unregisters it, and schedules any idle-timeout / execution-time-limit /
// resource-limit expiration checks. Returns an Expected error if the session was
// closed in the meantime.
Status ImpalaServer::SetQueryInflight(
    shared_ptr<SessionState> session_state, const QueryHandle& query_handle) {
  DebugActionNoFail(query_handle->query_options(), "SET_QUERY_INFLIGHT");
  const TUniqueId& query_id = query_handle->query_id();
  lock_guard<mutex> l(session_state->lock);
  // The session wasn't expired at the time it was checked out and it isn't allowed to
  // expire while checked out, so it must not be expired.
  DCHECK_GT(session_state->ref_count, 0);
  DCHECK(!session_state->expired);
  // The session may have been closed after it was checked out.
  if (session_state->closed) {
    VLOG(1) << "Session closed: cannot set " << PrintId(query_id) << " in-flight";
    return Status::Expected("Session closed");
  }
  // Acknowledge the query by incrementing total_queries.
  ++session_state->total_queries;
  // If the query was already closed - only possible by query retry logic - skip
  // scheduling it to be unregistered with the session and adding timeouts checks.
  if (session_state->prestopped_queries.erase(query_id) > 0) {
    VLOG_QUERY << "Query " << PrintId(query_id) << " closed, skipping in-flight.";
    return Status::OK();
  }
  // Add query to the set that will be unregistered if session is closed.
  auto inflight_it = session_state->inflight_queries.insert(query_id);
  if (UNLIKELY(!inflight_it.second)) {
    // Double insertion indicates a bug in the caller; tolerated in release builds.
    LOG(WARNING) << "Query " << PrintId(query_id) << " is already in-flight.";
    DCHECK(false) << "SetQueryInflight called twice for query_id=" << PrintId(query_id);
  }
  // If the query has a timeout or time limit, schedule checks.
  int32_t idle_timeout_s = GetIdleTimeout(query_handle->query_options());
  int32_t exec_time_limit_s = query_handle->query_options().exec_time_limit_s;
  int64_t cpu_limit_s = query_handle->query_options().cpu_limit_s;
  int64_t scan_bytes_limit = query_handle->query_options().scan_bytes_limit;
  int64_t join_rows_produced_limit =
      query_handle->query_options().join_rows_produced_limit;
  if (idle_timeout_s > 0 || exec_time_limit_s > 0 || cpu_limit_s > 0
      || scan_bytes_limit > 0 || join_rows_produced_limit > 0) {
    lock_guard<mutex> l2(query_expiration_lock_);
    int64_t now = UnixMillis();
    if (idle_timeout_s > 0) {
      VLOG_QUERY << "Query " << PrintId(query_id) << " has idle timeout of "
          << PrettyPrinter::Print(idle_timeout_s, TUnit::TIME_S);
      queries_by_timestamp_.emplace(ExpirationEvent{
          now + (1000L * idle_timeout_s), query_id, ExpirationKind::IDLE_TIMEOUT});
    }
    if (exec_time_limit_s > 0) {
      VLOG_QUERY << "Query " << PrintId(query_id) << " has execution time limit of "
          << PrettyPrinter::Print(exec_time_limit_s, TUnit::TIME_S);
      queries_by_timestamp_.emplace(ExpirationEvent{
          now + (1000L * exec_time_limit_s), query_id, ExpirationKind::EXEC_TIME_LIMIT});
    }
    // Resource limits share a single periodic RESOURCE_LIMIT check rather than one
    // event per limit.
    if (cpu_limit_s > 0 || scan_bytes_limit > 0 || join_rows_produced_limit > 0) {
      if (cpu_limit_s > 0) {
        VLOG_QUERY << "Query " << PrintId(query_id) << " has CPU limit of "
            << PrettyPrinter::Print(cpu_limit_s, TUnit::TIME_S);
      }
      if (scan_bytes_limit > 0) {
        VLOG_QUERY << "Query " << PrintId(query_id) << " has scan bytes limit of "
            << PrettyPrinter::Print(scan_bytes_limit, TUnit::BYTES);
      }
      if (join_rows_produced_limit > 0) {
        VLOG_QUERY << "Query " << PrintId(query_id) << " has join rows produced limit of "
            << PrettyPrinter::Print(join_rows_produced_limit, TUnit::UNIT);
      }
      queries_by_timestamp_.emplace(ExpirationEvent{
          now + EXPIRATION_CHECK_INTERVAL_MS, query_id, ExpirationKind::RESOURCE_LIMIT});
    }
  }
  return Status::OK();
}
void ImpalaServer::UpdateExecSummary(const QueryHandle& query_handle) const {
DCHECK(query_handle->GetCoordinator() != nullptr);
TExecSummary t_exec_summary;
query_handle->GetCoordinator()->GetTExecSummary(&t_exec_summary);
query_handle->summary_profile()->SetTExecSummary(t_exec_summary);
string exec_summary = PrintExecSummary(t_exec_summary);
query_handle->summary_profile()->AddInfoStringRedacted("ExecSummary", exec_summary);
query_handle->summary_profile()->AddInfoStringRedacted("Errors",
query_handle->GetCoordinator()->GetErrorLog());
}
// Unregisters the query 'query_id' from the client's point of view. 'cause' is the
// error (if any) that triggered unregistration; 'check_inflight' is forwarded to
// Finalize(). Synchronous finalization happens here; the remaining cleanup runs on
// 'unreg_thread_pool_' (see FinishUnregisterQuery()).
Status ImpalaServer::UnregisterQuery(const TUniqueId& query_id, bool check_inflight,
    const Status* cause) {
  VLOG_QUERY << "UnregisterQuery(): query_id=" << PrintId(query_id);
  QueryHandle query_handle;
  RETURN_IF_ERROR(GetActiveQueryHandle(query_id, &query_handle));
  if (check_inflight) {
    DebugActionNoFail(query_handle->query_options(), "FINALIZE_INFLIGHT_QUERY");
  }
  // Do the work of unregistration that needs to be done synchronously. Once
  // Finalize() returns, the query is considered unregistered from the client's point of
  // view. If Finalize() returns OK, this thread is responsible for doing the
  // unregistration work. Finalize() succeeds for the first thread to call it to avoid
  // multiple threads unregistering.
  RETURN_IF_ERROR(
      query_handle.query_driver()->Finalize(&query_handle, check_inflight, cause));
  // Do the rest of the unregistration work in the background so that the client does
  // not need to wait for profile serialization, etc.
  unreg_thread_pool_->Offer(move(query_handle));
  return Status::OK();
}
// Completes the unregistration work deferred by UnregisterQuery(): closes the
// ClientRequestState and removes the QueryDriver from 'query_driver_map_'. Runs on
// the unregistration thread pool.
void ImpalaServer::FinishUnregisterQuery(const QueryHandle& query_handle) {
  DCHECK_EQ(this, ExecEnv::GetInstance()->impala_server());
  // Do all the finalization before removing the QueryDriver from the map so that
  // concurrent operations, e.g. GetRuntimeProfile() can find the query.
  CloseClientRequestState(query_handle);
  DebugActionNoFail(query_handle->query_options(), "CLOSED_NOT_UNREGISTERED");
  // Make the QueryDriver inaccessible. There is a time window where the query is
  // both in 'query_driver_map_' and 'query_locations_'.
  Status status = query_handle.query_driver()->Unregister(&query_driver_map_);
  // Build the failure message lazily in the DCHECK stream so no string is
  // constructed on the expected success path (previously the message was assembled
  // unconditionally, even in release builds where DCHECK is compiled out).
  DCHECK(status.ok()) << "QueryDriver can only be deleted once: " << status.GetDetail();
  if (UNLIKELY(!status.ok())) {
    LOG(ERROR) << status.GetDetail();
  } else {
    VLOG_QUERY << "Query successfully unregistered: query_id="
        << PrintId(query_handle->query_id());
  }
}
// Fire-and-forget wrapper around UnregisterQuery() for callers that cannot act on
// the result: any failure is logged and discarded.
void ImpalaServer::UnregisterQueryDiscardResult(
    const TUniqueId& query_id, bool check_inflight, const Status* cause) {
  Status status = UnregisterQuery(query_id, check_inflight, cause);
  if (!status.ok()) {
    // Log the status of the failed unregistration itself, not 'cause': the previous
    // code dereferenced 'cause', which both reported the wrong error and could crash
    // when callers pass a null cause.
    LOG(ERROR) << Substitute("Query de-registration for query_id=$0 failed: $1",
        PrintId(query_id), status.GetDetail());
  }
}
// Performs client-visible teardown of the query's ClientRequestState: records
// duration metrics, finalizes RPC stats, detaches the query from its session's
// in-flight set and from 'query_locations_', then archives it and decrements the
// registered-queries metric.
void ImpalaServer::CloseClientRequestState(const QueryHandle& query_handle) {
  int64_t duration_us = query_handle->end_time_us() - query_handle->start_time_us();
  int64_t duration_ms = duration_us / MICROS_PER_MILLI;
  // duration_ms can be negative when the local timezone changes during query execution.
  if (duration_ms >= 0) {
    if (query_handle->stmt_type() == TStmtType::DDL) {
      ImpaladMetrics::DDL_DURATIONS->Update(duration_ms);
    } else {
      ImpaladMetrics::QUERY_DURATIONS->Update(duration_ms);
    }
  }
  // Final attempt to capture completed RPC stats
  query_handle->UnRegisterCompletedRPCs();
  // Unregister any remaining RPC and discard stats
  query_handle->UnRegisterRemainingRPCs();
  {
    lock_guard<mutex> l(query_handle->session()->lock);
    if (query_handle->session()->inflight_queries.erase(query_handle->query_id()) == 0
        && query_handle->IsSetRetriedId()) {
      // Closing a ClientRequestState but the query is retrying with a new state and the
      // original ID was not yet inflight: skip adding it later. We don't want to track
      // other scenarios because they happen when the query was started and then errored
      // before a call to SetQueryInflight; we don't expect it to ever be called.
      auto prestopped_it =
          query_handle->session()->prestopped_queries.insert(query_handle->query_id());
      if (UNLIKELY(!prestopped_it.second)) {
        // A duplicate means this function already recorded the query as prestopped;
        // warn in release builds, fail fast in debug builds.
        LOG(WARNING) << "Query " << PrintId(query_handle->query_id())
            << " closed again before in-flight.";
        DCHECK(false) << "CloseClientRequestState called twice for query_id="
            << PrintId(query_handle->query_id());
      } else {
        VLOG_QUERY << "Query " << PrintId(query_handle->query_id())
            << " closed before in-flight.";
      }
    }
  }
  // Only queries that ran far enough to have a Coordinator have an exec summary.
  if (query_handle->GetCoordinator() != nullptr) {
    UpdateExecSummary(query_handle);
  }
  if (query_handle->schedule() != nullptr) {
    const RepeatedPtrField<BackendExecParamsPB>& backend_exec_params =
        query_handle->schedule()->backend_exec_params();
    if (!backend_exec_params.empty()) {
      lock_guard<mutex> l(query_locations_lock_);
      for (const BackendExecParamsPB& param : backend_exec_params) {
        // Query may have been removed already by cancellation path. In particular, if
        // node to fail was last sender to an exchange, the coordinator will realise and
        // fail the query at the same time the failure detection path does the same
        // thing. They will harmlessly race to remove the query from this map.
        auto it = query_locations_.find(param.backend_id());
        if (it != query_locations_.end()) {
          it->second.query_ids.erase(query_handle->query_id());
        }
      }
    }
  }
  ArchiveQuery(query_handle);
  ImpaladMetrics::NUM_QUERIES_REGISTERED->Increment(-1L);
}
// Fetches catalog metrics from the frontend and publishes them as impalad metrics.
// The cache-related fields are only published when --use_local_catalog is enabled,
// which is the only mode in which the frontend sets them.
Status ImpalaServer::UpdateCatalogMetrics() {
  TGetCatalogMetricsResult m;
  RETURN_IF_ERROR(exec_env_->frontend()->GetCatalogMetrics(&m));
  ImpaladMetrics::CATALOG_NUM_DBS->SetValue(m.num_dbs);
  ImpaladMetrics::CATALOG_NUM_TABLES->SetValue(m.num_tables);
  // Without the local catalog, the cache fields are absent; nothing more to publish.
  if (!FLAGS_use_local_catalog) return Status::OK();
  DCHECK(m.__isset.cache_eviction_count);
  DCHECK(m.__isset.cache_hit_count);
  DCHECK(m.__isset.cache_load_count);
  DCHECK(m.__isset.cache_load_exception_count);
  DCHECK(m.__isset.cache_load_success_count);
  DCHECK(m.__isset.cache_miss_count);
  DCHECK(m.__isset.cache_request_count);
  DCHECK(m.__isset.cache_total_load_time);
  DCHECK(m.__isset.cache_avg_load_time);
  DCHECK(m.__isset.cache_hit_rate);
  DCHECK(m.__isset.cache_load_exception_rate);
  DCHECK(m.__isset.cache_miss_rate);
  DCHECK(m.__isset.cache_entry_median_size);
  DCHECK(m.__isset.cache_entry_99th_size);
  ImpaladMetrics::CATALOG_CACHE_EVICTION_COUNT->SetValue(m.cache_eviction_count);
  ImpaladMetrics::CATALOG_CACHE_HIT_COUNT->SetValue(m.cache_hit_count);
  ImpaladMetrics::CATALOG_CACHE_LOAD_COUNT->SetValue(m.cache_load_count);
  ImpaladMetrics::CATALOG_CACHE_LOAD_EXCEPTION_COUNT->SetValue(
      m.cache_load_exception_count);
  ImpaladMetrics::CATALOG_CACHE_LOAD_SUCCESS_COUNT->SetValue(
      m.cache_load_success_count);
  ImpaladMetrics::CATALOG_CACHE_MISS_COUNT->SetValue(m.cache_miss_count);
  ImpaladMetrics::CATALOG_CACHE_REQUEST_COUNT->SetValue(m.cache_request_count);
  ImpaladMetrics::CATALOG_CACHE_TOTAL_LOAD_TIME->SetValue(m.cache_total_load_time);
  ImpaladMetrics::CATALOG_CACHE_AVG_LOAD_TIME->SetValue(m.cache_avg_load_time);
  ImpaladMetrics::CATALOG_CACHE_HIT_RATE->SetValue(m.cache_hit_rate);
  ImpaladMetrics::CATALOG_CACHE_LOAD_EXCEPTION_RATE->SetValue(
      m.cache_load_exception_rate);
  ImpaladMetrics::CATALOG_CACHE_MISS_RATE->SetValue(m.cache_miss_rate);
  ImpaladMetrics::CATALOG_CACHE_ENTRY_MEDIAN_SIZE->SetValue(m.cache_entry_median_size);
  ImpaladMetrics::CATALOG_CACHE_ENTRY_99TH_SIZE->SetValue(m.cache_entry_99th_size);
  return Status::OK();
}
// Looks up the QueryDriver for 'query_id' in the sharded 'query_driver_map_'.
// Returns an empty pointer if the query is unknown, or — unless
// 'return_unregistered' is set — if the driver has already been finalized.
shared_ptr<QueryDriver> ImpalaServer::GetQueryDriver(
    const TUniqueId& query_id, bool return_unregistered) {
  DCHECK_EQ(this, ExecEnv::GetInstance()->impala_server());
  ScopedShardedMapRef<shared_ptr<QueryDriver>> map_ref(query_id, &query_driver_map_);
  DCHECK(map_ref.get() != nullptr);
  auto it = map_ref->find(query_id);
  if (it == map_ref->end()) return nullptr;
  shared_ptr<QueryDriver> driver = it->second;
  // This finalized() check can race with unregistration: it cannot stop an
  // unregistration that begins right after the value is loaded. It is, however,
  // sufficient to guarantee that once a client operation has unregistered the
  // request, subsequent operations won't spuriously find it.
  if (!return_unregistered && driver->finalized()) return nullptr;
  return driver;
}
// Resolves 'query_id' to a handle on the query's *active* ClientRequestState (the
// retried attempt, if the query was retried). If the driver is gone, returns a
// status retained in 'idle_query_statuses_' when one exists, otherwise
// INVALID_QUERY_HANDLE.
Status ImpalaServer::GetActiveQueryHandle(
    const TUniqueId& query_id, QueryHandle* query_handle) {
  DCHECK(query_handle != nullptr);
  shared_ptr<QueryDriver> query_driver = GetQueryDriver(query_id);
  if (UNLIKELY(query_driver == nullptr)) {
    {
      // A status may have been retained for this query after it was unregistered
      // (see 'idle_query_statuses_'); if present, report that instead of a generic
      // invalid-handle error.
      lock_guard<mutex> l(idle_query_statuses_lock_);
      auto status_it = idle_query_statuses_.find(query_id);
      if (status_it != idle_query_statuses_.end()) return status_it->second;
    }
    Status err = Status::Expected(TErrorCode::INVALID_QUERY_HANDLE, PrintId(query_id));
    VLOG(1) << err.GetDetail();
    return err;
  }
  query_handle->SetHandle(query_driver, query_driver->GetActiveClientRequestState());
  // Update RPC stats before every call. Doing it here keeps the pending set small
  // and the profile current while the query is executing.
  (*query_handle)->UnRegisterCompletedRPCs();
  (*query_handle)->RegisterRPC();
  return Status::OK();
}
// Resolves 'query_id' to a handle on the exact ClientRequestState with that id (not
// the retried attempt). Fails with INVALID_QUERY_HANDLE when either the driver or
// that particular request state is absent.
Status ImpalaServer::GetQueryHandle(
    const TUniqueId& query_id, QueryHandle* query_handle, bool return_unregistered) {
  DCHECK(query_handle != nullptr);
  shared_ptr<QueryDriver> query_driver = GetQueryDriver(query_id, return_unregistered);
  ClientRequestState* request_state = query_driver == nullptr ?
      nullptr : query_driver->GetClientRequestState(query_id);
  if (UNLIKELY(request_state == nullptr)) {
    // Either lookup failing yields the same client-visible error.
    Status err = Status::Expected(TErrorCode::INVALID_QUERY_HANDLE, PrintId(query_id));
    VLOG(1) << err.GetDetail();
    return err;
  }
  query_handle->SetHandle(query_driver, request_state);
  return Status::OK();
}
// Resolves 'query_id' to two handles on the same QueryDriver: the active (possibly
// retried) ClientRequestState and the original one identified by 'query_id'.
Status ImpalaServer::GetAllQueryHandles(const TUniqueId& query_id,
    QueryHandle* active_query_handle, QueryHandle* original_query_handle,
    bool return_unregistered) {
  DCHECK(active_query_handle != nullptr);
  DCHECK(original_query_handle != nullptr);
  shared_ptr<QueryDriver> driver = GetQueryDriver(query_id, return_unregistered);
  if (UNLIKELY(driver == nullptr)) {
    Status err = Status::Expected(TErrorCode::INVALID_QUERY_HANDLE, PrintId(query_id));
    VLOG(1) << err.GetDetail();
    return err;
  }
  active_query_handle->SetHandle(driver, driver->GetActiveClientRequestState());
  original_query_handle->SetHandle(driver, driver->GetClientRequestState(query_id));
  return Status::OK();
}
// Cancels the active attempt of query 'query_id'. Fails when the query is unknown
// or not yet in-flight.
Status ImpalaServer::CancelInternal(const TUniqueId& query_id) {
  VLOG_QUERY << "Cancel(): query_id=" << PrintId(query_id);
  QueryHandle query_handle;
  RETURN_IF_ERROR(GetActiveQueryHandle(query_id, &query_handle));
  // Cancellation is only well-defined once the query is in-flight; before that
  // there is no way to cleanly cancel it.
  if (query_handle->is_inflight()) {
    query_handle->Cancel(/*cause=*/ nullptr);
    return Status::OK();
  }
  return Status("Query not yet running");
}
// Closes the session identified by 'session_id' after validating 'secret': removes
// it from 'session_state_map_', updates the open-session metrics, unregisters all of
// the session's in-flight queries, and drops the retained statuses of its idled
// queries. When 'ignore_if_absent' is set, a missing session (or one whose secret
// does not match) is not an error.
Status ImpalaServer::CloseSessionInternal(const TUniqueId& session_id,
    const SecretArg& secret, bool ignore_if_absent) {
  DCHECK(secret.is_session_secret());
  VLOG_QUERY << "Closing session: " << PrintId(session_id);
  // Find the session_state and remove it from the map.
  shared_ptr<SessionState> session_state;
  {
    lock_guard<mutex> l(session_state_map_lock_);
    SessionStateMap::iterator entry = session_state_map_.find(session_id);
    if (entry == session_state_map_.end() || !secret.Validate(entry->second->secret)) {
      if (ignore_if_absent) {
        return Status::OK();
      } else {
        if (entry != session_state_map_.end()) {
          // Log invalid attempts to connect. Be careful not to log secret.
          VLOG(1) << "Client tried to connect to session " << PrintId(session_id)
              << " with invalid secret.";
        }
        // A bad secret is reported identically to a missing session so callers
        // cannot distinguish the two.
        string err_msg = Substitute("Invalid session id: $0", PrintId(session_id));
        VLOG(1) << "CloseSessionInternal(): " << err_msg;
        return Status::Expected(err_msg);
      }
    }
    session_state = entry->second;
    session_state_map_.erase(session_id);
  }
  DCHECK(session_state != nullptr);
  if (session_state->session_type == TSessionType::BEESWAX) {
    ImpaladMetrics::IMPALA_SERVER_NUM_OPEN_BEESWAX_SESSIONS->Increment(-1L);
  } else {
    ImpaladMetrics::IMPALA_SERVER_NUM_OPEN_HS2_SESSIONS->Increment(-1L);
    DecrementSessionCount(session_state->connected_user);
  }
  // Snapshot the query sets under the session lock, then act on them after the lock
  // is released (UnregisterQueryDiscardResult() re-acquires session state).
  unordered_set<TUniqueId> inflight_queries;
  vector<TUniqueId> idled_queries;
  {
    lock_guard<mutex> l(session_state->lock);
    DCHECK(!session_state->closed);
    session_state->closed = true;
    // Since closed is true, no more queries will be added to the inflight list.
    inflight_queries.insert(session_state->inflight_queries.begin(),
        session_state->inflight_queries.end());
    idled_queries.swap(session_state->idled_queries);
  }
  // Unregister all open queries from this session.
  Status status = Status::Expected("Session closed");
  for (const TUniqueId& query_id: inflight_queries) {
    // TODO: deal with an error status
    UnregisterQueryDiscardResult(query_id, false, &status);
  }
  // Drop retained statuses for queries that idled out in this session.
  {
    lock_guard<mutex> l(idle_query_statuses_lock_);
    for (const TUniqueId& query_id: idled_queries) {
      idle_query_statuses_.erase(query_id);
    }
  }
  // Reconfigure the poll period of session_maintenance_thread_ if necessary.
  UnregisterSessionTimeout(session_state->session_timeout);
  VLOG_QUERY << "Closed session: " << PrintId(session_id)
      << ", client address: "
      << "<" << TNetworkAddressToString(session_state->network_address) << ">.";
  return Status::OK();
}
// Looks up the session with 'session_id' and validates 'secret'. On success, stores
// the session in '*session_state'. With 'mark_active', also increments the session's
// ref count under its lock, failing if the session is expired or closed; callers
// that mark a session active are responsible for releasing the reference.
Status ImpalaServer::GetSessionState(const TUniqueId& session_id, const SecretArg& secret,
    shared_ptr<SessionState>* session_state, bool mark_active) {
  lock_guard<mutex> l(session_state_map_lock_);
  SessionStateMap::iterator i = session_state_map_.find(session_id);
  // TODO: consider factoring out the lookup and secret validation into a separate method.
  // This would require rethinking the locking protocol for 'session_state_map_lock_' -
  // it probably does not need to be held for the full duration of this function.
  if (i == session_state_map_.end() || !secret.Validate(i->second->secret)) {
    if (i != session_state_map_.end()) {
      // Log invalid attempts to connect. Be careful not to log secret.
      VLOG(1) << "Client tried to connect to session " << PrintId(session_id)
          << " with invalid "
          << (secret.is_session_secret() ? "session" : "operation") << " secret.";
    }
    *session_state = shared_ptr<SessionState>();
    // Report a bad secret the same way as a missing session; the error text depends
    // on whether the caller supplied a session or an operation secret.
    string err_msg = secret.is_session_secret() ?
        Substitute("Invalid session id: $0", PrintId(session_id)) :
        Substitute(LEGACY_INVALID_QUERY_HANDLE_TEMPLATE, PrintId(secret.query_id()));
    VLOG(1) << "GetSessionState(): " << err_msg;
    return Status::Expected(err_msg);
  } else {
    if (mark_active) {
      lock_guard<mutex> session_lock(i->second->lock);
      if (i->second->expired) {
        stringstream ss;
        ss << "Client session expired due to more than " << i->second->session_timeout
           << "s of inactivity (last activity was at: "
           << ToStringFromUnixMillis(i->second->last_accessed_ms) << ").";
        return Status::Expected(ss.str());
      }
      if (i->second->closed) {
        VLOG(1) << "GetSessionState(): session " << PrintId(session_id) << " is closed.";
        return Status::Expected("Session is closed");
      }
      ++i->second->ref_count;
    }
    *session_state = i->second;
    return Status::OK();
  }
}
// Builds the server-wide default query options (flag values overlaid with
// --default_query_options) and materializes 'default_configs_' for clients. Exits
// the process if the configured defaults fail to parse or validate.
void ImpalaServer::InitializeConfigVariables() {
  // Set idle_session_timeout here to let the SET command return the value of
  // the command line option FLAGS_idle_session_timeout
  default_query_options_.__set_idle_session_timeout(FLAGS_idle_session_timeout);
  // The next query options used to be set with flags. Setting them in
  // default_query_options_ here in order to make default_query_options
  // take precedence over the legacy flags.
  default_query_options_.__set_use_local_tz_for_unix_timestamp_conversions(
      FLAGS_use_local_tz_for_unix_timestamp_conversions);
  default_query_options_.__set_convert_legacy_hive_parquet_utc_timestamps(
      FLAGS_convert_legacy_hive_parquet_utc_timestamps);
  QueryOptionsMask set_query_options; // unused
  Status status = ParseQueryOptions(FLAGS_default_query_options,
      &default_query_options_, &set_query_options);
  status.MergeStatus(ValidateQueryOptions(&default_query_options_));
  if (!status.ok()) {
    // Invalid defaults are a configuration error: log and exit.
    CLEAN_EXIT_WITH_ERROR(Substitute("Invalid default query options. Please check "
        "-default_query_options.\n $0", status.GetDetail()));
  }
  LOG(INFO) << "Default query options:" << ThriftDebugString(default_query_options_);
  map<string, string> string_map;
  TQueryOptionsToMap(default_query_options_, &string_map);
  string_map["SUPPORT_START_OVER"] = "false";
  string_map["TIMEZONE"] = TimezoneDatabase::LocalZoneName();
  PopulateQueryOptionLevels(&query_option_levels_);
  // Convert each key/value pair into a ConfigVariable annotated with its level.
  for (const auto& kv : string_map) {
    ConfigVariable option;
    option.__set_key(kv.first);
    option.__set_value(kv.second);
    AddOptionLevelToConfig(&option, kv.first);
    default_configs_.push_back(option);
  }
}
// Recomputes this session's idle timeout from its set query options, falling back
// to the server default, and re-registers the timeout with the server if it changed.
void ImpalaServer::SessionState::UpdateTimeout() {
  DCHECK(impala_server != nullptr);
  const int32_t old_timeout = session_timeout;
  session_timeout = set_query_options.__isset.idle_session_timeout
      ? set_query_options.idle_session_timeout
      : server_default_query_options->idle_session_timeout;
  if (session_timeout != old_timeout) {
    impala_server->UnregisterSessionTimeout(old_timeout);
    impala_server->RegisterSessionTimeout(session_timeout);
  }
}
// Annotates 'config' with the option level recorded for 'option_key' in
// 'query_option_levels_'. The key must already be present in that map.
void ImpalaServer::AddOptionLevelToConfig(ConfigVariable* config,
    const string& option_key) const {
  const auto level_it = query_option_levels_.find(option_key);
  DCHECK(level_it != query_option_levels_.end());
  config->__set_level(level_it->second);
}
// Copies this session's client-visible fields into the thrift struct 'state',
// holding the session lock so the snapshot is consistent.
void ImpalaServer::SessionState::ToThrift(const TUniqueId& session_id,
    TSessionState* state) {
  lock_guard<mutex> l(lock);
  state->session_id = session_id;
  state->session_type = session_type;
  state->database = database;
  state->connected_user = connected_user;
  // The do_as_user will only be set if delegation is enabled and the
  // proxy user is authorized to delegate as this user.
  if (!do_as_user.empty()) state->__set_delegated_user(do_as_user);
  state->network_address = network_address;
  state->__set_kudu_latest_observed_ts(kudu_latest_observed_ts);
}
// Returns the effective query options for this session: the server defaults with
// the session's explicitly-set options overlaid on top.
TQueryOptions ImpalaServer::SessionState::QueryOptions() {
  TQueryOptions effective_options = *server_default_query_options;
  OverlayQueryOptions(set_query_options, set_query_options_mask, &effective_options);
  return effective_options;
}
// Blocks until rows for 'query_id' are ready to fetch (or the fetch timeout fires),
// transparently following a query retry: if the original attempt was retried while
// we waited, waits for the retry to complete and re-resolves the active handle.
// On return, '*query_handle' points at the attempt whose rows should be fetched,
// '*timed_out' says whether the fetch timeout elapsed, and '*block_on_wait_time_us'
// holds the time spent blocked.
Status ImpalaServer::WaitForResults(const TUniqueId& query_id,
    QueryHandle* query_handle, int64_t* block_on_wait_time_us,
    bool* timed_out) {
  // Make sure ClientRequestState::Wait() has completed before fetching rows. Wait()
  // ensures that rows are ready to be fetched (e.g., Wait() opens
  // ClientRequestState::output_exprs_, which are evaluated in
  // ClientRequestState::FetchRows() below).
  RETURN_IF_ERROR(GetActiveQueryHandle(query_id, query_handle));
  BlockOnWait(*query_handle, timed_out, block_on_wait_time_us);
  // After BlockOnWait returns, it is possible that the query did not time out, and that
  // it was instead retried. In that case, wait until the query has been successfully
  // retried and then call GetActiveQueryHandle, which should now return the
  // ClientRequestState for the new query.
  ClientRequestState::RetryState retry_state;
  retry_state = (*query_handle)->retry_state();
  if (retry_state == ClientRequestState::RetryState::RETRYING
      || retry_state == ClientRequestState::RetryState::RETRIED) {
    (*query_handle)->WaitUntilRetried();
    RETURN_IF_ERROR(GetActiveQueryHandle(query_id, query_handle));
    // Call BlockOnWait and then DCHECK that the state is not RETRYING or RETRIED
    // (a retried attempt is itself never retried again at this point).
    BlockOnWait(*query_handle, timed_out, block_on_wait_time_us);
    retry_state = (*query_handle)->retry_state();
    DCHECK(retry_state != ClientRequestState::RetryState::RETRYING
        && retry_state != ClientRequestState::RetryState::RETRIED)
        << "Unexpected state: " << (*query_handle)->RetryStateToString(retry_state);
  }
  return Status::OK();
}
// Blocks on the query's Wait() for up to its fetch-rows timeout. Sets '*timed_out'
// to true when the timeout elapsed before Wait() completed; the time spent blocked
// is reported through '*block_on_wait_time_us'.
void ImpalaServer::BlockOnWait(QueryHandle& query_handle,
    bool* timed_out, int64_t* block_on_wait_time_us) {
  const int64_t timeout_us = query_handle->fetch_rows_timeout_us();
  const bool wait_completed =
      query_handle->BlockOnWait(timeout_us, block_on_wait_time_us);
  *timed_out = !wait_completed;
}
// Worker for the cancellation thread pool: processes one queued CancellationWork
// item. Depending on the cause and the 'unregister' flag, the query is either
// unregistered, retried (BACKEND_FAILED with retries possible), or cancelled.
// A query that was already unregistered is ignored.
void ImpalaServer::CancelFromThreadPool(const CancellationWork& cancellation_work) {
  const TUniqueId& query_id = cancellation_work.query_id();
  QueryHandle query_handle;
  Status status = GetQueryHandle(query_id, &query_handle);
  // Query was already unregistered.
  if (!status.ok()) {
    VLOG_QUERY << "CancelFromThreadPool(): query " << PrintId(query_id)
        << " already unregistered.";
    return;
  }
  DebugActionNoFail(query_handle->query_options(), "QUERY_CANCELLATION_THREAD");
  // Compute the error that will accompany the cancellation/unregistration.
  Status error;
  switch (cancellation_work.cause()) {
    case CancellationWorkCause::TERMINATED_BY_SERVER:
      error = cancellation_work.error();
      break;
    case CancellationWorkCause::BACKEND_FAILED: {
      // We only want to proceed with cancellation if the backends are still in use for
      // the query.
      vector<NetworkAddressPB> active_backends;
      Coordinator* coord = query_handle->GetCoordinator();
      if (coord == nullptr) {
        // Query hasn't started yet - it still will run on all backends.
        active_backends = cancellation_work.failed_backends();
      } else {
        active_backends = coord->GetActiveBackends(cancellation_work.failed_backends());
      }
      if (active_backends.empty()) {
        VLOG_QUERY << "CancelFromThreadPool(): all failed backends already completed for "
            << "query " << PrintId(query_id);
        return;
      }
      // Build a comma-separated list of the still-active failed backends for the
      // error message.
      stringstream msg;
      for (int i = 0; i < active_backends.size(); ++i) {
        msg << active_backends[i];
        if (i + 1 != active_backends.size()) msg << ", ";
      }
      error = Status::Expected(TErrorCode::UNREACHABLE_IMPALADS, msg.str());
      break;
    }
    default:
      DCHECK(false) << static_cast<int>(cancellation_work.cause());
  }
  if (cancellation_work.unregister()) {
    UnregisterQueryDiscardResult(cancellation_work.query_id(), true, &error);
  } else {
    // Retry queries that would otherwise be cancelled due to an impalad leaving the
    // cluster. CancellationWorkCause::BACKEND_FAILED indicates that a backend running
    // the query was removed from the cluster membership due to a statestore heartbeat
    // timeout. Historically, this would cause the Coordinator to cancel all queries
    // running on that backend. Now, Impala attempts to retry the queries instead of
    // cancelling them.
    bool was_retried = false;
    if (cancellation_work.cause() == CancellationWorkCause::BACKEND_FAILED) {
      query_handle.query_driver()->TryQueryRetry(&*query_handle, &error, &was_retried);
    }
    // If the query could not be retried, then cancel the query.
    if (!was_retried) {
      VLOG_QUERY << "CancelFromThreadPool(): cancelling query_id=" << PrintId(query_id);
      if (query_handle->is_inflight()) {
        query_handle->Cancel(&error);
      } else {
        VLOG_QUERY << "Query cancellation (" << PrintId(cancellation_work.query_id())
            << ") skipped as query was not running.";
      }
    }
  }
}
// Checks whether proxy principal 'user' may delegate operations to 'do_as_user',
// either via the per-user whitelist ('authorized_proxy_user_config_') or via the
// do-as user's Hadoop groups ('authorized_proxy_group_config_'). Returns OK if
// delegation is permitted, otherwise an Expected status describing why not.
Status ImpalaServer::AuthorizeProxyUser(const string& user, const string& do_as_user) {
  if (user.empty()) {
    const string err_msg("Unable to delegate using empty proxy username.");
    VLOG(1) << err_msg;
    return Status::Expected(err_msg);
  } else if (do_as_user.empty()) {
    const string err_msg("Unable to delegate using empty doAs username.");
    VLOG(1) << err_msg;
    return Status::Expected(err_msg);
  }
  stringstream error_msg;
  error_msg << "User '" << user << "' is not authorized to delegate to '"
      << do_as_user << "'.";
  if (authorized_proxy_user_config_.empty() && authorized_proxy_group_config_.empty()) {
    error_msg << " User/group delegation is disabled.";
    string error_msg_str = error_msg.str();
    VLOG(1) << error_msg_str;
    return Status::Expected(error_msg_str);
  }
  // Get the short version of the user name (the user name up to the first '/' or '@')
  // from the full principal name.
  size_t end_idx = min(user.find('/'), user.find('@'));
  // If neither are found (or are found at the beginning of the user name),
  // return the username. Otherwise, return the username up to the matching character.
  string short_user(
      end_idx == string::npos || end_idx == 0 ? user : user.substr(0, end_idx));
  // Check if the proxy user exists. If so, check whether they are allowed
  // to delegate to the do_as_user.
  AuthorizedProxyMap::const_iterator proxy_user =
      authorized_proxy_user_config_.find(short_user);
  if (proxy_user != authorized_proxy_user_config_.end()) {
    // Reference the configured set directly; copying it (as the code previously did)
    // allocated and rehashed the whole set on every authorization check.
    const boost::unordered_set<string>& users = proxy_user->second;
    if (users.find("*") != users.end() ||
        users.find(do_as_user) != users.end()) {
      return Status::OK();
    }
  }
  if (!authorized_proxy_group_config_.empty()) {
    // Check if the groups of do_as_user are in the authorized proxy groups.
    AuthorizedProxyMap::const_iterator proxy_group =
        authorized_proxy_group_config_.find(short_user);
    if (proxy_group != authorized_proxy_group_config_.end()) {
      const boost::unordered_set<string>& groups = proxy_group->second;
      if (groups.find("*") != groups.end()) return Status::OK();
      // Resolving Hadoop groups goes through the frontend and may be slow, so log
      // how long it took.
      TGetHadoopGroupsRequest req;
      req.__set_user(do_as_user);
      TGetHadoopGroupsResponse res;
      int64_t start = MonotonicMillis();
      Status status = exec_env_->frontend()->GetHadoopGroups(req, &res);
      VLOG_QUERY << "Getting Hadoop groups for user: " << short_user << " took " <<
          (PrettyPrinter::Print(MonotonicMillis() - start, TUnit::TIME_MS));
      if (!status.ok()) {
        LOG(ERROR) << "Error getting Hadoop groups for user: " << short_user << ": "
            << status.GetDetail();
        return status;
      }
      for (const string& do_as_group : res.groups) {
        if (groups.find(do_as_group) != groups.end()) {
          return Status::OK();
        }
      }
    }
  }
  string error_msg_str = error_msg.str();
  VLOG(1) << error_msg_str;
  return Status::Expected(error_msg_str);
}
bool ImpalaServer::IsAuthorizedProxyUser(const string& user) {
if (user.empty()) return false;
// Get the short version of the user name (the user name up to the first '/' or '@')
// from the full principal name.
size_t end_idx = min(user.find('/'), user.find('@'));
// If neither are found (or are found at the beginning of the user name),
// return the username. Otherwise, return the username up to the matching character.
string short_user(
end_idx == string::npos || end_idx == 0 ? user : user.substr(0, end_idx));
return authorized_proxy_user_config_.find(short_user)
!= authorized_proxy_user_config_.end()
|| authorized_proxy_group_config_.find(short_user)
!= authorized_proxy_group_config_.end();
}
// Publishes this struct's catalog version fields as impalad metrics. The visible
// caller (CatalogUpdateCallback()) invokes it while holding 'catalog_version_lock_',
// so the published values form a consistent snapshot.
void ImpalaServer::CatalogUpdateVersionInfo::UpdateCatalogVersionMetrics()
{
  ImpaladMetrics::CATALOG_VERSION->SetValue(catalog_version);
  ImpaladMetrics::CATALOG_OBJECT_VERSION_LOWER_BOUND->SetValue(
      catalog_object_version_lower_bound);
  ImpaladMetrics::CATALOG_TOPIC_VERSION->SetValue(catalog_topic_version);
  ImpaladMetrics::CATALOG_SERVICE_ID->SetValue(PrintId(catalog_service_id));
}
// Statestore callback for the catalog topic. Feeds the received delta to the
// frontend to update the local catalog cache. On failure, requests a full topic
// update (from_version=0) and drops the lib cache to recover; on success, records
// the new version info and metrics. Always refreshes the minimum subscriber topic
// version and wakes threads blocked in the WaitForCatalog*() methods.
void ImpalaServer::CatalogUpdateCallback(
    const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
    vector<TTopicDelta>* subscriber_topic_updates) {
  StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
      incoming_topic_deltas.find(CatalogServer::IMPALA_CATALOG_TOPIC);
  // Nothing to do when this heartbeat carries no catalog-topic delta.
  if (topic == incoming_topic_deltas.end()) return;
  const TTopicDelta& delta = topic->second;
  // The frontend pulls topic entries through this iterator via the raw pointer
  // passed in the request, so 'callback_ctx' must outlive the UpdateCatalogCache()
  // call below.
  TopicItemSpanIterator callback_ctx (delta.topic_entries, FLAGS_compact_catalog_topic);
  TUpdateCatalogCacheRequest req;
  req.__set_is_delta(delta.is_delta);
  req.__set_native_iterator_ptr(reinterpret_cast<int64_t>(&callback_ctx));
  TUpdateCatalogCacheResponse resp;
  Status s = exec_env_->frontend()->UpdateCatalogCache(req, &resp);
  if (!s.ok()) {
    LOG(ERROR) << "There was an error processing the impalad catalog update. Requesting"
        << " a full topic update to recover: " << s.GetDetail();
    subscriber_topic_updates->emplace_back();
    TTopicDelta& update = subscriber_topic_updates->back();
    update.topic_name = CatalogServer::IMPALA_CATALOG_TOPIC;
    // from_version=0 asks the statestore for a full (non-delta) topic update.
    update.__set_from_version(0L);
    ImpaladMetrics::CATALOG_READY->SetValue(false);
    // Dropped all cached lib files (this behaves as if all functions and data
    // sources are dropped).
    LibCache::instance()->DropCache();
  } else {
    {
      unique_lock<mutex> unique_lock(catalog_version_lock_);
      if (catalog_update_info_.catalog_version != resp.new_catalog_version) {
        LOG(INFO) << "Catalog topic update applied with version: " <<
            resp.new_catalog_version << " new min catalog object version: " <<
            resp.catalog_object_version_lower_bound;
      }
      catalog_update_info_.catalog_version = resp.new_catalog_version;
      catalog_update_info_.catalog_topic_version = delta.to_version;
      catalog_update_info_.catalog_service_id = resp.catalog_service_id;
      catalog_update_info_.catalog_object_version_lower_bound =
          resp.catalog_object_version_lower_bound;
      catalog_update_info_.UpdateCatalogVersionMetrics();
    }
    ImpaladMetrics::CATALOG_READY->SetValue(resp.new_catalog_version > 0);
    // TODO: deal with an error status
    discard_result(UpdateCatalogMetrics());
  }
  // Always update the minimum subscriber version for the catalog topic.
  {
    unique_lock<mutex> unique_lock(catalog_version_lock_);
    DCHECK(delta.__isset.min_subscriber_topic_version);
    min_subscriber_catalog_topic_version_ = delta.min_subscriber_topic_version;
  }
  // Wake up any threads waiting on catalog version changes.
  catalog_version_update_cv_.NotifyAll();
}
// Blocks until the local catalog cache has applied 'catalog_update_version' from
// the catalog service identified by 'catalog_service_id', or until the service ID
// changes (e.g. catalogd restart), in which case waiting would be pointless.
// Records which of the two outcomes occurred in 'timeline'.
void ImpalaServer::WaitForCatalogUpdate(const int64_t catalog_update_version,
    const TUniqueId& catalog_service_id, RuntimeProfile::EventSequence* timeline) {
  unique_lock<mutex> unique_lock(catalog_version_lock_);
  // Wait for the update to be processed locally.
  VLOG_QUERY << "Waiting for catalog version: " << catalog_update_version
      << " current version: " << catalog_update_info_.catalog_version;
  // The condition variable is signalled by CatalogUpdateCallback() whenever new
  // version info is recorded.
  while (catalog_update_info_.catalog_version < catalog_update_version &&
      catalog_update_info_.catalog_service_id == catalog_service_id) {
    catalog_version_update_cv_.Wait(unique_lock);
  }
  if (catalog_update_info_.catalog_service_id != catalog_service_id) {
    timeline->MarkEvent("Detected change in catalog service ID");
    VLOG_QUERY << "Detected change in catalog service ID";
  } else {
    timeline->MarkEvent(Substitute("Applied catalog version $0",
        catalog_update_version));
    VLOG_QUERY << "Applied catalog version: " << catalog_update_version;
  }
}
// Blocks until every catalog-topic subscriber has caught up to the catalog topic
// version this impalad has already applied, or until the catalog service ID changes
// (e.g. catalogd restart). Records the outcome in 'timeline'.
void ImpalaServer::WaitForCatalogUpdateTopicPropagation(
    const TUniqueId& catalog_service_id, RuntimeProfile::EventSequence* timeline) {
  unique_lock<mutex> unique_lock(catalog_version_lock_);
  // The target is the topic version of the update this node has locally applied.
  int64_t min_req_subscriber_topic_version =
      catalog_update_info_.catalog_topic_version;
  VLOG_QUERY << "Waiting for min subscriber topic version: "
      << min_req_subscriber_topic_version << " current version: "
      << min_subscriber_catalog_topic_version_;
  // 'min_subscriber_catalog_topic_version_' is advanced and the condition variable
  // signalled by CatalogUpdateCallback() on each statestore heartbeat.
  while (min_subscriber_catalog_topic_version_ < min_req_subscriber_topic_version &&
      catalog_update_info_.catalog_service_id == catalog_service_id) {
    catalog_version_update_cv_.Wait(unique_lock);
  }
  if (catalog_update_info_.catalog_service_id != catalog_service_id) {
    timeline->MarkEvent("Detected change in catalog service ID");
    VLOG_QUERY << "Detected change in catalog service ID";
  } else {
    timeline->MarkEvent(Substitute("Min catalog topic version of coordinators reached $0",
        min_req_subscriber_topic_version));
    VLOG_QUERY << "Min catalog topic version of coordinators: "
        << min_req_subscriber_topic_version;
  }
}
void ImpalaServer::WaitForMinCatalogUpdate(const int64_t min_req_catalog_object_version,
const TUniqueId& catalog_service_id, RuntimeProfile::EventSequence* timeline) {
unique_lock<mutex> unique_lock(catalog_version_lock_);
int64_t catalog_object_version_lower_bound =
catalog_update_info_.catalog_object_version_lower_bound;
// TODO: Set a timeout to eventually break out of this loop if something goes
// wrong?
VLOG_QUERY << "Waiting for local minimum catalog object version to be > "
<< min_req_catalog_object_version << ", current lower bound of local versions: "
<< catalog_object_version_lower_bound;
while (catalog_update_info_.catalog_service_id == catalog_service_id
&& catalog_update_info_.catalog_object_version_lower_bound <=
min_req_catalog_object_version) {
catalog_version_update_cv_.Wait(unique_lock);
}
if (catalog_update_info_.catalog_service_id != catalog_service_id) {
timeline->MarkEvent("Detected change in catalog service ID");
VLOG_QUERY << "Detected change in catalog service ID";
} else {
timeline->MarkEvent(Substitute("Local min catalog version reached $0",
min_req_catalog_object_version));
VLOG_QUERY << "Updated catalog object version lower bound: "
<< min_req_catalog_object_version;
}
}
// Makes the effects of a completed catalog operation visible locally.
// If 'catalog_update_result' carries no catalog objects, waits until the local
// catalog cache reaches the version in the result (or, for invalidates, until
// the local object-version lower bound passes it). If it does carry objects,
// applies them directly to the local cache via the frontend, provided the
// current catalog service ID matches the one in the result. When
// 'wait_for_all_subscribers' is set, additionally waits for the update to
// propagate to all catalog topic subscribers. Returns an error only if
// applying the update to the local cache fails.
Status ImpalaServer::ProcessCatalogUpdateResult(
    const TCatalogUpdateResult& catalog_update_result, bool wait_for_all_subscribers,
    const TQueryOptions& query_options, RuntimeProfile::EventSequence* timeline) {
  const TUniqueId& catalog_service_id = catalog_update_result.catalog_service_id;
  if (!catalog_update_result.__isset.updated_catalog_objects &&
      !catalog_update_result.__isset.removed_catalog_objects) {
    // Operation with no result set. Use the version specified in
    // 'catalog_update_result' to determine when the effects of this operation
    // have been applied to the local catalog cache.
    if (catalog_update_result.is_invalidate) {
      WaitForMinCatalogUpdate(catalog_update_result.version, catalog_service_id,
          timeline);
    } else {
      WaitForCatalogUpdate(catalog_update_result.version, catalog_service_id, timeline);
    }
    if (wait_for_all_subscribers) {
      // Now wait for this update to be propagated to all catalog topic subscribers.
      // If we make it here it implies the first condition was met (the update was
      // processed locally or the catalog service id has changed).
      WaitForCatalogUpdateTopicPropagation(catalog_service_id, timeline);
    }
  } else {
    TUniqueId cur_service_id;
    {
      // Debug hook used by tests to delay processing at this point.
      Status status = DebugAction(query_options, "WAIT_BEFORE_PROCESSING_CATALOG_UPDATE");
      DCHECK(status.ok());
      unique_lock<mutex> ver_lock(catalog_version_lock_);
      cur_service_id = catalog_update_info_.catalog_service_id;
      if (cur_service_id != catalog_service_id) {
        LOG(INFO) << "Catalog service ID mismatch. Current ID: "
            << PrintId(cur_service_id) << ". ID in response: "
            << PrintId(catalog_service_id) << ". Catalogd may have been restarted. "
            "Waiting for new catalog update from statestore.";
        WaitForNewCatalogServiceId(cur_service_id, &ver_lock);
        // Re-read the service ID after waiting; it may or may not now match.
        cur_service_id = catalog_update_info_.catalog_service_id;
      }
    }
    if (cur_service_id == catalog_service_id) {
      CatalogUpdateResultIterator callback_ctx(catalog_update_result);
      TUpdateCatalogCacheRequest update_req;
      update_req.__set_is_delta(true);
      // Pass the iterator to the frontend as a raw pointer value.
      update_req.__set_native_iterator_ptr(reinterpret_cast<int64_t>(&callback_ctx));
      // The catalog version is updated in WaitForCatalogUpdate below. So we need a
      // standalone field in the request to update the service ID without touching the
      // catalog version.
      update_req.__set_catalog_service_id(catalog_update_result.catalog_service_id);
      // Apply the changes to the local catalog cache.
      TUpdateCatalogCacheResponse resp;
      Status status = exec_env_->frontend()->UpdateCatalogCache(update_req, &resp);
      timeline->MarkEvent("Applied catalog updates from DDL");
      if (!status.ok()) LOG(ERROR) << status.GetDetail();
      RETURN_IF_ERROR(status);
    } else {
      // We can't apply updates on another service id, because the local catalog is still
      // inconsistent with the catalogd that executes the DDL/DML.
      //
      // 'cur_service_id' could belong to
      // 1) a stale update about a previous catalogd; this is possible if
      //    a) catalogd was restarted more than once (for example inside a statestore
      //       update cycle) and we only got the updates about some but not all restarts
      //       - the update about the catalogd that has 'catalog_service_id' has not
      //       arrived yet OR
      //    b) we gave up waiting (timed out or got a certain number of updates) before
      //       getting the update about the new catalogd
      // 2) an update about a restarted catalogd that is newer than the one with
      //    'catalog_service_id' (in this case we also timed out waiting for an update)
      //
      // We are good to ignore the DDL/DML result in the second case. However, in the
      // first case clients may see a stale catalog until the expected catalog topic
      // update arrives.
      // TODO: handle the first case in IMPALA-10875.
      LOG(WARNING) << "Ignoring catalog update result of catalog service ID "
          << PrintId(catalog_service_id)
          << " because it does not match with current catalog service ID "
          << PrintId(cur_service_id)
          << ". The current catalog service ID may be stale (this may be caused by the "
          << "catalogd having been restarted more than once) or newer than the catalog "
          << "service ID of the update result.";
    }
    if (!wait_for_all_subscribers) return Status::OK();
    // Wait until we receive and process the catalog update that covers the effects
    // (catalog objects) of this operation.
    WaitForCatalogUpdate(catalog_update_result.version, catalog_service_id, timeline);
    // Now wait for this update to be propagated to all catalog topic
    // subscribers.
    WaitForCatalogUpdateTopicPropagation(catalog_service_id, timeline);
  }
  return Status::OK();
}
void ImpalaServer::RegisterQueryLocations(
const RepeatedPtrField<BackendExecParamsPB>& backend_params,
const TUniqueId& query_id) {
VLOG_QUERY << "Registering query locations";
if (!backend_params.empty()) {
lock_guard<mutex> l(query_locations_lock_);
for (const BackendExecParamsPB& param : backend_params) {
const BackendIdPB& backend_id = param.backend_id();
auto it = query_locations_.find(backend_id);
if (it == query_locations_.end()) {
query_locations_.emplace(
backend_id, QueryLocationInfo(param.address(), query_id));
} else {
it->second.query_ids.insert(query_id);
}
}
}
}
// Cancels all queries that were scheduled on backends which are absent from
// 'current_membership'. Failed backends are removed from 'query_locations_'
// and one cancellation work item per affected query is queued on
// 'cancellation_thread_pool_', unless the queue is already too full (in which
// case the work is deferred to the next membership update).
void ImpalaServer::CancelQueriesOnFailedBackends(
    const std::unordered_set<BackendIdPB>& current_membership) {
  // Maps from query id (to be cancelled) to a list of failed Impalads that are
  // the cause of the cancellation. Note that we don't need to use TBackendIds as a single
  // query can't be scheduled on two backends with the same TNetworkAddress so there's no
  // ambiguity, and passing the TNetworkAddresses into the CancellationWork makes them
  // available for generating a user-friendly error message.
  map<TUniqueId, vector<NetworkAddressPB>> queries_to_cancel;
  {
    // Build a list of queries that are running on failed hosts (as evidenced by their
    // absence from the membership list).
    lock_guard<mutex> l(query_locations_lock_);
    QueryLocations::const_iterator loc_entry = query_locations_.begin();
    while (loc_entry != query_locations_.end()) {
      if (current_membership.find(loc_entry->first) == current_membership.end()) {
        // Add failed backend locations to all queries that ran on that backend.
        for (const auto& query_id : loc_entry->second.query_ids) {
          queries_to_cancel[query_id].push_back(loc_entry->second.address);
        }
        loc_entry = query_locations_.erase(loc_entry);
      } else {
        ++loc_entry;
      }
    }
  }
  if (cancellation_thread_pool_->GetQueueSize() + queries_to_cancel.size() >
      MAX_CANCELLATION_QUEUE_SIZE) {
    // Ignore the cancellations - we'll be able to process them on the next heartbeat
    // instead.
    LOG_EVERY_N(WARNING, 60) << "Cancellation queue is full";
  } else {
    // Since we are the only producer for this pool, we know that this cannot block
    // indefinitely since the queue is large enough to accept all new cancellation
    // requests.
    for (const auto& cancellation_entry : queries_to_cancel) {
      // Build a ", "-separated list of failed backends for the log message.
      stringstream backends_ss;
      // size_t index avoids the signed/unsigned comparison against
      // vector::size() that the previous 'int' counter produced.
      for (size_t i = 0; i < cancellation_entry.second.size(); ++i) {
        if (i > 0) backends_ss << ", ";
        backends_ss << cancellation_entry.second[i];
      }
      VLOG_QUERY << "Backends failed for query " << PrintId(cancellation_entry.first)
                 << ", adding to queue to check for cancellation: " << backends_ss.str();
      cancellation_thread_pool_->Offer(CancellationWork::BackendFailure(
          cancellation_entry.first, cancellation_entry.second));
    }
  }
}
std::shared_ptr<const BackendDescriptorPB> ImpalaServer::GetLocalBackendDescriptor() {
if (!AreServicesReady()) return nullptr;
lock_guard<mutex> l(local_backend_descriptor_lock_);
// Check if the current backend descriptor needs to be initialized.
if (local_backend_descriptor_.get() == nullptr) {
shared_ptr<BackendDescriptorPB> new_be_desc =
std::make_shared<BackendDescriptorPB>();
BuildLocalBackendDescriptorInternal(new_be_desc.get());
local_backend_descriptor_ = new_be_desc;
}
// Check to see if it needs to be updated.
if (IsShuttingDown() != local_backend_descriptor_->is_quiescing()) {
std::shared_ptr<BackendDescriptorPB> new_be_desc =
std::make_shared<BackendDescriptorPB>(*local_backend_descriptor_);
new_be_desc->set_is_quiescing(IsShuttingDown());
local_backend_descriptor_ = new_be_desc;
}
return local_backend_descriptor_;
}
// Populates 'be_desc' with this impalad's identity and capabilities: backend
// id and addresses, coordinator/executor roles, optional debug webserver
// address, admission limits, quiescing state, executor groups, process start
// time and build version. Requires that all services are ready.
void ImpalaServer::BuildLocalBackendDescriptorInternal(BackendDescriptorPB* be_desc) {
  DCHECK(AreServicesReady());
  // Quiescing iff a shutdown has been initiated (non-zero shutdown timestamp).
  bool is_quiescing = shutting_down_.Load() != 0;
  *be_desc->mutable_backend_id() = exec_env_->backend_id();
  *be_desc->mutable_address() =
      MakeNetworkAddressPB(exec_env_->configured_backend_address().hostname,
          exec_env_->configured_backend_address().port, exec_env_->backend_id(),
          exec_env_->rpc_mgr()->GetUdsAddressUniqueId());
  be_desc->set_ip_address(exec_env_->ip_address());
  be_desc->set_is_coordinator(FLAGS_is_coordinator);
  be_desc->set_is_executor(FLAGS_is_executor);
  // The debug webserver may be absent; only export its address when present.
  Webserver* webserver = ExecEnv::GetInstance()->webserver();
  if (webserver != nullptr) {
    *be_desc->mutable_debug_http_address() =
        MakeNetworkAddressPB(webserver->hostname(), webserver->port());
    be_desc->set_secure_webserver(webserver->IsSecure());
  }
  const NetworkAddressPB& krpc_address = exec_env_->krpc_address();
  // The KRPC address advertised to other backends must already be resolved.
  DCHECK(IsResolvedAddress(krpc_address));
  *be_desc->mutable_krpc_address() = krpc_address;
  be_desc->set_admit_mem_limit(exec_env_->admit_mem_limit());
  be_desc->set_admission_slots(exec_env_->admission_slots());
  be_desc->set_is_quiescing(is_quiescing);
  SetExecutorGroups(FLAGS_executor_groups, be_desc);
  // Fall back to the current time when the start-time metric is unavailable.
  if (CommonMetrics::PROCESS_START_TIME != nullptr) {
    be_desc->set_process_start_time(CommonMetrics::PROCESS_START_TIME->GetValue());
  } else {
    be_desc->set_process_start_time(CurrentTimeString());
  }
  be_desc->set_version(GetBuildVersion(/* compact */ true));
}
// Thrift connection-start callback. For Beeswax and internal-server
// connections a session is created eagerly here (these protocols use one
// session per connection, so the connection ID doubles as the session ID);
// connections to other servers get no session from this callback.
void ImpalaServer::ConnectionStart(
    const ThriftServer::ConnectionContext& connection_context) {
  if (connection_context.server_name == BEESWAX_SERVER_NAME ||
      connection_context.server_name == INTERNAL_SERVER_NAME) {
    // Beeswax only allows for one session per connection, so we can share the session ID
    // with the connection ID
    const TUniqueId& session_id = connection_context.connection_id;
    // Generate a secret per Beeswax session so that the HS2 secret validation mechanism
    // prevent accessing of Beeswax sessions from HS2.
    TUniqueId secret = RandomUniqueID();
    shared_ptr<SessionState> session_state =
        std::make_shared<SessionState>(this, session_id, secret);
    session_state->closed = false;
    session_state->start_time_ms = UnixMillis();
    session_state->last_accessed_ms = UnixMillis();
    session_state->database = "default";
    session_state->session_timeout = FLAGS_idle_session_timeout;
    session_state->session_type = TSessionType::BEESWAX;
    session_state->network_address = connection_context.network_address;
    session_state->server_default_query_options = &default_query_options_;
    session_state->kudu_latest_observed_ts = 0;
    session_state->connections.insert(connection_context.connection_id);
    // If the username was set by a lower-level transport, use it.
    if (!connection_context.username.empty()) {
      session_state->connected_user = connection_context.username;
    }
    // Make sure the maintenance thread accounts for this session's timeout.
    RegisterSessionTimeout(session_state->session_timeout);
    {
      lock_guard<mutex> l(session_state_map_lock_);
      bool success =
          session_state_map_.insert(make_pair(session_id, session_state)).second;
      // The session should not have already existed.
      DCHECK(success);
    }
    {
      // Track the session under its connection so ConnectionEnd() can find it.
      lock_guard<mutex> l(connection_to_sessions_map_lock_);
      connection_to_sessions_map_[connection_context.connection_id].insert(session_id);
    }
    ImpaladMetrics::IMPALA_SERVER_NUM_OPEN_BEESWAX_SESSIONS->Increment(1L);
  }
}
// Thrift connection-end callback. Finds the sessions associated with the
// closed connection. Beeswax/internal sessions (or all sessions when
// --disconnected_session_timeout <= 0) are closed immediately; otherwise the
// sessions just drop this connection and, once connection-less, are handed to
// the disconnected-session timeout for eventual cleanup.
void ImpalaServer::ConnectionEnd(
    const ThriftServer::ConnectionContext& connection_context) {
  set<TUniqueId> disconnected_sessions;
  {
    unique_lock<mutex> l(connection_to_sessions_map_lock_);
    ConnectionToSessionMap::iterator it =
        connection_to_sessions_map_.find(connection_context.connection_id);
    // Not every connection must have an associated session
    if (it == connection_to_sessions_map_.end()) return;
    // Sessions are not removed from the map even after they are closed and an entry
    // won't be added to the map unless a session is established.
    DCHECK(!it->second.empty());
    // We don't expect a large number of sessions per connection, so we copy it, so that
    // we can drop the map lock early.
    disconnected_sessions = std::move(it->second);
    connection_to_sessions_map_.erase(it);
  }
  const string connection_id = PrintId(connection_context.connection_id);
  LOG(INFO) << "Connection " << connection_id << " from client "
            << TNetworkAddressToString(connection_context.network_address)
            << " to server " << connection_context.server_name << " closed."
            << " The connection had " << disconnected_sessions.size()
            << " associated session(s).";
  // Decide whether the sessions should be closed right away.
  bool close = connection_context.server_name == BEESWAX_SERVER_NAME
      || connection_context.server_name == INTERNAL_SERVER_NAME
      || FLAGS_disconnected_session_timeout <= 0;
  if (close) {
    for (const TUniqueId& session_id : disconnected_sessions) {
      Status status = CloseSessionInternal(session_id, SecretArg::SkipSecretCheck(),
          /* ignore_if_absent= */ true);
      if (!status.ok()) {
        LOG(WARNING) << "Error closing session " << PrintId(session_id) << ": "
                     << status.GetDetail();
      }
    }
  } else {
    DCHECK(connection_context.server_name == HS2_SERVER_NAME
        || connection_context.server_name == HS2_HTTP_SERVER_NAME
        || connection_context.server_name == EXTERNAL_FRONTEND_SERVER_NAME);
    for (const TUniqueId& session_id : disconnected_sessions) {
      shared_ptr<SessionState> state;
      Status status = GetSessionState(session_id, SecretArg::SkipSecretCheck(), &state);
      // The session may not exist if it was explicitly closed.
      if (!status.ok()) continue;
      lock_guard<mutex> state_lock(state->lock);
      state->connections.erase(connection_context.connection_id);
      if (state->connections.empty()) {
        // Last connection gone: record when, and arm the disconnected-session
        // timeout so the maintenance thread can eventually close the session.
        state->disconnected_ms = UnixMillis();
        RegisterSessionTimeout(FLAGS_disconnected_session_timeout);
      }
    }
  }
}
// Returns true iff every session associated with 'connection_context' has
// expired. A connection that never established a session is not idle.
bool ImpalaServer::IsIdleConnection(
    const ThriftServer::ConnectionContext& connection_context) {
  // Snapshot the set of sessions associated with this connection.
  std::set<TUniqueId> associated_sessions;
  {
    unique_lock<mutex> sessions_lock(connection_to_sessions_map_lock_);
    auto entry =
        connection_to_sessions_map_.find(connection_context.connection_id);
    // Not every connection must have an associated session.
    if (entry == connection_to_sessions_map_.end()) return false;
    associated_sessions = entry->second;
    // Sessions are not removed from this map once added, and entries are only
    // created when a session is established, so a connection that has no
    // session yet is never (wrongly) reported idle.
    DCHECK(!associated_sessions.empty());
  }
  // The connection is idle only if all of its sessions have expired.
  lock_guard<mutex> map_lock(session_state_map_lock_);
  for (const TUniqueId& session_id : associated_sessions) {
    const auto state_it = session_state_map_.find(session_id);
    if (state_it == session_state_map_.end()) continue;
    lock_guard<mutex> state_lock(state_it->second->lock);
    if (!state_it->second->expired) return false;
  }
  return true;
}
// Adds 'session_timeout' to the set of timeouts the maintenance thread tracks
// and wakes that thread. No-op for non-positive timeouts.
void ImpalaServer::RegisterSessionTimeout(int32_t session_timeout) {
  if (session_timeout > 0) {
    {
      lock_guard<mutex> timeout_lock(session_timeout_lock_);
      session_timeout_set_.insert(session_timeout);
    }
    session_timeout_cv_.NotifyOne();
  }
}
// Removes one instance of 'session_timeout' from the tracked timeout multiset.
// No-op for non-positive timeouts; the timeout must have been registered.
void ImpalaServer::UnregisterSessionTimeout(int32_t session_timeout) {
  if (session_timeout <= 0) return;
  lock_guard<mutex> timeout_lock(session_timeout_lock_);
  auto entry = session_timeout_set_.find(session_timeout);
  DCHECK(entry != session_timeout_set_.end());
  session_timeout_set_.erase(entry);
}
// Daemon loop that closes sessions whose connections are all gone (past the
// disconnected-session timeout) and expires idle sessions. Wakes roughly once
// a second while any timeout is registered, otherwise blocks until
// RegisterSessionTimeout() signals. Queries belonging to a closed/expired
// session are handed to the cancellation thread pool.
[[noreturn]] void ImpalaServer::SessionMaintenance() {
  while (true) {
    {
      unique_lock<mutex> timeout_lock(session_timeout_lock_);
      if (session_timeout_set_.empty()) {
        // Nothing to maintain: sleep until a timeout is registered.
        session_timeout_cv_.Wait(timeout_lock);
      } else {
        // Sleep for a second before doing maintenance.
        session_timeout_cv_.WaitFor(timeout_lock, MICROS_PER_SEC);
      }
    }
    int64_t now = UnixMillis();
    int expired_cnt = 0;
    VLOG(3) << "Session maintenance thread waking up";
    {
      // TODO: If holding session_state_map_lock_ for the duration of this loop is too
      // expensive, consider a priority queue.
      lock_guard<mutex> map_lock(session_state_map_lock_);
      vector<TUniqueId> sessions_to_remove;
      for (SessionStateMap::value_type& map_entry : session_state_map_) {
        const TUniqueId& session_id = map_entry.first;
        std::shared_ptr<SessionState> session_state = map_entry.second;
        unordered_set<TUniqueId> inflight_queries;
        Status query_cancel_status;
        {
          lock_guard<mutex> state_lock(session_state->lock);
          // Skip sessions that are currently in use (ref_count > 0).
          if (session_state->ref_count > 0) continue;
          // A session closed by other means is in the process of being removed, and it's
          // best not to interfere.
          if (session_state->closed) continue;
          if (session_state->connections.size() == 0
              && (now - session_state->disconnected_ms)
                  >= FLAGS_disconnected_session_timeout * 1000L) {
            // This session has no active connections and is past the disconnected session
            // timeout, so close it.
            DCHECK(session_state->session_type == TSessionType::HIVESERVER2 ||
                session_state->session_type == TSessionType::EXTERNAL_FRONTEND);
            LOG(INFO) << "Closing session: " << PrintId(session_id)
                      << ", user: " << session_state->connected_user
                      << ", because it no longer has any open connections. The last "
                      << "connection was closed at: "
                      << ToStringFromUnixMillis(session_state->disconnected_ms);
            session_state->closed = true;
            sessions_to_remove.push_back(session_id);
            ImpaladMetrics::IMPALA_SERVER_NUM_OPEN_HS2_SESSIONS->Increment(-1L);
            UnregisterSessionTimeout(FLAGS_disconnected_session_timeout);
            query_cancel_status =
                Status::Expected(TErrorCode::DISCONNECTED_SESSION_CLOSED);
            DecrementSessionCount(session_state->connected_user);
          } else {
            // Check if the session should be expired.
            if (session_state->expired || session_state->session_timeout == 0) {
              continue;
            }
            int64_t last_accessed_ms = session_state->last_accessed_ms;
            int64_t session_timeout_ms = session_state->session_timeout * 1000;
            if (now - last_accessed_ms <= session_timeout_ms) continue;
            LOG(INFO) << "Expiring session: " << PrintId(session_id)
                      << ", user: " << session_state->connected_user
                      << ", last active: " << ToStringFromUnixMillis(last_accessed_ms);
            session_state->expired = true;
            ++expired_cnt;
            ImpaladMetrics::NUM_SESSIONS_EXPIRED->Increment(1L);
            query_cancel_status = Status::Expected(TErrorCode::INACTIVE_SESSION_EXPIRED);
          }
          // Since either expired or closed is true no more queries will be added to the
          // inflight list.
          inflight_queries.insert(session_state->inflight_queries.begin(),
              session_state->inflight_queries.end());
        }
        // Unregister all open queries from this session. Done outside the
        // session lock; Offer() may block if the pool's queue is full.
        for (const TUniqueId& query_id : inflight_queries) {
          cancellation_thread_pool_->Offer(
              CancellationWork::TerminatedByServer(query_id, query_cancel_status, true));
        }
      }
      // Remove any sessions that were closed from the map.
      for (const TUniqueId& session_id : sessions_to_remove) {
        session_state_map_.erase(session_id);
      }
    }
    LOG_IF(INFO, expired_cnt > 0) << "Expired sessions. Count: " << expired_cnt;
  }
}
// Daemon loop that processes expiration events from 'queries_by_timestamp_':
// idle timeouts, execution time limits and periodic resource-limit checks
// (via CheckResourceLimits()). Sleeps EXPIRATION_CHECK_INTERVAL_MS between
// passes over the deadline-ordered queue.
[[noreturn]] void ImpalaServer::ExpireQueries() {
  while (true) {
    // The following block accomplishes four things:
    //
    // 1. Update the ordered list of queries by checking the 'idle_time' parameter in
    // client_request_state. We are able to avoid doing this for *every* query in flight
    // thanks to the observation that expiry times never move backwards, only
    // forwards. Therefore once we find a query that a) hasn't changed its idle time and
    // b) has not yet expired we can stop moving through the list. If the idle time has
    // changed, we need to re-insert the query in the right place in queries_by_timestamp_
    //
    // 2. Remove any queries that would have expired but have already been closed for any
    // reason.
    //
    // 3. Compute the next time a query *might* expire, so that the sleep at the end of
    // this loop has an accurate duration to wait. If the list of queries is empty, the
    // default sleep duration is half the idle query timeout.
    //
    // 4. Cancel queries with CPU and scan bytes constraints if limit is exceeded
    int64_t now;
    {
      lock_guard<mutex> l(query_expiration_lock_);
      ExpirationQueue::iterator expiration_event = queries_by_timestamp_.begin();
      now = UnixMillis();
      while (expiration_event != queries_by_timestamp_.end()) {
        // 'queries_by_timestamp_' is stored in ascending order of deadline so we can
        // break out of the loop and sleep as soon as we see a deadline in the future.
        if (expiration_event->deadline > now) break;
        shared_ptr<QueryDriver> query_driver = GetQueryDriver(expiration_event->query_id);
        if (query_driver == nullptr) {
          // Query was deleted already from a previous expiration event
          expiration_event = queries_by_timestamp_.erase(expiration_event);
          continue;
        }
        ClientRequestState* crs = query_driver->GetActiveClientRequestState();
        if (crs->is_expired() && expiration_event->kind != ExpirationKind::IDLE_TIMEOUT) {
          // Query was expired already from a previous expiration event. Keep idle
          // timeouts as they will additionally unregister the query.
          expiration_event = queries_by_timestamp_.erase(expiration_event);
          continue;
        }
        // Check for CPU and scanned bytes limits
        if (expiration_event->kind == ExpirationKind::RESOURCE_LIMIT) {
          Status resource_status = CheckResourceLimits(crs);
          if (resource_status.ok()) {
            // Still within limits: schedule the next periodic resource check.
            queries_by_timestamp_.emplace(
                ExpirationEvent{now + EXPIRATION_CHECK_INTERVAL_MS,
                    expiration_event->query_id, ExpirationKind::RESOURCE_LIMIT});
          } else {
            ExpireQuery(crs, resource_status);
          }
          expiration_event = queries_by_timestamp_.erase(expiration_event);
          continue;
        }
        // If the query time limit expired, we must cancel the query.
        if (expiration_event->kind == ExpirationKind::EXEC_TIME_LIMIT) {
          int32_t exec_time_limit_s = crs->query_options().exec_time_limit_s;
          VLOG_QUERY << "Expiring query " << PrintId(expiration_event->query_id)
                     << " due to execution time limit of " << exec_time_limit_s << "s.";
          ExpireQuery(crs,
              Status::Expected(TErrorCode::EXEC_TIME_LIMIT_EXCEEDED,
                  PrintId(expiration_event->query_id),
                  PrettyPrinter::Print(exec_time_limit_s, TUnit::TIME_S)));
          expiration_event = queries_by_timestamp_.erase(expiration_event);
          continue;
        }
        DCHECK(expiration_event->kind == ExpirationKind::IDLE_TIMEOUT)
            << static_cast<int>(expiration_event->kind);
        // Now check to see if the idle timeout has expired. We must check the actual
        // expiration time in case the query has updated 'last_active_ms' since the last
        // time we looked.
        int32_t idle_timeout_s = GetIdleTimeout(crs->query_options());
        int64_t expiration = crs->last_active_ms() + (idle_timeout_s * 1000L);
        if (now < expiration) {
          // If the real expiration date is in the future we may need to re-insert the
          // query's expiration event at its correct location.
          if (expiration == expiration_event->deadline) {
            // The query hasn't been updated since it was inserted, so we know (by the
            // fact that queries are inserted in-expiration-order initially) that it is
            // still the next query to expire. No need to re-insert it.
            break;
          } else {
            // Erase and re-insert with an updated expiration time.
            TUniqueId query_id = expiration_event->query_id;
            expiration_event = queries_by_timestamp_.erase(expiration_event);
            queries_by_timestamp_.emplace(ExpirationEvent{
                expiration, query_id, ExpirationKind::IDLE_TIMEOUT});
          }
        } else if (!crs->is_active()) {
          // Otherwise time to expire this query
          VLOG_QUERY << "Expiring query due to client inactivity: "
                     << PrintId(expiration_event->query_id) << ", last activity was at: "
                     << ToStringFromUnixMillis(crs->last_active_ms());
          const Status status = Status::Expected(TErrorCode::INACTIVE_QUERY_EXPIRED,
              PrintId(expiration_event->query_id),
              PrettyPrinter::Print(idle_timeout_s, TUnit::TIME_S));
          // Save status so we can report it for unregistered queries.
          Status preserved_status;
          {
            lock_guard<mutex> l(*crs->lock());
            preserved_status = crs->query_status();
          }
          preserved_status.MergeStatus(status);
          {
            shared_ptr<SessionState> session = crs->session();
            lock_guard<mutex> l(session->lock);
            if (!session->closed) {
              // Remember why the query was expired so later lookups of the
              // (soon to be unregistered) query can still report the reason.
              lock_guard<mutex> l(idle_query_statuses_lock_);
              idle_query_statuses_.emplace(
                  expiration_event->query_id, move(preserved_status));
              session->idled_queries.emplace_back(expiration_event->query_id);
            }
          }
          ExpireQuery(crs, status, true);
          expiration_event = queries_by_timestamp_.erase(expiration_event);
        } else {
          // Iterator is moved on in every other branch.
          ++expiration_event;
        }
      }
    }
    // Since we only allow timeouts to be 1s or greater, the earliest that any new query
    // could expire is in 1s time. An existing query may expire sooner, but we are
    // comfortable with a maximum error of 1s as a trade-off for not frequently waking
    // this thread.
    SleepForMs(EXPIRATION_CHECK_INTERVAL_MS);
  }
}
// Daemon loop that cancels queries when one of their backends has not reported
// status within the allowed lag. The lag bound is derived from the status
// report retry and cancellation padding flags; the loop polls roughly ten
// times per lag window.
[[noreturn]] void ImpalaServer::UnresponsiveBackendThread() {
  int64_t max_lag_ms = FLAGS_status_report_max_retry_s * 1000
      * (1 + FLAGS_status_report_cancellation_padding / 100.0);
  DCHECK_GT(max_lag_ms, 0);
  VLOG(1) << "Queries will be cancelled if a backend has not reported its status in "
          << "more than " << max_lag_ms << "ms.";
  while (true) {
    vector<CancellationWork> to_cancel;
    query_driver_map_.DoFuncForAllEntries(
        [&](const std::shared_ptr<QueryDriver>& query_driver) {
          ClientRequestState* request_state = query_driver->GetActiveClientRequestState();
          // The coordinator is null until the query starts executing.
          Coordinator* coord = request_state->GetCoordinator();
          if (coord != nullptr) {
            NetworkAddressPB address;
            int64_t lag_time_ms = coord->GetMaxBackendStateLagMs(&address);
            if (lag_time_ms > max_lag_ms) {
              to_cancel.push_back(
                  CancellationWork::TerminatedByServer(request_state->query_id(),
                      Status(TErrorCode::UNRESPONSIVE_BACKEND,
                          PrintId(request_state->query_id()),
                          NetworkAddressPBToString(address), lag_time_ms, max_lag_ms),
                      false /* unregister */));
            }
          }
        });
    // We call Offer() outside of DoFuncForAllEntries() to ensure that if the
    // cancellation_thread_pool_ queue is full, we're not blocked while holding one of the
    // 'query_driver_map_' shard locks.
    // Iterate by const reference: the previous by-value loop copied each
    // CancellationWork (which carries a Status and query id) per iteration.
    for (const auto& cancellation_work : to_cancel) {
      cancellation_thread_pool_->Offer(cancellation_work);
    }
    SleepForMs(max_lag_ms * 0.1);
  }
}
// Daemon loop that periodically sends an admission heartbeat to the admission
// control service, listing every query id currently registered on this server.
// Each heartbeat carries a monotonically increasing version number. Failures
// (proxy acquisition, RPC, or service-side status) are logged and the loop
// simply retries on the next iteration.
[[noreturn]] void ImpalaServer::AdmissionHeartbeatThread() {
  while (true) {
    SleepForMs(FLAGS_admission_heartbeat_frequency_ms);
    std::unique_ptr<AdmissionControlServiceProxy> proxy;
    Status get_proxy_status = AdmissionControlService::GetProxy(&proxy);
    if (!get_proxy_status.ok()) {
      LOG(ERROR) << "Admission heartbeat thread was unable to get an "
                    "AdmissionControlService proxy:"
                 << get_proxy_status.GetDetail();
      continue;
    }
    AdmissionHeartbeatRequestPB request;
    AdmissionHeartbeatResponsePB response;
    *request.mutable_host_id() = exec_env_->backend_id();
    request.set_version(++admission_heartbeat_version_);
    // Collect the ids of all queries currently tracked by this server.
    query_driver_map_.DoFuncForAllEntries(
        [&](const std::shared_ptr<QueryDriver>& query_driver) {
          ClientRequestState* request_state = query_driver->GetActiveClientRequestState();
          TUniqueIdToUniqueIdPB(request_state->query_id(), request.add_query_ids());
        });
    kudu::rpc::RpcController rpc_controller;
    kudu::Status rpc_status =
        proxy->AdmissionHeartbeat(request, &response, &rpc_controller);
    if (!rpc_status.ok()) {
      LOG(ERROR) << "Admission heartbeat rpc failed: " << rpc_status.ToString();
      continue;
    }
    // The RPC itself succeeded; check the application-level status too.
    Status heartbeat_status(response.status());
    if (!heartbeat_status.ok()) {
      LOG(ERROR) << "Admission heartbeat failed: " << heartbeat_status;
    }
  }
}
// Checks whether 'crs' has exceeded any of its per-query resource limits
// (cpu_limit_s, scan_bytes_limit, join_rows_produced_limit). Returns OK when
// the query is within all limits or has not started executing yet; otherwise
// returns the matching *_LIMIT_EXCEEDED status.
Status ImpalaServer::CheckResourceLimits(ClientRequestState* crs) {
  Coordinator* coord = crs->GetCoordinator();
  // Coordinator may be null if query has not started executing; check again later.
  if (coord == nullptr) return Status::OK();
  Coordinator::ResourceUtilization usage = coord->ComputeQueryResourceUtilization();

  // CPU time (system + user) consumed by the query so far.
  int64_t consumed_cpu_ns = usage.cpu_sys_ns + usage.cpu_user_ns;
  int64_t cpu_limit_s = crs->query_options().cpu_limit_s;
  int64_t cpu_limit_ns = cpu_limit_s * 1000'000'000L;
  if (cpu_limit_ns > 0 && consumed_cpu_ns > cpu_limit_ns) {
    Status limit_err = Status::Expected(TErrorCode::CPU_LIMIT_EXCEEDED,
        PrintId(crs->query_id()), PrettyPrinter::Print(cpu_limit_s, TUnit::TIME_S));
    VLOG_QUERY << limit_err.msg().msg();
    return limit_err;
  }

  // Total bytes scanned so far.
  int64_t bytes_scanned = usage.bytes_read;
  int64_t bytes_limit = crs->query_options().scan_bytes_limit;
  if (bytes_limit > 0 && bytes_scanned > bytes_limit) {
    Status limit_err = Status::Expected(TErrorCode::SCAN_BYTES_LIMIT_EXCEEDED,
        PrintId(crs->query_id()), PrettyPrinter::Print(bytes_limit, TUnit::BYTES));
    VLOG_QUERY << limit_err.msg().msg();
    return limit_err;
  }

  // Largest row count produced by any single join node.
  auto& busiest_join = usage.MaxJoinNodeRowsProduced();
  int32_t join_node_id = busiest_join.first;
  int64_t join_rows = busiest_join.second;
  int64_t join_rows_limit = crs->query_options().join_rows_produced_limit;
  if (join_rows_limit > 0 && join_rows > join_rows_limit) {
    Status limit_err = Status::Expected(TErrorCode::JOIN_ROWS_PRODUCED_LIMIT_EXCEEDED,
        PrintId(crs->query_id()),
        PrettyPrinter::Print(join_rows_limit, TUnit::UNIT), join_node_id);
    VLOG_QUERY << limit_err.msg().msg();
    return limit_err;
  }
  // Query is within all resource limits; check again later.
  return Status::OK();
}
// Queues 'crs' for cancellation (and unregistration when 'unregister' is set)
// with the given non-OK 'status'. The expired flag and metric are updated only
// on the first expiration of a query.
void ImpalaServer::ExpireQuery(ClientRequestState* crs, const Status& status,
    bool unregister) {
  DCHECK(!status.ok());
  cancellation_thread_pool_->Offer(
      CancellationWork::TerminatedByServer(crs->query_id(), status, unregister));
  if (!crs->is_expired()) {
    ImpaladMetrics::NUM_QUERIES_EXPIRED->Increment(1L);
    crs->set_expired();
  } else {
    // Re-entry is only expected when we're now unregistering the query.
    DCHECK(unregister);
  }
}
// Brings up all externally-visible services of this impalad and marks it ready:
// registers HTTP debug/metrics handlers, validates the coordinator/executor role,
// subscribes to the statestore, resolves the minimum TLS version, optionally loads
// JWKS for JWT validation, constructs the beeswax / HS2 / external-frontend /
// HS2-over-HTTP Thrift servers (coordinator only), then starts the KRPC services and
// each constructed client server.
//
// Each client port is used only if > 0; a port of 0 is also accepted under tests
// (presumably so an ephemeral port is bound — confirm against ThriftServer).
// Returns an error Status if any service fails to initialize or start; on success
// sets services_started_ and the IMPALA_SERVER_READY metric.
Status ImpalaServer::Start(int32_t beeswax_port, int32_t hs2_port,
    int32_t hs2_http_port, int32_t external_fe_port) {
  exec_env_->SetImpalaServer(this);
#ifdef CALLONCEHACK
  // Include this calloncehack call (which is a no-op) to make sure calloncehack
  // is required at link time when using it.
  calloncehack::InitializeCallOnceHack();
#endif
  // We must register the HTTP handlers after registering the ImpalaServer with the
  // ExecEnv. Otherwise the HTTP handlers will try to resolve the ImpalaServer through the
  // ExecEnv singleton and will receive a nullptr.
  http_handler_.reset(ImpalaHttpHandler::CreateImpaladHandler(
      this, exec_env_->admission_controller(), exec_env_->cluster_membership_mgr()));
  http_handler_->RegisterHandlers(exec_env_->webserver());
  // A separate metrics-only webserver may be configured; register a reduced handler
  // set on it.
  if (exec_env_->metrics_webserver() != nullptr) {
    http_handler_->RegisterHandlers(
        exec_env_->metrics_webserver(), /* metrics_only */ true);
  }
  // An impalad must serve as a coordinator, an executor, or both.
  if (!FLAGS_is_coordinator && !FLAGS_is_executor) {
    return Status("Impala does not have a valid role configured. "
        "Either --is_coordinator or --is_executor must be set to true.");
  }
  // Subscribe with the statestore. Coordinators need to subscribe to the catalog topic
  // then wait for the initial catalog update.
  RETURN_IF_ERROR(exec_env_->StartStatestoreSubscriberService());
  if (FLAGS_is_coordinator) {
    exec_env_->frontend()->WaitForCatalog();
    ABORT_IF_ERROR(UpdateCatalogMetrics());
  }
  // Resolve the minimum TLS protocol version once; it is applied to every TLS-enabled
  // client server below. The TLSv1_0 default is only used when TLS is not configured.
  SSLProtocol ssl_version = SSLProtocol::TLSv1_0;
  if (IsExternalTlsConfigured() || IsInternalTlsConfigured()) {
    RETURN_IF_ERROR(
        SSLProtoVersions::StringToProtocol(FLAGS_ssl_minimum_version, &ssl_version));
  }
  if (!FLAGS_is_coordinator) {
    // Executor-only: no client-facing servers are created.
    LOG(INFO) << "Initialized executor Impala server on "
              << TNetworkAddressToString(exec_env_->configured_backend_address());
  } else {
    // Load JWKS from file if validation for signature of JWT token is enabled.
    if (FLAGS_jwt_token_auth && FLAGS_jwt_validate_signature) {
      if (!FLAGS_jwks_file_path.empty()) {
        RETURN_IF_ERROR(JWTHelper::GetInstance()->Init(FLAGS_jwks_file_path));
      } else if (!FLAGS_jwks_url.empty()) {
        // NOTE(review): the 1s sleep under tests looks like it waits for a test JWKS
        // endpoint to come up — confirm against the test harness.
        if (TestInfo::is_test()) sleep(1);
        RETURN_IF_ERROR(JWTHelper::GetInstance()->Init(FLAGS_jwks_url,
            FLAGS_jwks_verify_server_certificate, FLAGS_jwks_ca_certificate, false));
      } else {
        // Neither a JWKS file nor a JWKS URL was provided.
        // NOTE(review): the log message below contains a doubled space
        // ("signature  is enabled") — left as-is since it is runtime output.
        LOG(ERROR) << "JWKS file is not specified when the validation of JWT signature "
                   << " is enabled.";
        return Status("JWKS file is not specified");
      }
    }
    // Initialize the client servers.
    shared_ptr<ImpalaServer> handler = shared_from_this();
    // Beeswax service (legacy impala-shell protocol).
    if (beeswax_port > 0 || (TestInfo::is_test() && beeswax_port == 0)) {
      shared_ptr<TProcessor> beeswax_processor(
          new ImpalaServiceProcessor(handler));
      shared_ptr<TProcessorEventHandler> event_handler(
          new RpcEventHandler("beeswax", exec_env_->metrics()));
      beeswax_processor->setEventHandler(event_handler);
      ThriftServerBuilder builder(BEESWAX_SERVER_NAME, beeswax_processor, beeswax_port);
      if (IsExternalTlsConfigured()) {
        LOG(INFO) << "Enabling SSL for Beeswax";
        builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
            .pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
            .ssl_version(ssl_version)
            .cipher_list(FLAGS_ssl_cipher_list);
      }
      ThriftServer* server;
      RETURN_IF_ERROR(
          builder.auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
              .metrics(exec_env_->metrics())
              .max_concurrent_connections(FLAGS_fe_service_threads)
              .queue_timeout_ms(FLAGS_accepted_client_cnxn_timeout)
              .idle_poll_period_ms(FLAGS_idle_client_poll_period_s * MILLIS_PER_SEC)
              .Build(&server));
      beeswax_server_.reset(server);
      beeswax_server_->SetConnectionHandler(this);
    }
    // HiveServer2 service (binary Thrift transport).
    if (hs2_port > 0 || (TestInfo::is_test() && hs2_port == 0)) {
      shared_ptr<TProcessor> hs2_fe_processor(
          new ImpalaHiveServer2ServiceProcessor(handler));
      shared_ptr<TProcessorEventHandler> event_handler(
          new RpcEventHandler("hs2", exec_env_->metrics()));
      hs2_fe_processor->setEventHandler(event_handler);
      ThriftServerBuilder builder(HS2_SERVER_NAME, hs2_fe_processor, hs2_port);
      if (IsExternalTlsConfigured()) {
        LOG(INFO) << "Enabling SSL for HiveServer2";
        builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
            .pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
            .ssl_version(ssl_version)
            .cipher_list(FLAGS_ssl_cipher_list);
      }
      ThriftServer* server;
      RETURN_IF_ERROR(
          builder.auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
              .metrics(exec_env_->metrics())
              .max_concurrent_connections(FLAGS_fe_service_threads)
              .queue_timeout_ms(FLAGS_accepted_client_cnxn_timeout)
              .idle_poll_period_ms(FLAGS_idle_client_poll_period_s * MILLIS_PER_SEC)
              .Build(&server));
      hs2_server_.reset(server);
      hs2_server_->SetConnectionHandler(this);
    }
    // External frontend service: HS2-compatible endpoint with its own auth provider
    // and a configurable (binary or HTTP) transport. Note it is not given the TLS
    // configuration that the other client servers get.
    if (external_fe_port > 0 || (TestInfo::is_test() && external_fe_port == 0)) {
      shared_ptr<TProcessor> external_fe_processor(
          new ImpalaHiveServer2ServiceProcessor(handler));
      shared_ptr<TProcessorEventHandler> event_handler(
          new RpcEventHandler("external_frontend", exec_env_->metrics()));
      external_fe_processor->setEventHandler(event_handler);
      ThriftServer::TransportType external_fe_port_transport =
          ThriftServer::TransportType::BINARY;
      if (FLAGS_enable_external_fe_http) {
        LOG(INFO) << "External FE endpoint is using HTTP for transport";
        external_fe_port_transport = ThriftServer::TransportType::HTTP;
      }
      ThriftServerBuilder builder(EXTERNAL_FRONTEND_SERVER_NAME, external_fe_processor,
          external_fe_port);
      ThriftServer* server;
      RETURN_IF_ERROR(
          builder
              .auth_provider(
                  AuthManager::GetInstance()->GetExternalFrontendAuthProvider())
              .transport_type(external_fe_port_transport)
              .metrics(exec_env_->metrics())
              .max_concurrent_connections(FLAGS_fe_service_threads)
              .queue_timeout_ms(FLAGS_accepted_client_cnxn_timeout)
              .idle_poll_period_ms(FLAGS_idle_client_poll_period_s * MILLIS_PER_SEC)
              .Build(&server));
      external_fe_server_.reset(server);
      external_fe_server_->SetConnectionHandler(this);
    }
    // HiveServer2 service over HTTP transport.
    if (hs2_http_port > 0 || (TestInfo::is_test() && hs2_http_port == 0)) {
      shared_ptr<TProcessor> hs2_http_processor(
          new ImpalaHiveServer2ServiceProcessor(handler));
      shared_ptr<TProcessorEventHandler> event_handler(
          new RpcEventHandler("hs2_http", exec_env_->metrics()));
      hs2_http_processor->setEventHandler(event_handler);
      ThriftServer* http_server;
      ThriftServerBuilder http_builder(
          HS2_HTTP_SERVER_NAME, hs2_http_processor, hs2_http_port);
      if (IsExternalTlsConfigured()) {
        LOG(INFO) << "Enabling SSL for HiveServer2 HTTP endpoint.";
        http_builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
            .pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
            .ssl_version(ssl_version)
            .cipher_list(FLAGS_ssl_cipher_list);
      }
      RETURN_IF_ERROR(
          http_builder
              .auth_provider(AuthManager::GetInstance()->GetExternalHttpAuthProvider())
              .transport_type(ThriftServer::TransportType::HTTP)
              .metrics(exec_env_->metrics())
              .max_concurrent_connections(FLAGS_fe_service_threads)
              .queue_timeout_ms(FLAGS_accepted_client_cnxn_timeout)
              .idle_poll_period_ms(FLAGS_idle_client_poll_period_s * MILLIS_PER_SEC)
              .Build(&http_server));
      hs2_http_server_.reset(http_server);
      hs2_http_server_->SetConnectionHandler(this);
    }
    // Keep a self-reference for the internal (in-process) server interface; also
    // consulted by GetAllConnectionContexts().
    internal_server_ = shared_from_this();
    RETURN_IF_ERROR(InitWorkloadManagement());
  }
  LOG(INFO) << "Initialized coordinator/executor Impala server on "
            << TNetworkAddressToString(exec_env_->configured_backend_address());
  // Start the RPC services.
  RETURN_IF_ERROR(exec_env_->StartKrpcService());
  if (hs2_server_.get()) {
    RETURN_IF_ERROR(hs2_server_->Start());
    LOG(INFO) << "Impala HiveServer2 Service listening on " << hs2_server_->port();
  }
  if (hs2_http_server_.get()) {
    RETURN_IF_ERROR(hs2_http_server_->Start());
    LOG(INFO) << "Impala HiveServer2 Service (HTTP) listening on "
              << hs2_http_server_->port();
  }
  if (external_fe_server_.get()) {
    RETURN_IF_ERROR(external_fe_server_->Start());
    LOG(INFO) << "Impala External Frontend Service listening on "
              << external_fe_server_->port();
  }
  if (beeswax_server_.get()) {
    RETURN_IF_ERROR(beeswax_server_->Start());
    LOG(INFO) << "Impala Beeswax Service listening on " << beeswax_server_->port();
  }
  // Hook for fault-injection tests at the very end of startup.
  RETURN_IF_ERROR(DebugAction(FLAGS_debug_actions, "IMPALA_SERVER_END_OF_START"));
  // Publish readiness last, after every service has started successfully.
  services_started_ = true;
  ImpaladMetrics::IMPALA_SERVER_READY->SetValue(true);
  LOG(INFO) << "Impala has started.";
  return Status::OK();
}
void ImpalaServer::Join() {
  // There is no graceful teardown path: the server exits the process, so this simply
  // blocks until that happens.
  exec_env_->rpc_mgr()->Join();
  // Only coordinators run the client-facing servers.
  if (!FLAGS_is_coordinator) return;
  beeswax_server_->Join();
  hs2_server_->Join();
  beeswax_server_.reset();
  hs2_server_.reset();
}
bool ImpalaServer::AreServicesReady() const {
  // Ready only once local services have started AND the statestore registration has
  // completed. Checked in the same order as the original conjunction.
  if (!services_started_.load()) return false;
  return exec_env_->IsStatestoreRegistrationCompleted();
}
Status ImpalaServer::CheckClientRequestSession(
    SessionState* session, const std::string& client_request_effective_user,
    const TUniqueId& query_id) {
  const string& effective_session_user = GetEffectiveUser(*session);
  // Empty session users only occur for unauthenticated sessions where no user was
  // specified by the client, e.g. unauthenticated beeswax sessions. No security is
  // enabled in that case, so the check is skipped; some tests rely on this, e.g. to
  // poll query status from a new beeswax connection.
  if (effective_session_user.empty()
      || effective_session_user == client_request_effective_user) {
    return Status::OK();
  }
  // Report the same error as an unknown handle so an attacker can't distinguish
  // "wrong user" from "no such query"; log the real cause at VLOG(1).
  Status err = Status::Expected(
      Substitute(LEGACY_INVALID_QUERY_HANDLE_TEMPLATE, PrintId(query_id)));
  VLOG(1) << err << " caused by user mismatch: '" << effective_session_user << "' vs '"
          << client_request_effective_user << "'";
  return err;
}
void ImpalaServer::UpdateFilter(UpdateFilterResultPB* result,
    const UpdateFilterParamsPB& params, RpcContext* context) {
  DCHECK(params.has_query_id());
  DCHECK(params.has_filter_id());
  const TUniqueId query_id = ProtoToQueryId(params.query_id());
  QueryHandle query_handle;
  // The query may already have finished or been unregistered; in that case the update
  // is simply dropped.
  if (!GetQueryHandle(query_id, &query_handle).ok()) {
    LOG(INFO) << "Could not find query handle for query id: " << PrintId(query_id);
    return;
  }
  // Don't apply filter updates to a query that is being retried or was retried.
  const ClientRequestState::RetryState retry_state = query_handle->retry_state();
  const bool is_retry = retry_state == ClientRequestState::RetryState::RETRYING
      || retry_state == ClientRequestState::RetryState::RETRIED;
  if (!is_retry) query_handle->UpdateFilter(params, context);
}
Status ImpalaServer::CheckNotShuttingDown() const {
  // Once shutdown has begun, reject new work with SERVER_SHUTTING_DOWN, including the
  // current shutdown progress in the message.
  if (IsShuttingDown()) {
    return Status::Expected(ErrorMsg(
        TErrorCode::SERVER_SHUTTING_DOWN, ShutdownStatusToString(GetShutdownStatus())));
  }
  return Status::OK();
}
ShutdownStatusPB ImpalaServer::GetShutdownStatus() const {
  // Builds a snapshot of shutdown progress: remaining grace period, remaining
  // deadline, and the amount of work still in flight. Must only be called after
  // shutdown has started, i.e. both shutting_down_ and shutdown_deadline_ hold
  // non-zero monotonic timestamps.
  ShutdownStatusPB result;
  int64_t shutdown_time = shutting_down_.Load();
  DCHECK_GT(shutdown_time, 0);
  int64_t shutdown_deadline = shutdown_deadline_.Load();
  // Bug fix: this previously re-checked 'shutdown_time' (copy-paste of the DCHECK
  // above), leaving the just-loaded deadline unvalidated.
  DCHECK_GT(shutdown_deadline, 0);
  int64_t now = MonotonicMillis();
  int64_t elapsed_ms = now - shutdown_time;
  // Clamp both countdowns at zero so callers never see negative remaining times.
  result.set_grace_remaining_ms(
      max<int64_t>(0, FLAGS_shutdown_grace_period_s * 1000 - elapsed_ms));
  result.set_deadline_remaining_ms(max<int64_t>(0, shutdown_deadline - now));
  result.set_finstances_executing(
      ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->GetValue());
  result.set_client_requests_registered(
      ImpaladMetrics::NUM_QUERIES_REGISTERED->GetValue());
  result.set_backend_queries_executing(
      ImpaladMetrics::BACKEND_NUM_QUERIES_EXECUTING->GetValue());
  return result;
}
string ImpalaServer::ShutdownStatusToString(const ShutdownStatusPB& shutdown_status) {
  // Render shutdown progress in a human-readable form for logs and error messages.
  const string grace_left =
      PrettyPrinter::Print(shutdown_status.grace_remaining_ms(), TUnit::TIME_MS);
  const string deadline_left =
      PrettyPrinter::Print(shutdown_status.deadline_remaining_ms(), TUnit::TIME_MS);
  return Substitute("shutdown grace period left: $0, deadline left: $1, "
      "queries registered on coordinator: $2, queries executing: $3, "
      "fragment instances: $4",
      grace_left, deadline_left, shutdown_status.client_requests_registered(),
      shutdown_status.backend_queries_executing(),
      shutdown_status.finstances_executing());
}
// Initiates (or tightens) a graceful shutdown. Safe to call repeatedly and from
// concurrent threads: the deadline only ever moves earlier, and the grace period /
// shutdown thread are started exactly once. 'relative_deadline_s' of -1 selects the
// configured default deadline. On success '*shutdown_status' reflects the resulting
// shutdown state; returns an error only if the shutdown thread cannot be created.
Status ImpalaServer::StartShutdown(
    int64_t relative_deadline_s, ShutdownStatusPB* shutdown_status) {
  DCHECK_GE(relative_deadline_s, -1);
  // -1 means "use the default deadline from --shutdown_deadline_s".
  if (relative_deadline_s == -1) relative_deadline_s = FLAGS_shutdown_deadline_s;
  int64_t now = MonotonicMillis();
  int64_t new_deadline = now + relative_deadline_s * 1000L;
  bool set_deadline = false;
  bool set_grace = false;
  int64_t curr_deadline = shutdown_deadline_.Load();
  // CAS loop: only install the new deadline if it is the first one or earlier than
  // the current one. Concurrent callers race here; the earliest deadline wins.
  while (curr_deadline == 0 || curr_deadline > new_deadline) {
    // Set the deadline - it was either unset or later than the new one.
    if (shutdown_deadline_.CompareAndSwap(curr_deadline, new_deadline)) {
      set_deadline = true;
      break;
    }
    // CAS lost to a concurrent update; reload and re-evaluate the loop condition.
    curr_deadline = shutdown_deadline_.Load();
  }
  // Dump the data cache before shutdown.
  discard_result(ExecEnv::GetInstance()->disk_io_mgr()->DumpDataCache());
  // Start the grace period and the background shutdown thread exactly once: only the
  // caller whose CAS transitions shutting_down_ from 0 proceeds into the body.
  while (shutting_down_.Load() == 0) {
    if (!shutting_down_.CompareAndSwap(0, now)) continue;
    unique_ptr<Thread> t;
    // The thread is created unjoinable (last arg false); it terminates the process
    // itself via ShutdownThread().
    Status status =
        Thread::Create("shutdown", "shutdown", [this] { ShutdownThread(); }, &t, false);
    if (!status.ok()) {
      LOG(ERROR) << "Failed to create shutdown thread: " << status.GetDetail();
      return status;
    }
    set_grace = true;
    break;
  }
  *shutdown_status = GetShutdownStatus();
  // Show the full grace/limit times to avoid showing confusing intermediate values
  // to the person running the statement.
  if (set_grace) {
    shutdown_status->set_grace_remaining_ms(FLAGS_shutdown_grace_period_s * 1000L);
  }
  if (set_deadline) {
    shutdown_status->set_deadline_remaining_ms(relative_deadline_s * 1000L);
  }
  return Status::OK();
}
[[noreturn]] void ImpalaServer::ShutdownThread() {
  // Poll once per second until either the server has quiesced after the grace period,
  // or the hard deadline has passed.
  while (true) {
    SleepForMs(1000);
    const ShutdownStatusPB shutdown_status = GetShutdownStatus();
    LOG(INFO) << "Shutdown status: " << ShutdownStatusToString(shutdown_status);
    const bool quiesced = shutdown_status.grace_remaining_ms() <= 0
        && shutdown_status.backend_queries_executing() == 0
        && shutdown_status.client_requests_registered() == 0;
    const bool deadline_passed = shutdown_status.deadline_remaining_ms() <= 0;
    if (quiesced || deadline_passed) break;
  }
  // Drain the completed queries queue to the query log table.
  ShutdownWorkloadManagement();
  LOG(INFO) << "Shutdown complete, going down.";
  // Use _exit here instead since exit() does cleanup which interferes with the shutdown
  // signal handler thread causing a data race.
  ShutdownLogging();
  _exit(0);
}
// Compares the stored secret with 'other' in constant time, returning 0 iff they are
// equal. Both halves are always compared and the results summed, so the running time
// does not depend on where (or whether) the values differ — this prevents timing
// side-channels when validating client-supplied secrets.
// This should never be inlined to prevent it potentially being optimized, e.g.
// by short-circuiting the comparisons.
__attribute__((noinline)) int ImpalaServer::SecretArg::ConstantTimeCompare(
    const TUniqueId& other) const {
  // Compiles to two integer comparisons and an addition with no branches.
  // TODO: consider replacing with CRYPTO_memcmp() once our minimum supported OpenSSL
  // version has it.
  return (secret_.hi != other.hi) + (secret_.lo != other.lo);
}
void ImpalaServer::GetAllConnectionContexts(
    ThriftServer::ConnectionContextList* connection_contexts) {
  // The output list must start empty; each running server appends its contexts.
  DCHECK_EQ(connection_contexts->size(), 0);
  // Gather contexts from every server that was started: beeswax, hs2, hs2-http,
  // external frontend, and the internal (in-process) server, in that order.
  if (beeswax_server_ != nullptr) {
    beeswax_server_->GetConnectionContextList(connection_contexts);
  }
  if (hs2_server_ != nullptr) {
    hs2_server_->GetConnectionContextList(connection_contexts);
  }
  if (hs2_http_server_ != nullptr) {
    hs2_http_server_->GetConnectionContextList(connection_contexts);
  }
  if (external_fe_server_ != nullptr) {
    external_fe_server_->GetConnectionContextList(connection_contexts);
  }
  if (internal_server_ != nullptr) {
    internal_server_->GetConnectionContextList(connection_contexts);
  }
}
TUniqueId ImpalaServer::RandomUniqueID() {
  // Draw a cryptographically random UUID while holding the generator lock, then
  // convert it to a TUniqueId outside the critical section.
  uuid generated;
  {
    lock_guard<mutex> l(uuid_lock_);
    generated = crypto_uuid_generator_();
  }
  TUniqueId unique_id;
  UUIDToTUniqueId(generated, &unique_id);
  return unique_id;
}
} // namespace impala