| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "service/client-request-state.h" |
| |
| #include <optional> |
| |
| #include <boost/algorithm/string/join.hpp> |
| #include <boost/algorithm/string/predicate.hpp> |
| #include <boost/algorithm/string/replace.hpp> |
| #include <limits> |
| #include <gutil/strings/substitute.h> |
| #include <rapidjson/rapidjson.h> |
| #include <rapidjson/stringbuffer.h> |
| #include <rapidjson/writer.h> |
| |
| #include "catalog/catalog-service-client-wrapper.h" |
| #include "common/status.h" |
| #include "exprs/timezone_db.h" |
| #include "gen-cpp/Types_types.h" |
| #include "kudu/rpc/rpc_controller.h" |
| #include "observe/otel.h" |
| #include "observe/span-manager.h" |
| #include "rpc/rpc-mgr.inline.h" |
| #include "runtime/coordinator.h" |
| #include "runtime/exec-env.h" |
| #include "runtime/mem-tracker.h" |
| #include "runtime/query-driver.h" |
| #include "runtime/row-batch.h" |
| #include "runtime/runtime-state.h" |
| #include "runtime/timestamp-value.h" |
| #include "runtime/timestamp-value.inline.h" |
| #include "scheduling/admission-control-client.h" |
| #include "scheduling/cluster-membership-mgr.h" |
| #include "scheduling/scheduler.h" |
| #include "service/frontend.h" |
| #include "service/impala-server.h" |
| #include "service/query-options.h" |
| #include "service/query-result-set.h" |
| #include "util/auth-util.h" |
| #include "util/debug-util.h" |
| #include "util/impalad-metrics.h" |
| #include "util/lineage-util.h" |
| #include "util/pretty-printer.h" |
| #include "util/redactor.h" |
| #include "util/runtime-profile.h" |
| #include "util/runtime-profile-counters.h" |
| #include "util/time.h" |
| #include "util/uid-util.h" |
| |
| #include "gen-cpp/CatalogService_types.h" |
| #include "gen-cpp/control_service.pb.h" |
| #include "gen-cpp/control_service.proxy.h" |
| |
| #include <thrift/Thrift.h> |
| |
| #include "common/names.h" |
| #include "control-service.h" |
| |
| using boost::algorithm::iequals; |
| using boost::algorithm::join; |
| using boost::algorithm::replace_all_copy; |
| using kudu::rpc::RpcController; |
| using namespace apache::hive::service::cli::thrift; |
| using namespace apache::thrift; |
| using namespace beeswax; |
| using namespace strings; |
| |
| DECLARE_bool(abort_on_failed_audit_event); |
| DECLARE_bool(abort_on_failed_lineage_event); |
| DECLARE_int32(krpc_port); |
| DECLARE_int64(max_result_cache_size); |
| DECLARE_bool(otel_trace_enabled); |
| DECLARE_bool(use_local_catalog); |
| |
| namespace impala { |
| |
| PROFILE_DEFINE_TIMER(ClientFetchLockWaitTimer, UNSTABLE, |
| "Cumulative time client fetch requests waiting for locks."); |
| PROFILE_DEFINE_SUMMARY_STATS_TIMER(GetInFlightProfileTimeStats, UNSTABLE, |
| "Summary stats of the time dumping profiles when the query is still in-flight."); |
| |
| // Keys into the info string map of the runtime profile referring to specific |
| // items used by CM for monitoring purposes. |
| static const string PER_HOST_MEM_KEY = "Estimated Per-Host Mem"; |
| static const string TABLES_MISSING_STATS_KEY = "Tables Missing Stats"; |
| static const string TABLES_WITH_CORRUPT_STATS_KEY = "Tables With Corrupt Table Stats"; |
| static const string TABLES_WITH_MISSING_DISK_IDS_KEY = "Tables With Missing Disk Ids"; |
| |
| static const string QUERY_STATUS_KEY = "Query Status"; |
| static const string RETRY_STATUS_KEY = "Retry Status"; |
| |
| const TExecRequest ClientRequestState::unknown_exec_request_; |
| |
| ClientRequestState::ClientRequestState(const TQueryCtx& query_ctx, Frontend* frontend, |
| ImpalaServer* server, shared_ptr<ImpalaServer::SessionState> session, |
| QueryDriver* query_driver) |
| : query_ctx_(query_ctx), |
| last_active_time_ms_(numeric_limits<int64_t>::max()), |
| child_query_executor_(new ChildQueryExecutor), |
| session_(move(session)), |
| coord_exec_called_(false), |
| // Profile is assigned name w/ id after planning |
| profile_(RuntimeProfile::Create(&profile_pool_, "Query", false)), |
| frontend_profile_(RuntimeProfile::Create(&profile_pool_, "Frontend", false)), |
| server_profile_(RuntimeProfile::Create(&profile_pool_, "ImpalaServer", false)), |
| summary_profile_(RuntimeProfile::Create(&profile_pool_, "Summary", false)), |
| frontend_(frontend), |
| parent_server_(server), |
| start_time_us_(UnixMicros()), |
| fetch_rows_timeout_us_(MICROS_PER_MILLI * query_options().fetch_rows_timeout_ms), |
| parent_driver_(query_driver) { |
| |
| if (FLAGS_otel_trace_enabled && should_otel_trace_query(sql_stmt(), |
| query_ctx.session.session_type)) { |
| // initialize OpenTelemetry for this query |
| VLOG(2) << "Initializing OpenTelemetry for query " << PrintId(query_id()); |
| otel_span_manager_ = build_span_manager(this); |
| otel_span_manager_->StartChildSpanInit(); |
| } |
| |
| bool is_external_fe = session_type() == TSessionType::EXTERNAL_FRONTEND; |
| // "Impala Backend Timeline" was specifically chosen to exploit the lexicographical |
| // ordering defined by the underlying std::map holding the timelines displayed in |
| // the web UI. This helps ensure that "Frontend Timeline" is displayed before |
| // "Impala Backend Timeline". |
| query_events_ = summary_profile_->AddEventSequence( |
| is_external_fe ? "Impala Backend Timeline" : "Query Timeline"); |
| query_events_->Start(); |
| profile_->AddChild(summary_profile_); |
| |
| #ifndef NDEBUG |
| profile_->AddInfoString("DEBUG MODE WARNING", "Query profile created while running a " |
| "DEBUG build of Impala. Use RELEASE builds to measure query performance."); |
| #endif |
| row_materialization_timer_ = ADD_TIMER(server_profile_, "RowMaterializationTimer"); |
| num_rows_fetched_counter_ = ADD_COUNTER(server_profile_, "NumRowsFetched", TUnit::UNIT); |
| row_materialization_rate_ = |
| server_profile_->AddDerivedCounter("RowMaterializationRate", TUnit::UNIT_PER_SECOND, |
| bind<int64_t>(&RuntimeProfile::UnitsPerSecond, num_rows_fetched_counter_, |
| row_materialization_timer_)); |
| num_rows_fetched_from_cache_counter_ = |
| ADD_COUNTER(server_profile_, "NumRowsFetchedFromCache", TUnit::UNIT); |
| client_wait_timer_ = ADD_TIMER(server_profile_, "ClientFetchWaitTimer"); |
| client_wait_time_stats_ = |
| ADD_SUMMARY_STATS_TIMER(server_profile_, "ClientFetchWaitTimeStats"); |
| rpc_read_timer_ = ADD_TIMER(server_profile_, "RPCReadTimer"); |
| rpc_write_timer_ = ADD_TIMER(server_profile_, "RPCWriteTimer"); |
| rpc_count_ = ADD_COUNTER(server_profile_, "RPCCount", TUnit::UNIT); |
| get_inflight_profile_time_stats_ = |
| PROFILE_GetInFlightProfileTimeStats.Instantiate(server_profile_); |
| client_fetch_lock_wait_timer_ = |
| PROFILE_ClientFetchLockWaitTimer.Instantiate(server_profile_); |
| |
| profile_->set_name("Query (id=" + PrintId(query_id()) + ")"); |
| summary_profile_->AddInfoString("Session ID", PrintId(session_id())); |
| summary_profile_->AddInfoString("Session Type", PrintValue(session_type())); |
| if (session_type() == TSessionType::HIVESERVER2 || |
| session_type() == TSessionType::EXTERNAL_FRONTEND) { |
| summary_profile_->AddInfoString( |
| "HiveServer2 Protocol Version", Substitute("V$0", 1 + session_->hs2_version)); |
| } |
| // Certain API clients expect Start Time and End Time to be date-time strings |
| // of nanosecond precision, so we explicitly specify the precision here. |
| summary_profile_->AddInfoString("Start Time", ToStringFromUnixMicros(start_time_us(), |
| TimePrecision::Nanosecond)); |
| summary_profile_->AddInfoString("End Time", ""); |
| summary_profile_->AddInfoString("Duration", ""); |
| summary_profile_->AddInfoString("Query Type", "N/A"); |
| summary_profile_->AddInfoString("Query State", PrintValue(BeeswaxQueryState())); |
| summary_profile_->AddInfoString( |
| "Impala Query State", ExecStateToString(exec_state())); |
| summary_profile_->AddInfoString("Query Status", "OK"); |
| summary_profile_->AddInfoString("Impala Version", GetVersionString(/* compact */ true)); |
| summary_profile_->AddInfoString("User", effective_user()); |
| summary_profile_->AddInfoString("Connected User", connected_user()); |
| summary_profile_->AddInfoString("Delegated User", do_as_user()); |
| summary_profile_->AddInfoString("Network Address", |
| TNetworkAddressToString(session_->network_address)); |
| if (!session_->http_origin.empty()) { |
| /// If using hs2-http protocol, this is the origin of the session |
| /// as recorded in the X-Forwarded-For http message header. |
| summary_profile_->AddInfoString("Http Origin", session_->http_origin); |
| } |
| summary_profile_->AddInfoString("Default Db", default_db()); |
| summary_profile_->AddInfoStringRedacted( |
| "Sql Statement", query_ctx_.client_request.stmt); |
| summary_profile_->AddInfoString("Coordinator", |
| TNetworkAddressToString(ExecEnv::GetInstance()->configured_backend_address())); |
| |
| summary_profile_->AddChild(frontend_profile_); |
| |
| AdmissionControlClient::Create(query_ctx_, &admission_control_client_); |
| } |
| |
| ClientRequestState::~ClientRequestState() { |
| DCHECK(wait_thread_.get() == NULL) << "Finalize() needs to be called!"; |
| DCHECK(!track_rpcs_); // Should get set to false in Finalize() |
| DCHECK(pending_rpcs_.empty()); // Should get cleared in Finalize() |
| UnRegisterRemainingRPCs(); // Avoid memory leaks if Finalize() didn't get called |
| } |
| |
| Status ClientRequestState::SetResultCache(QueryResultSet* cache, |
| int64_t max_size) { |
| lock_guard<mutex> l(lock_); |
| DCHECK(result_cache_ == NULL); |
| result_cache_.reset(cache); |
| if (max_size > FLAGS_max_result_cache_size) { |
| return Status( |
| Substitute("Requested result-cache size of $0 exceeds Impala's maximum of $1.", |
| max_size, FLAGS_max_result_cache_size)); |
| } |
| result_cache_max_size_ = max_size; |
| return Status::OK(); |
| } |
| |
| void ClientRequestState::SetRemoteSubmitTime(int64_t remote_submit_time) { |
| int64_t ack_submit_time = min(MonotonicStopWatch::Now(), remote_submit_time); |
| if (ack_submit_time < remote_submit_time) { |
| VLOG_QUERY << "Ignoring remote_submit_time (" << remote_submit_time |
| << " ns) that is more than coordinator time (" << ack_submit_time |
| << " ns) for query id=" << PrintId(query_id()); |
| } |
| query_events_->Start(ack_submit_time); |
| } |
| |
| void ClientRequestState::SetFrontendProfile(const TExecRequest& exec_request) { |
| // Should we defer creating and adding the child until here? probably. |
| TRuntimeProfileTree prof_tree; |
| prof_tree.nodes.emplace_back(std::move(exec_request.profile)); |
| for (auto& child : exec_request.profile_children) { |
| prof_tree.nodes.emplace_back(std::move(child)); |
| } |
| prof_tree.nodes.at(0).num_children = prof_tree.nodes.size() - 1; |
| frontend_profile_->Update(prof_tree, false); |
| } |
| |
| void ClientRequestState::AddBlacklistedExecutorAddress(const NetworkAddressPB& addr) { |
| lock_guard<mutex> l(lock_); |
| if (!WasRetried()) blacklisted_executor_addresses_.emplace(addr); |
| } |
| |
| void ClientRequestState::SetBlacklistedExecutorAddresses( |
| std::unordered_set<NetworkAddressPB>& executor_addresses) { |
| DCHECK(blacklisted_executor_addresses_.empty()); |
| if (!executor_addresses.empty()) { |
| blacklisted_executor_addresses_.insert( |
| executor_addresses.begin(), executor_addresses.end()); |
| } |
| } |
| |
| Status ClientRequestState::Exec() { |
| const TExecRequest& exec_req = exec_request(); |
| profile_->AddChild(server_profile_); |
| summary_profile_->AddInfoString("Query Type", PrintValue(stmt_type())); |
| summary_profile_->AddInfoString("Query Options (set by configuration)", |
| DebugQueryOptions(query_ctx_.client_request.query_options)); |
| summary_profile_->AddInfoString("Query Options (set by configuration and planner)", |
| DebugQueryOptions(exec_req.query_options)); |
| if (!exec_req.tables.empty()) { |
| summary_profile_->AddInfoString("Tables Queried", PrintTableList(exec_req.tables)); |
| } |
| if (!exec_req.select_columns.empty()) { |
| summary_profile_->AddInfoString("Select Columns", join(exec_req.select_columns, ",")); |
| } |
| if (!exec_req.where_columns.empty()) { |
| summary_profile_->AddInfoString("Where Columns", join(exec_req.where_columns, ",")); |
| } |
| if (!exec_req.join_columns.empty()) { |
| summary_profile_->AddInfoString("Join Columns", join(exec_req.join_columns, ",")); |
| } |
| if (!exec_req.aggregate_columns.empty()) { |
| summary_profile_->AddInfoString( |
| "Aggregate Columns", join(exec_req.aggregate_columns, ",")); |
| } |
| if (!exec_req.orderby_columns.empty()) { |
| summary_profile_->AddInfoString( |
| "OrderBy Columns", join(exec_req.orderby_columns, ",")); |
| } |
| if (query_ctx_.__isset.overridden_mt_dop_value) { |
| DCHECK(query_ctx_.client_request.query_options.__isset.mt_dop); |
| summary_profile_->AddInfoString("MT_DOP limited by admission control", |
| Substitute("Requested MT_DOP=$0 reduced to MT_DOP=$1", |
| query_ctx_.overridden_mt_dop_value, |
| query_ctx_.client_request.query_options.mt_dop)); |
| } |
| |
| // Don't start executing the query if Cancel() was called between planning and Exec(). |
| RETURN_IF_CANCELLED(this); |
| MarkActive(); |
| |
| switch (exec_req.stmt_type) { |
| case TStmtType::QUERY: |
| case TStmtType::DML: |
| DCHECK(exec_req.__isset.query_exec_request); |
| RETURN_IF_ERROR( |
| ExecQueryOrDmlRequest(exec_req.query_exec_request, true /*async*/)); |
| break; |
| case TStmtType::EXPLAIN: { |
| request_result_set_.reset(new vector<TResultRow>( |
| exec_req.explain_result.results)); |
| break; |
| } |
| case TStmtType::TESTCASE: { |
| DCHECK(exec_req.__isset.testcase_data_path); |
| SetResultSet(vector<string>(1, exec_req.testcase_data_path)); |
| break; |
| } |
| case TStmtType::DDL: { |
| DCHECK(exec_req.__isset.catalog_op_request); |
| if (otel_trace_query()) { |
| otel_span_manager_->StartChildSpanQueryExecution(); |
| } |
| LOG_AND_RETURN_IF_ERROR(ExecDdlRequest()); |
| break; |
| } |
| case TStmtType::LOAD: { |
| DCHECK(exec_req.__isset.load_data_request); |
| LOG_AND_RETURN_IF_ERROR(ExecLoadDataRequest()); |
| break; |
| } |
| case TStmtType::SET: { |
| DCHECK(exec_req.__isset.set_query_option_request); |
| lock_guard<mutex> l(session_->lock); |
| if (exec_req.set_query_option_request.__isset.key) { |
| // "SET key=value" updates the session query options. |
| DCHECK(exec_req.set_query_option_request.__isset.value); |
| const auto& key = exec_req.set_query_option_request.key; |
| const auto& value = exec_req.set_query_option_request.value; |
| RETURN_IF_ERROR(SetQueryOption(key, value, &session_->set_query_options, |
| &session_->set_query_options_mask)); |
| SetResultSet({}, {}, {}); |
| if (iequals(key, "idle_session_timeout")) { |
| // IMPALA-2248: Session timeout is set as a query option |
| session_->last_accessed_ms = UnixMillis(); // do not expire session immediately |
| session_->UpdateTimeout(); |
| VLOG_QUERY << "ClientRequestState::Exec() SET: idle_session_timeout=" |
| << PrettyPrinter::Print(session_->session_timeout, TUnit::TIME_S); |
| } |
| } else if (exec_req.set_query_option_request.__isset.query_option_type |
| && exec_req.set_query_option_request.query_option_type |
| == TQueryOptionType::UNSET_ALL) { |
| // "UNSET ALL" |
| RETURN_IF_ERROR(ResetAllQueryOptions( |
| &session_->set_query_options, &session_->set_query_options_mask)); |
| SetResultSet({}, {}, {}); |
| } else { |
| // "SET" or "SET ALL" |
| bool is_set_all = |
| exec_req.set_query_option_request.__isset.query_option_type |
| && exec_req.set_query_option_request.query_option_type |
| == TQueryOptionType::SET_ALL; |
| PopulateResultForSet(is_set_all); |
| } |
| break; |
| } |
| case TStmtType::ADMIN_FN: |
| if (exec_req.admin_request.type == TAdminRequestType::SHUTDOWN) { |
| RETURN_IF_ERROR(ExecShutdownRequest()); |
| } else if (exec_req.admin_request.type == TAdminRequestType::EVENT_PROCESSOR) { |
| RETURN_IF_ERROR(ExecEventProcessorCmd()); |
| } else { |
| DCHECK(false); |
| } |
| break; |
| case TStmtType::CONVERT: |
| DCHECK(exec_req.__isset.convert_table_request); |
| LOG_AND_RETURN_IF_ERROR(ExecMigrateRequest()); |
| break; |
| case TStmtType::UNKNOWN: |
| DCHECK(false); |
| return Status("Exec request uninitialized during execution"); |
| case TStmtType::KILL: |
| DCHECK(exec_req.__isset.kill_query_request); |
| LOG_AND_RETURN_IF_ERROR(ExecKillQueryRequest()); |
| break; |
| default: |
| return Status(Substitute("Unknown exec request stmt type: $0", exec_req.stmt_type)); |
| } |
| |
| if (async_exec_thread_.get() == nullptr) { |
| UpdateNonErrorExecState(ExecState::RUNNING); |
| } |
| return Status::OK(); |
| } |
| |
| void ClientRequestState::PopulateResultForSet(bool is_set_all) { |
| map<string, string> config; |
| TQueryOptionsToMap(query_options(), &config); |
| vector<string> keys, values, levels; |
| map<string, string>::const_iterator itr = config.begin(); |
| for (; itr != config.end(); ++itr) { |
| const auto opt_level_id = |
| parent_server_->query_option_levels_[itr->first]; |
| if (!is_set_all && (opt_level_id == TQueryOptionLevel::DEVELOPMENT || |
| opt_level_id == TQueryOptionLevel::DEPRECATED || |
| opt_level_id == TQueryOptionLevel::REMOVED)) { |
| continue; |
| } |
| keys.push_back(itr->first); |
| values.push_back(itr->second); |
| const auto opt_level = _TQueryOptionLevel_VALUES_TO_NAMES.find(opt_level_id); |
| DCHECK(opt_level !=_TQueryOptionLevel_VALUES_TO_NAMES.end()); |
| levels.push_back(opt_level->second); |
| } |
| SetResultSet(keys, values, levels); |
| } |
| |
| Status ClientRequestState::ExecLocalCatalogOp( |
| const TCatalogOpRequest& catalog_op) { |
| switch (catalog_op.op_type) { |
| case TCatalogOpType::USE: { |
| lock_guard<mutex> l(session_->lock); |
| session_->database = exec_request().catalog_op_request.use_db_params.db; |
| return Status::OK(); |
| } |
| case TCatalogOpType::SHOW_TABLES: |
| case TCatalogOpType::SHOW_VIEWS: { |
| const TShowTablesParams* params = &catalog_op.show_tables_params; |
| // A NULL pattern means match all tables of the specified table types. However, |
| // Thrift string types can't be NULL in C++, so we have to test if it's set rather |
| // than just blindly using the value. |
| const string* table_name_pattern = |
| params->__isset.show_pattern ? &(params->show_pattern) : nullptr; |
| TGetTablesResult table_names; |
| const set<TImpalaTableType::type>& table_types = params->table_types; |
| RETURN_IF_ERROR(frontend_->GetTableNames(params->db, table_name_pattern, |
| &query_ctx_.session, table_types, &table_names)); |
| SetResultSet(table_names.tables); |
| return Status::OK(); |
| } |
| case TCatalogOpType::SHOW_METADATA_TABLES: { |
| const TShowTablesParams* params = &catalog_op.show_tables_params; |
| // A NULL pattern means match all tables of the specified table types. However, |
| // Thrift string types can't be NULL in C++, so we have to test if it's set rather |
| // than just blindly using the value. |
| const string* metadata_table_name_pattern = |
| params->__isset.show_pattern ? &(params->show_pattern) : nullptr; |
| DCHECK(params->__isset.tbl); |
| const string& table_name = params->tbl; |
| TGetTablesResult table_names; |
| RETURN_IF_ERROR(frontend_->GetMetadataTableNames(params->db, table_name, |
| metadata_table_name_pattern, &query_ctx_.session, &table_names)); |
| SetResultSet(table_names.tables); |
| return Status::OK(); |
| } |
| case TCatalogOpType::SHOW_DBS: { |
| const TShowDbsParams* params = &catalog_op.show_dbs_params; |
| TGetDbsResult dbs; |
| const string* db_pattern = |
| params->__isset.show_pattern ? (¶ms->show_pattern) : NULL; |
| RETURN_IF_ERROR( |
| frontend_->GetDbs(db_pattern, &query_ctx_.session, &dbs)); |
| vector<string> names, comments; |
| names.reserve(dbs.dbs.size()); |
| comments.reserve(dbs.dbs.size()); |
| for (const TDatabase& db: dbs.dbs) { |
| names.push_back(db.db_name); |
| comments.push_back(db.metastore_db.description); |
| } |
| SetResultSet(names, comments); |
| return Status::OK(); |
| } |
| case TCatalogOpType::SHOW_DATA_SRCS: { |
| const TShowDataSrcsParams* params = &catalog_op.show_data_srcs_params; |
| TGetDataSrcsResult result; |
| const string* pattern = |
| params->__isset.show_pattern ? (¶ms->show_pattern) : NULL; |
| RETURN_IF_ERROR( |
| frontend_->GetDataSrcMetadata(pattern, &result)); |
| SetResultSet(result.data_src_names, result.locations, result.class_names, |
| result.api_versions); |
| return Status::OK(); |
| } |
| case TCatalogOpType::SHOW_STATS: { |
| const TShowStatsParams& params = catalog_op.show_stats_params; |
| TResultSet response; |
| RETURN_IF_ERROR(frontend_->GetStats(params, &response)); |
| // Set the result set and its schema from the response. |
| request_result_set_.reset(new vector<TResultRow>(response.rows)); |
| result_metadata_ = response.schema; |
| return Status::OK(); |
| } |
| case TCatalogOpType::SHOW_FUNCTIONS: { |
| const TShowFunctionsParams* params = &catalog_op.show_fns_params; |
| TGetFunctionsResult functions; |
| const string* fn_pattern = |
| params->__isset.show_pattern ? (¶ms->show_pattern) : NULL; |
| RETURN_IF_ERROR(frontend_->GetFunctions( |
| params->category, params->db, fn_pattern, &query_ctx_.session, &functions)); |
| SetResultSet(functions.fn_ret_types, functions.fn_signatures, |
| functions.fn_binary_types, functions.fn_persistence); |
| return Status::OK(); |
| } |
| case TCatalogOpType::SHOW_ROLES: { |
| const TShowRolesParams& params = catalog_op.show_roles_params; |
| // If we have made it here, the user has privileges to execute this operation. |
| // Return the results. |
| TShowRolesResult result; |
| RETURN_IF_ERROR(frontend_->ShowRoles(params, &result)); |
| SetResultSet(result.role_names); |
| return Status::OK(); |
| } |
| case TCatalogOpType::SHOW_GRANT_PRINCIPAL: { |
| const TShowGrantPrincipalParams& params = catalog_op.show_grant_principal_params; |
| TResultSet response; |
| RETURN_IF_ERROR(frontend_->GetPrincipalPrivileges(params, &response)); |
| // Set the result set and its schema from the response. |
| request_result_set_.reset(new vector<TResultRow>(response.rows)); |
| result_metadata_ = response.schema; |
| return Status::OK(); |
| } |
| case TCatalogOpType::DESCRIBE_HISTORY: { |
| // This operation is supported for Iceberg tables only. |
| const TDescribeHistoryParams& params = catalog_op.describe_history_params; |
| TGetTableHistoryResult result; |
| RETURN_IF_ERROR(frontend_->GetTableHistory(params, &result)); |
| |
| request_result_set_.reset(new vector<TResultRow>); |
| request_result_set_->resize(result.result.size()); |
| for (int i = 0; i < result.result.size(); ++i) { |
| const TGetTableHistoryResultItem item = result.result[i]; |
| TResultRow &result_row = (*request_result_set_.get())[i]; |
| result_row.__isset.colVals = true; |
| result_row.colVals.resize(4); |
| const Timezone* local_tz = TimezoneDatabase::FindTimezone( |
| query_options().timezone); |
| TimestampValue tv = TimestampValue::FromUnixTimeMicros( |
| item.creation_time * 1000, local_tz); |
| result_row.colVals[0].__set_string_val(tv.ToString()); |
| result_row.colVals[1].__set_string_val(std::to_string(item.snapshot_id)); |
| result_row.colVals[2].__set_string_val( |
| (item.__isset.parent_id) ? std::to_string(item.parent_id) : "NULL"); |
| result_row.colVals[3].__set_string_val( |
| (item.is_current_ancestor) ? "TRUE" : "FALSE"); |
| } |
| return Status::OK(); |
| } |
| case TCatalogOpType::DESCRIBE_DB: { |
| TDescribeResult response; |
| RETURN_IF_ERROR(frontend_->DescribeDb(catalog_op.describe_db_params, |
| &response)); |
| // Set the result set |
| request_result_set_.reset(new vector<TResultRow>(response.results)); |
| return Status::OK(); |
| } |
| case TCatalogOpType::DESCRIBE_TABLE: { |
| TDescribeResult response; |
| const TDescribeTableParams& params = catalog_op.describe_table_params; |
| RETURN_IF_ERROR(frontend_->DescribeTable(params, query_ctx_.session, &response)); |
| // Set the result set |
| request_result_set_.reset(new vector<TResultRow>(response.results)); |
| return Status::OK(); |
| } |
| case TCatalogOpType::SHOW_CREATE_TABLE: { |
| string response; |
| RETURN_IF_ERROR(frontend_->ShowCreateTable(catalog_op.show_create_table_params, |
| &response)); |
| SetResultSet(vector<string>(1, response)); |
| return Status::OK(); |
| } |
| case TCatalogOpType::SHOW_CREATE_FUNCTION: { |
| string response; |
| RETURN_IF_ERROR(frontend_->ShowCreateFunction(catalog_op.show_create_function_params, |
| &response)); |
| SetResultSet(vector<string>(1, response)); |
| return Status::OK(); |
| } |
| case TCatalogOpType::SHOW_FILES: { |
| TResultSet response; |
| RETURN_IF_ERROR(frontend_->GetTableFiles(catalog_op.show_files_params, &response)); |
| // Set the result set and its schema from the response. |
| request_result_set_.reset(new vector<TResultRow>(response.rows)); |
| result_metadata_ = response.schema; |
| return Status::OK(); |
| } |
| default: { |
| stringstream ss; |
| ss << "Unexpected TCatalogOpType: " << catalog_op.op_type; |
| return Status(ss.str()); |
| } |
| } |
| } |
| |
| Status ClientRequestState::ExecQueryOrDmlRequest( |
| const TQueryExecRequest& query_exec_request, bool isAsync) { |
| // we always need at least one plan fragment |
| DCHECK(query_exec_request.plan_exec_info.size() > 0); |
| |
| if (query_exec_request.__isset.query_plan) { |
| stringstream plan_ss; |
| // Add some delimiters to make it clearer where the plan |
| // begins and the profile ends |
| plan_ss << "\n----------------\n" |
| << query_exec_request.query_plan |
| << "----------------"; |
| summary_profile_->AddInfoStringRedacted("Plan", plan_ss.str()); |
| } |
| // Add info strings consumed by CM: Estimated mem and tables missing stats. |
| if (query_exec_request.__isset.per_host_mem_estimate) { |
| stringstream ss; |
| ss << query_exec_request.per_host_mem_estimate; |
| summary_profile_->AddInfoString(PER_HOST_MEM_KEY, ss.str()); |
| } |
| if (!query_exec_request.query_ctx.__isset.parent_query_id && |
| query_exec_request.query_ctx.__isset.tables_missing_stats && |
| !query_exec_request.query_ctx.tables_missing_stats.empty()) { |
| summary_profile_->AddInfoString(TABLES_MISSING_STATS_KEY, |
| PrintTableList(query_exec_request.query_ctx.tables_missing_stats)); |
| } |
| |
| if (!query_exec_request.query_ctx.__isset.parent_query_id && |
| query_exec_request.query_ctx.__isset.tables_with_corrupt_stats && |
| !query_exec_request.query_ctx.tables_with_corrupt_stats.empty()) { |
| summary_profile_->AddInfoString(TABLES_WITH_CORRUPT_STATS_KEY, |
| PrintTableList(query_exec_request.query_ctx.tables_with_corrupt_stats)); |
| } |
| |
| if (query_exec_request.query_ctx.__isset.tables_missing_diskids && |
| !query_exec_request.query_ctx.tables_missing_diskids.empty()) { |
| summary_profile_->AddInfoString(TABLES_WITH_MISSING_DISK_IDS_KEY, |
| PrintTableList(query_exec_request.query_ctx.tables_missing_diskids)); |
| } |
| |
| // Don't start executing the query if Cancel() was called concurrently with Exec(). |
| RETURN_IF_CANCELLED(this); |
| if (isAsync) { |
| // Don't transition to PENDING inside the FinishExecQueryOrDmlRequest thread because |
| // the query should be in the PENDING state before the Exec RPC returns. |
| UpdateNonErrorExecState(ExecState::PENDING); |
| RETURN_IF_ERROR(Thread::Create("query-exec-state", "async-exec-thread", |
| &ClientRequestState::FinishExecQueryOrDmlRequest, this, &async_exec_thread_, |
| true)); |
| } else { |
| // Update query_status_ as necessary. |
| FinishExecQueryOrDmlRequest(); |
| return query_status_; |
| } |
| return Status::OK(); |
| } |
| |
| void ClientRequestState::FinishExecQueryOrDmlRequest() { |
| const TExecRequest& exec_req = exec_request(); |
| DCHECK(exec_req.__isset.query_exec_request); |
| UniqueIdPB query_id_pb; |
| TUniqueIdToUniqueIdPB(query_id(), &query_id_pb); |
| |
| if (otel_trace_query() && !IsCTAS()) { |
| otel_span_manager_->StartChildSpanAdmissionControl(); |
| } |
| |
| Status admit_status = admission_control_client_->SubmitForAdmission( |
| {query_id_pb, ExecEnv::GetInstance()->backend_id(), |
| exec_req.query_exec_request, exec_req.query_options, |
| summary_profile_, blacklisted_executor_addresses_}, |
| query_events_, &schedule_, &wait_start_time_ms_, &wait_end_time_ms_, |
| otel_span_manager_.get()); |
| |
| if (otel_trace_query() && !IsCTAS()) { |
| otel_span_manager_->EndChildSpanAdmissionControl(admit_status); |
| otel_span_manager_->StartChildSpanQueryExecution(); |
| } |
| |
| { |
| lock_guard<mutex> l(lock_); |
| if (!UpdateQueryStatus(admit_status).ok()) return; |
| } |
| |
| DCHECK(schedule_.get() != nullptr); |
| // Note that we don't need to check for cancellation between admission and query |
| // startup. The query was not cancelled right before being admitted and the window here |
| // is small enough to not require special handling. Instead we start the query and then |
| // cancel it through the check below if necessary. |
| DebugActionNoFail(exec_req.query_options, "CRS_BEFORE_COORD_STARTS"); |
| // Register the query with the server to support cancellation. This happens after |
| // admission because now the set of executors is fixed and an executor failure will |
| // cause a query failure. |
| parent_server_->RegisterQueryLocations(schedule_->backend_exec_params(), query_id()); |
| coord_.reset(new Coordinator(this, exec_req, *schedule_.get(), query_events_)); |
| Status exec_status = coord_->Exec(); |
| |
| DebugActionNoFail(exec_req.query_options, "CRS_AFTER_COORD_STARTS"); |
| |
| // Make coordinator profile visible, even upon failure. |
| if (coord_->query_profile() != nullptr) profile_->AddChild(coord_->query_profile()); |
| |
| bool cancelled = false; |
| Status cancellation_status; |
| { |
| lock_guard<mutex> l(lock_); |
| if (!UpdateQueryStatus(exec_status).ok()) return; |
| // Coordinator::Exec() finished successfully - it is safe to concurrently access |
| // 'coord_'. This thread needs to cancel the coordinator if cancellation occurred |
| // *before* 'coord_' was accessible to other threads. Once the lock is dropped, any |
| // future calls to Cancel() are responsible for calling Coordinator::Cancel(), so |
| // while holding the lock we need to both perform a check for cancellation and make |
| // the coord_ visible. |
| coord_exec_called_.Store(true); |
| cancelled = is_cancelled_; |
| if (cancelled) { |
| VLOG_QUERY << "Cancelled right after starting the coordinator query id=" |
| << PrintId(query_id()); |
| discard_result(UpdateQueryStatus(Status::CANCELLED)); |
| } |
| } |
| |
| if (cancelled) { |
| coord_->Cancel(); |
| return; |
| } |
| UpdateNonErrorExecState(ExecState::RUNNING); |
| } |
| |
| Status ClientRequestState::ExecDdlRequestImplSync() { |
| if (catalog_op_type() != TCatalogOpType::DDL && |
| catalog_op_type() != TCatalogOpType::RESET_METADATA) { |
| Status status = ExecLocalCatalogOp(exec_request().catalog_op_request); |
| lock_guard<mutex> l(lock_); |
| return UpdateQueryStatus(status); |
| } |
| |
| if (ddl_type() == TDdlType::COMPUTE_STATS) { |
| const TComputeStatsParams& compute_stats_params = |
| exec_request().catalog_op_request.ddl_params.compute_stats_params; |
| RuntimeProfile* child_profile = |
| RuntimeProfile::Create(&profile_pool_, "Child Queries"); |
| profile_->AddChild(child_profile); |
| // Add child queries for computing table and column stats. |
| vector<ChildQuery> child_queries; |
| if (compute_stats_params.__isset.tbl_stats_query) { |
| RuntimeProfile* profile = |
| RuntimeProfile::Create(&profile_pool_, "Table Stats Query"); |
| child_profile->AddChild(profile); |
| child_queries.emplace_back(compute_stats_params.tbl_stats_query, this, |
| parent_server_, profile, &profile_pool_); |
| } |
| if (compute_stats_params.__isset.col_stats_query) { |
| RuntimeProfile* profile = |
| RuntimeProfile::Create(&profile_pool_, "Column Stats Query"); |
| child_profile->AddChild(profile); |
| child_queries.emplace_back(compute_stats_params.col_stats_query, this, |
| parent_server_, profile, &profile_pool_); |
| } |
| |
| if (child_queries.size() > 0) { |
| RETURN_IF_ERROR(child_query_executor_->ExecAsync(move(child_queries))); |
| } else { |
| SetResultSet({"No partitions selected for incremental stats update."}); |
| } |
| return Status::OK(); |
| } |
| |
| DCHECK(false) << "Not handled sync exec ddl request."; |
| return Status::OK(); |
| } |
| |
| void ClientRequestState::ExecDdlRequestImpl(bool exec_in_worker_thread) { |
| bool is_CTAS = (catalog_op_type() == TCatalogOpType::DDL |
| && ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT); |
| const TExecRequest& exec_req = exec_request(); |
| |
| catalog_op_executor_.reset( |
| new CatalogOpExecutor(ExecEnv::GetInstance(), frontend_, server_profile_)); |
| |
| // Indirectly check if running in thread async_exec_thread_. |
| if (exec_in_worker_thread) { |
| VLOG_QUERY << "Running in worker thread"; |
| DCHECK(exec_state() == ExecState::PENDING); |
| |
| // 1. For any non-CTAS DDLs, transition to RUNNING |
| // 2. For CTAS DDLs, transition to RUNNING during FinishExecQueryOrDmlRequest() |
| // called by ExecQueryOrDmlRequest(). |
| if (!is_CTAS) UpdateNonErrorExecState(ExecState::RUNNING); |
| } |
| |
| // Optionally wait with a debug action before Exec() below. |
| DebugActionNoFail(exec_req.query_options, "CRS_DELAY_BEFORE_CATALOG_OP_EXEC"); |
| |
| Status status = catalog_op_executor_->Exec(exec_req.catalog_op_request); |
| query_events_->MarkEvent("CatalogDdlRequest finished"); |
| if (otel_trace_query()) { |
| otel_span_manager_->AddChildSpanEvent("UpdateCatalogFinished"); |
| } |
| AddCatalogTimeline(); |
| { |
| lock_guard<mutex> l(lock_); |
| RETURN_VOID_IF_ERROR(UpdateQueryStatus(status, exec_in_worker_thread)); |
| } |
| |
| // If this is a CTAS request, there will usually be more work to do |
| // after executing the CREATE TABLE statement (the INSERT portion of the operation). |
| // The exception is if the user specified IF NOT EXISTS and the table already |
| // existed, in which case we do not execute the INSERT. |
| if (catalog_op_type() == TCatalogOpType::DDL && |
| ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT && |
| !catalog_op_executor_->ddl_exec_response()->new_table_created) { |
| DCHECK(exec_req.catalog_op_request. |
| ddl_params.create_table_params.if_not_exists); |
| return; |
| } |
| |
| // Add newly created table to catalog cache. |
| status = parent_server_->ProcessCatalogUpdateResult( |
| *catalog_op_executor_->update_catalog_result(), |
| exec_req.query_options.sync_ddl, query_options(), query_events_); |
| { |
| lock_guard<mutex> l(lock_); |
| RETURN_VOID_IF_ERROR(UpdateQueryStatus(status, exec_in_worker_thread)); |
| } |
| |
| if (is_CTAS) { |
| // At this point, the remainder of the CTAS request executes |
| // like a normal DML request. As with other DML requests, it will |
| // wait for another catalog update if any partitions were altered as a result |
| // of the operation. |
| DCHECK(exec_req.__isset.query_exec_request); |
| RETURN_VOID_IF_ERROR( |
| ExecQueryOrDmlRequest(exec_req.query_exec_request, !exec_in_worker_thread)); |
| } |
| |
| // Set the results to be reported to the client. Do this under lock to avoid races |
| // with ImpalaServer::GetResultSetMetadata(). |
| { |
| lock_guard<mutex> l(lock_); |
| SetResultSet(catalog_op_executor_->ddl_exec_response()); |
| } |
| } |
| |
| bool ClientRequestState::ShouldRunExecDdlAsync() { |
| // Local catalog op DDL will run synchronously. |
| if (catalog_op_type() != TCatalogOpType::DDL |
| && catalog_op_type() != TCatalogOpType::RESET_METADATA) { |
| return false; |
| } |
| |
| // The exec DDL part of compute stats will run synchronously. |
| if (ddl_type() == TDdlType::COMPUTE_STATS) return false; |
| |
| return true; |
| } |
| |
| Status ClientRequestState::ExecDdlRequest() { |
| string op_type = catalog_op_type() == TCatalogOpType::DDL ? |
| PrintValue(ddl_type()) : PrintValue(catalog_op_type()); |
| bool async_ddl = ShouldRunExecDdlAsync(); |
| bool async_ddl_enabled = exec_request().query_options.enable_async_ddl_execution; |
| string exec_mode = (async_ddl && async_ddl_enabled) ? "asynchronous" : "synchronous"; |
| |
| summary_profile_->AddInfoString("DDL Type", op_type); |
| summary_profile_->AddInfoString("DDL execution mode", exec_mode); |
| VLOG_QUERY << "DDL exec mode=" << exec_mode; |
| |
| if (!async_ddl) return ExecDdlRequestImplSync(); |
| |
| if (async_ddl_enabled) { |
| // Transition the exec state out of INITIALIZED to PENDING to make available the |
| // runtime profile for the DDL. Later on in ExecDdlRequestImpl(), the state |
| // further transitions to RUNNING. |
| UpdateNonErrorExecState(ExecState::PENDING); |
| return Thread::Create("impala-server", "async_exec_thread_", |
| &ClientRequestState::ExecDdlRequestImpl, this, true /*exec in a worker thread*/, |
| &async_exec_thread_); |
| } else { |
| ExecDdlRequestImpl(false /*exec in the same thread as the caller*/); |
| return query_status_; |
| } |
| } |
| |
| void ClientRequestState::ExecLoadDataRequestImpl(bool exec_in_worker_thread) { |
| const TExecRequest& exec_req = exec_request(); |
| if (exec_in_worker_thread) { |
| VLOG_QUERY << "Running in worker thread"; |
| DCHECK(exec_state() == ExecState::PENDING); |
| UpdateNonErrorExecState(ExecState::RUNNING); |
| } |
| DebugActionNoFail( |
| exec_req.query_options, "CRS_DELAY_BEFORE_LOAD_DATA"); |
| |
| TLoadDataResp response; |
| Status status = frontend_->LoadData(exec_req.load_data_request, &response); |
| if (exec_req.load_data_request.iceberg_tbl) { |
| ExecLoadIcebergDataRequestImpl(response); |
| } |
| { |
| lock_guard<mutex> l(lock_); |
| RETURN_VOID_IF_ERROR(UpdateQueryStatus(status, exec_in_worker_thread)); |
| } |
| |
| request_result_set_.reset(new vector<TResultRow>); |
| request_result_set_->push_back(response.load_summary); |
| |
| // We use TUpdateCatalogRequest to refresh the table metadata so that it will |
| // fire an insert event just like an insert statement. |
| TUpdatedPartition updatedPartition; |
| updatedPartition.files.insert(updatedPartition.files.end(), |
| response.loaded_files.begin(), response.loaded_files.end()); |
| TUpdateCatalogRequest catalog_update; |
| // The partition_name is an empty string for unpartitioned tables. |
| catalog_update.updated_partitions[response.partition_name] = updatedPartition; |
| |
| catalog_update.__set_sync_ddl(exec_req.query_options.sync_ddl); |
| catalog_update.__set_header(GetCatalogServiceRequestHeader()); |
| catalog_update.target_table = exec_req.load_data_request.table_name.table_name; |
| catalog_update.db_name = exec_req.load_data_request.table_name.db_name; |
| catalog_update.is_overwrite = exec_req.load_data_request.overwrite; |
| |
| CatalogServiceConnection client(ExecEnv::GetInstance()->catalogd_client_cache(), |
| *ExecEnv::GetInstance()->GetCatalogdAddress().get(), &status); |
| { |
| lock_guard<mutex> l(lock_); |
| RETURN_VOID_IF_ERROR(UpdateQueryStatus(status, exec_in_worker_thread)); |
| } |
| |
| TUpdateCatalogResponse resp; |
| status = client.DoRpc( |
| &CatalogServiceClientWrapper::UpdateCatalog, catalog_update, &resp); |
| query_events_->MarkEvent("UpdateCatalog finished"); |
| if (resp.__isset.profile) { |
| for (const TEventSequence& catalog_timeline : resp.profile.event_sequences) { |
| summary_profile_->AddEventSequence(catalog_timeline.name, catalog_timeline); |
| } |
| } |
| { |
| lock_guard<mutex> l(lock_); |
| RETURN_VOID_IF_ERROR(UpdateQueryStatus(status, exec_in_worker_thread)); |
| } |
| |
| status = parent_server_->ProcessCatalogUpdateResult( |
| resp.result, |
| exec_req.query_options.sync_ddl, query_options(), query_events_); |
| { |
| lock_guard<mutex> l(lock_); |
| RETURN_VOID_IF_ERROR(UpdateQueryStatus(status, exec_in_worker_thread)); |
| } |
| } |
| |
| void ClientRequestState::ExecLoadIcebergDataRequestImpl(TLoadDataResp response) { |
| TLoadDataReq load_data_req = exec_request().load_data_request; |
| RuntimeProfile* child_profile = |
| RuntimeProfile::Create(&profile_pool_, "Child Queries"); |
| profile_->AddChild(child_profile); |
| // Add child queries for computing table and column stats. |
| vector<ChildQuery> child_queries; |
| // Prepare CREATE |
| RuntimeProfile* create_profile = |
| RuntimeProfile::Create(&profile_pool_, "Create table query"); |
| child_profile->AddChild(create_profile); |
| child_queries.emplace_back(response.create_tmp_tbl_query, this, parent_server_, |
| create_profile, &profile_pool_); |
| // Prepare INSERT |
| RuntimeProfile* insert_profile = |
| RuntimeProfile::Create(&profile_pool_, "Insert query"); |
| child_profile->AddChild(insert_profile); |
| child_queries.emplace_back(load_data_req.insert_into_dst_tbl_query, this, |
| parent_server_, insert_profile, &profile_pool_); |
| // Prepare DROP |
| RuntimeProfile* drop_profile = |
| RuntimeProfile::Create(&profile_pool_, "Drop table query"); |
| child_profile->AddChild(drop_profile); |
| child_queries.emplace_back(load_data_req.drop_tmp_tbl_query, this, |
| parent_server_, drop_profile, &profile_pool_); |
| // Execute queries |
| RETURN_VOID_IF_ERROR(child_query_executor_->ExecAsync(move(child_queries))); |
| vector<ChildQuery*>* completed_queries = new vector<ChildQuery*>(); |
| Status query_status = child_query_executor_->WaitForAll(completed_queries); |
| if (query_status.ok()) { |
| const char* path = response.create_location.c_str(); |
| string delete_err = "Load was succesful, but failed to remove staging data under '" |
| + response.create_location + "', HDFS error: "; |
| hdfsFS hdfs_conn; |
| Status hdfs_ret = HdfsFsCache::instance()->GetConnection(path, &hdfs_conn); |
| if (!hdfs_ret.ok()) { |
| lock_guard<mutex> l(lock_); |
| RETURN_VOID_IF_ERROR(UpdateQueryStatus(Status(delete_err + hdfs_ret.GetDetail()))); |
| } |
| if (hdfsDelete(hdfs_conn, path, 1)) { |
| lock_guard<mutex> l(lock_); |
| RETURN_VOID_IF_ERROR(UpdateQueryStatus(Status(delete_err + strerror(errno)))); |
| } |
| } else { |
| const char* dst_path = load_data_req.source_path.c_str(); |
| hdfsFS hdfs_dst_conn; |
| string revert_err = "Failed to load data and failed to revert data movement, " |
| "please check source and staging directory under '" + response.create_location |
| + "', Query error: " + query_status.GetDetail() + " HDFS error: "; |
| Status hdfs_ret = HdfsFsCache::instance()->GetConnection(dst_path, &hdfs_dst_conn); |
| if (!hdfs_ret.ok()) { |
| lock_guard<mutex> l(lock_); |
| RETURN_VOID_IF_ERROR(UpdateQueryStatus(Status(revert_err + hdfs_ret.GetDetail()))); |
| } |
| for (const string& src_path : response.loaded_files) { |
| hdfsFS hdfs_src_conn; |
| hdfs_ret = HdfsFsCache::instance()->GetConnection(dst_path, &hdfs_src_conn); |
| if (!hdfs_ret.ok()) { |
| lock_guard<mutex> l(lock_); |
| RETURN_VOID_IF_ERROR(UpdateQueryStatus(Status(revert_err |
| + hdfs_ret.GetDetail()))); |
| } |
| if (hdfsMove(hdfs_src_conn, src_path.c_str(), hdfs_dst_conn, dst_path)) { |
| lock_guard<mutex> l(lock_); |
| RETURN_VOID_IF_ERROR(UpdateQueryStatus(Status(revert_err + strerror(errno)))); |
| } |
| } |
| } |
| } |
| |
| |
| Status ClientRequestState::ExecLoadDataRequest() { |
| if (exec_request().query_options.enable_async_load_data_execution) { |
| // Transition the exec state out of INITIALIZED to PENDING to make available the |
| // runtime profile for the DDL. |
| UpdateNonErrorExecState(ExecState::PENDING); |
| return Thread::Create("impala-server", "async_exec_thread_", |
| &ClientRequestState::ExecLoadDataRequestImpl, this, true, &async_exec_thread_); |
| } |
| |
| // sync exection |
| ExecLoadDataRequestImpl(false /* not use a worker thread */); |
| return query_status_; |
| } |
| |
| Status ClientRequestState::ExecShutdownRequest() { |
| const TShutdownParams& request = exec_request().admin_request.shutdown_params; |
| bool backend_port_specified = request.__isset.backend && request.backend.port != 0; |
| int port = backend_port_specified ? request.backend.port : FLAGS_krpc_port; |
| // Use the local shutdown code path if the host is unspecified or if it exactly matches |
| // the configured host/port. This avoids the possibility of RPC errors preventing |
| // shutdown. |
| if (!request.__isset.backend |
| || (request.backend.hostname == FLAGS_hostname && port == FLAGS_krpc_port)) { |
| ShutdownStatusPB shutdown_status; |
| int64_t deadline_s = request.__isset.deadline_s ? request.deadline_s : -1; |
| RETURN_IF_ERROR(parent_server_->StartShutdown(deadline_s, &shutdown_status)); |
| SetResultSet({ImpalaServer::ShutdownStatusToString(shutdown_status)}); |
| return Status::OK(); |
| } |
| |
| // KRPC relies on resolved IP address, so convert hostname. |
| IpAddr ip_address; |
| Status ip_status = HostnameToIpAddr(request.backend.hostname, &ip_address); |
| if (!ip_status.ok()) { |
| VLOG(1) << "Could not convert hostname " << request.backend.hostname |
| << " to ip address, error: " << ip_status.GetDetail(); |
| return ip_status; |
| } |
| // Find BackendId for the given remote ip address and port from cluster membership. |
| // The searching is not efficient, but Shutdown Requests are not called frequently. |
| // The BackendId is used to generate UDS address for Unix domain socket. Leave the |
| // Id value as 0 if it's not found in cluster membership. |
| // Note that UDS is only used when FLAGS_rpc_use_unix_domain_socket is set as true. |
| UniqueIdPB backend_id; |
| backend_id.set_hi(0); |
| backend_id.set_lo(0); |
| if (ExecEnv::GetInstance()->rpc_mgr()->IsKrpcUsingUDS()) { |
| if (ExecEnv::GetInstance()->rpc_mgr()->GetUdsAddressUniqueId() |
| == UdsAddressUniqueIdPB::BACKEND_ID) { |
| ClusterMembershipMgr::SnapshotPtr membership_snapshot = |
| ExecEnv::GetInstance()->cluster_membership_mgr()->GetSnapshot(); |
| DCHECK(membership_snapshot.get() != nullptr); |
| for (const auto& it : membership_snapshot->current_backends) { |
| // Compare resolved IP addresses and ports. |
| if (it.second.ip_address() == ip_address && it.second.address().port() == port) { |
| DCHECK(it.second.has_backend_id()); |
| backend_id = it.second.backend_id(); |
| break; |
| } |
| } |
| } |
| } |
| string krpc_error = "RemoteShutdown() RPC failed: Network error"; |
| string krpc_error2 = "RemoteShutdown() RPC failed: Timed out"; |
| NetworkAddressPB krpc_addr = MakeNetworkAddressPB(ip_address, port, backend_id, |
| ExecEnv::GetInstance()->rpc_mgr()->GetUdsAddressUniqueId()); |
| std::unique_ptr<ControlServiceProxy> proxy; |
| Status get_proxy_status = |
| ControlService::GetProxy(krpc_addr, request.backend.hostname, &proxy); |
| if (!get_proxy_status.ok()) { |
| return Status( |
| Substitute("Could not get Proxy to ControlService at $0 with error: $1.", |
| NetworkAddressPBToString(krpc_addr), get_proxy_status.msg().msg())); |
| } |
| |
| RemoteShutdownParamsPB params; |
| if (request.__isset.deadline_s) params.set_deadline_s(request.deadline_s); |
| RemoteShutdownResultPB resp; |
| VLOG_QUERY << "Sending Shutdown RPC to " << NetworkAddressPBToString(krpc_addr); |
| |
| const int num_retries = 3; |
| const int64_t timeout_ms = 10 * MILLIS_PER_SEC; |
| const int64_t backoff_time_ms = 3 * MILLIS_PER_SEC; |
| Status rpc_status = RpcMgr::DoRpcWithRetry(proxy, &ControlServiceProxy::RemoteShutdown, |
| params, &resp, query_ctx_, "RemoteShutdown() RPC failed", num_retries, timeout_ms, |
| backoff_time_ms, "CRS_SHUTDOWN_RPC"); |
| |
| if (!rpc_status.ok()) { |
| const string& msg = rpc_status.msg().msg(); |
| VLOG_QUERY << "RemoteShutdown query_id= " << PrintId(query_id()) |
| << " failed to send RPC to " << NetworkAddressPBToString(krpc_addr) << " :" |
| << msg; |
| string err_string = Substitute( |
| "Rpc to $0 failed with error '$1'", NetworkAddressPBToString(krpc_addr), msg); |
| // Attempt to detect if the the failure is because of not using a KRPC port. |
| if (backend_port_specified && |
| (msg.find(krpc_error) != string::npos || |
| msg.find(krpc_error2) != string::npos)) { |
| // Prior to IMPALA-7985 :shutdown() used the backend port. |
| err_string.append(" This may be because the port specified is wrong. You may have" |
| " specified the backend (thrift) port which :shutdown() can no" |
| " longer use. Please make sure the correct KRPC port is being" |
| " used, or don't specify any port in the :shutdown() command."); |
| } |
| return Status(err_string); |
| } |
| Status shutdown_status(resp.status()); |
| RETURN_IF_ERROR(shutdown_status); |
| SetResultSet({ImpalaServer::ShutdownStatusToString(resp.shutdown_status())}); |
| return Status::OK(); |
| } |
| |
| Status ClientRequestState::ExecEventProcessorCmd() { |
| catalog_op_executor_.reset( |
| new CatalogOpExecutor(ExecEnv::GetInstance(), frontend_, server_profile_)); |
| const TEventProcessorCmdParams& params = |
| exec_request().admin_request.event_processor_cmd_params; |
| TSetEventProcessorStatusRequest request; |
| TSetEventProcessorStatusResponse response; |
| request.__set_params(params); |
| request.__set_header(GetCatalogServiceRequestHeader()); |
| Status rpc_status = catalog_op_executor_->SetEventProcessorStatus(request, &response); |
| if (!rpc_status.ok()) { |
| VLOG_QUERY << "SetEventProcessorStatus failed: " << rpc_status.msg().msg(); |
| return rpc_status; |
| } |
| SetResultSet({response.info}); |
| return Status::OK(); |
| } |
| |
| void ClientRequestState::Finalize(const Status* cause) { |
| if (otel_trace_query()) { |
| // In a non-error case, end the query execution span since it will be the active span. |
| if (cause == nullptr || cause->ok()) { |
| otel_span_manager_->EndChildSpanQueryExecution(); |
| } |
| |
| // No need to end previous child span in an error case. This function silently closes |
| // the active child span if there is one. |
| otel_span_manager_->StartChildSpanClose(cause); |
| } |
| |
| Cancel(cause, /*wait_until_finalized=*/true); |
| MarkActive(); |
| // Make sure we join on wait_thread_ before we finish (and especially before this object |
| // is destroyed). |
| int64_t block_on_wait_time_us = 0; |
| BlockOnWait(0, &block_on_wait_time_us); |
| DCHECK_EQ(block_on_wait_time_us, 0); |
| |
| // Update latest observed Kudu timestamp stored in the session from the coordinator. |
| // Needs to take the session_ lock which must not be taken while holding lock_, so this |
| // must happen before taking lock_ below. |
| Coordinator* coordinator = GetCoordinator(); |
| if (coordinator != nullptr) { |
| // This is safe to access on coord_ after Wait() has been called. |
| uint64_t latest_kudu_ts = |
| coordinator->dml_exec_state()->GetKuduLatestObservedTimestamp(); |
| if (latest_kudu_ts > 0) { |
| VLOG_RPC << "Updating session (id=" << PrintId(session_id()) << ") with latest " |
| << "observed Kudu timestamp: " << latest_kudu_ts; |
| lock_guard<mutex> session_lock(session_->lock); |
| session_->kudu_latest_observed_ts = std::max<uint64_t>( |
| session_->kudu_latest_observed_ts, latest_kudu_ts); |
| } |
| } |
| |
| // If the transaction didn't get committed by this point then we should just abort it. |
| if (InTransaction()) { |
| AbortTransaction(); |
| } else if (InKuduTransaction()) { |
| AbortKuduTransaction(); |
| } |
| |
| UpdateEndTime(); |
| |
| { |
| unique_lock<mutex> l(lock_); |
| // Update result set cache metrics, and update mem limit accounting before tearing |
| // down the coordinator. |
| ClearResultCache(); |
| } |
| // Wait until the audit events are flushed. |
| if (wait_thread_.get() != nullptr) { |
| wait_thread_->Join(); |
| wait_thread_.reset(); |
| } else { |
| // The query failed in the fe even before a wait thread is launched. Synchronously |
| // flush log events to audit authorization errors, if any. |
| LogQueryEvents(); |
| } |
| DCHECK(wait_thread_.get() == nullptr); |
| |
| // Update the timeline here so that all of the above work is captured in the timeline. |
| query_events_->MarkEvent("Unregister query"); |
| UnRegisterRemainingRPCs(); |
| if (otel_trace_query()) { |
| otel_span_manager_->AddChildSpanEvent("QueryUnregistered"); |
| otel_span_manager_->EndChildSpanClose(); |
| |
| // End the root span and thus the entire trace is also ended. |
| otel_span_manager_.reset(); |
| } |
| } |
| |
| Status ClientRequestState::Exec(const TMetadataOpRequest& exec_request) { |
| TResultSet metadata_op_result; |
| // Like the other Exec(), fill out as much profile information as we're able to. |
| summary_profile_->AddInfoString("Query Type", PrintValue(TStmtType::DDL)); |
| RETURN_IF_ERROR(frontend_->ExecHiveServer2MetadataOp(exec_request, |
| &metadata_op_result)); |
| result_metadata_ = metadata_op_result.schema; |
| request_result_set_.reset(new vector<TResultRow>(metadata_op_result.rows)); |
| UpdateNonErrorExecState(ExecState::RUNNING); |
| return Status::OK(); |
| } |
| |
| Status ClientRequestState::WaitAsync() { |
| // TODO: IMPALA-7396: thread creation fault inject is disabled because it is not |
| // handled correctly. |
| return Thread::Create("query-exec-state", "wait-thread", |
| &ClientRequestState::Wait, this, &wait_thread_, false); |
| } |
| |
| bool ClientRequestState::BlockOnWait(int64_t timeout_us, int64_t* block_on_wait_time_us) { |
| DCHECK_GE(timeout_us, 0); |
| unique_lock<mutex> l(lock_); |
| *block_on_wait_time_us = 0; |
| // Some metadata operations like GET_COLUMNS do not rely on WaitAsync() to launch |
| // the wait thread. In such cases this method is expected to be a no-op. |
| if (wait_thread_.get() == nullptr) return true; |
| while (!is_wait_done_) { |
| if (timeout_us == 0) { |
| block_on_wait_cv_.Wait(l); |
| return true; |
| } else { |
| MonotonicStopWatch wait_timeout_timer; |
| wait_timeout_timer.Start(); |
| bool notified = block_on_wait_cv_.WaitFor(l, timeout_us); |
| if (notified) { |
| *block_on_wait_time_us = wait_timeout_timer.ElapsedTime() / NANOS_PER_MICRO; |
| } |
| return notified; |
| } |
| } |
| return true; |
| } |
| |
| void ClientRequestState::Wait() { |
| // block until results are ready |
| Status status = WaitInternal(); |
| // Rows are available now (for SELECT statement), so start the 'wait' timer that tracks |
| // how long Impala waits for the client to fetch rows. For other statements, track the |
| // time until a Close() is received. |
| MarkInactive(); |
| { |
| lock_guard<mutex> l(lock_); |
| if (returns_result_set()) { |
| query_events()->MarkEvent("Rows available"); |
| if (LIKELY(status.code() != TErrorCode::CANCELLED) && otel_trace_query()) { |
| otel_span_manager_->AddChildSpanEvent("RowsAvailable"); |
| } |
| } else { |
| query_events()->MarkEvent("Request finished"); |
| UpdateEndTime(); |
| } |
| discard_result(UpdateQueryStatus(status)); |
| } |
| |
| if (status.ok()) { |
| if (stmt_type() == TStmtType::DDL) { |
| DCHECK(catalog_op_type() != TCatalogOpType::DDL || request_result_set_ != nullptr); |
| } |
| // It is possible the query already failed at this point and ExecState is ERROR. In |
| // this case, the call to UpdateNonErrorExecState(FINISHED) does not change the |
| // ExecState. |
| UpdateNonErrorExecState(ExecState::FINISHED); |
| } |
| // UpdateQueryStatus() or UpdateNonErrorExecState() have updated exec_state_. |
| DCHECK(exec_state() == ExecState::FINISHED || exec_state() == ExecState::ERROR |
| || retry_state() == RetryState::RETRYING || retry_state() == RetryState::RETRIED); |
| // Notify all the threads blocked on Wait() to finish and then log the query events, |
| // if any. |
| { |
| unique_lock<mutex> l(lock_); |
| is_wait_done_ = true; |
| } |
| block_on_wait_cv_.NotifyAll(); |
| LogQueryEvents(); |
| } |
| |
| Status ClientRequestState::WaitInternal() { |
| // Explain requests have already populated the result set. Nothing to do here. |
| if (exec_request().stmt_type == TStmtType::EXPLAIN) { |
| return Status::OK(); |
| } |
| |
| // Wait until the query has passed through admission control and is either running or |
| // cancelled or encountered an error. |
| if (async_exec_thread_.get() != nullptr) async_exec_thread_->Join(); |
| |
| vector<ChildQuery*> child_queries; |
| Status child_queries_status = child_query_executor_->WaitForAll(&child_queries); |
| { |
| lock_guard<mutex> l(lock_); |
| RETURN_IF_ERROR(query_status_); |
| RETURN_IF_ERROR(UpdateQueryStatus(child_queries_status)); |
| } |
| if (!child_queries.empty()) query_events_->MarkEvent("Child queries finished"); |
| |
| bool isCTAS = catalog_op_type() == TCatalogOpType::DDL |
| && ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT; |
| |
| if (GetCoordinator() != NULL) { |
| Status status = GetCoordinator()->Wait(); |
| if (UNLIKELY(!status.ok())) { |
| if (InKuduTransaction()) AbortKuduTransaction(); |
| return status; |
| } |
| RETURN_IF_ERROR(UpdateCatalog()); |
| } else { |
| // When the coordinator is not available for CTAS that requires a coordinator, check |
| // further if the query has been cancelled. If so, return immediately as there will |
| // be no query result available (IMPALA-11006). |
| if (isCTAS) { |
| RETURN_IF_CANCELLED(this); |
| } |
| } |
| |
| if (catalog_op_type() == TCatalogOpType::DDL && |
| ddl_type() == TDdlType::COMPUTE_STATS && child_queries.size() > 0) { |
| RETURN_IF_ERROR(UpdateTableAndColumnStats(child_queries)); |
| } |
| |
| if (!returns_result_set()) { |
| // Queries that do not return a result are finished at this point. This includes |
| // DML operations. |
| eos_.Store(true); |
| } else if (isCTAS) { |
| SetCreateTableAsSelectResultSet(); |
| } |
| return Status::OK(); |
| } |
| |
| Status ClientRequestState::FetchRows(const int32_t max_rows, |
| QueryResultSet* fetched_rows, int64_t block_on_wait_time_us) { |
| // Pause the wait timer, since the client has instructed us to do work on its behalf. |
| MarkActive(); |
| |
| // ImpalaServer::FetchInternal has already taken our lock_ |
| discard_result(UpdateQueryStatus( |
| FetchRowsInternal(max_rows, fetched_rows, block_on_wait_time_us))); |
| |
| MarkInactive(); |
| return query_status_; |
| } |
| |
| Status ClientRequestState::RestartFetch() { |
| // No result caching for this query. Restart is invalid. |
| if (result_cache_max_size_ <= 0) { |
| return Status(ErrorMsg(TErrorCode::RECOVERABLE_ERROR, |
| "Restarting of fetch requires enabling of query result caching.")); |
| } |
| // The cache overflowed on a previous fetch. |
| if (result_cache_.get() == NULL) { |
| stringstream ss; |
| ss << "The query result cache exceeded its limit of " << result_cache_max_size_ |
| << " rows. Restarting the fetch is not possible."; |
| return Status(ErrorMsg(TErrorCode::RECOVERABLE_ERROR, ss.str())); |
| } |
| // Reset fetch state to start over. |
| eos_.Store(false); |
| num_rows_fetched_ = 0; |
| return Status::OK(); |
| } |
| |
| void ClientRequestState::UpdateNonErrorExecState(ExecState new_state) { |
| lock_guard<mutex> l(lock_); |
| ExecState old_state = exec_state(); |
| static string error_msg = "Illegal state transition: $0 -> $1, query_id=$2"; |
| switch (new_state) { |
| case ExecState::PENDING: |
| DCHECK(old_state == ExecState::INITIALIZED) |
| << Substitute(error_msg, ExecStateToString(old_state), |
| ExecStateToString(new_state), PrintId(query_id())); |
| UpdateExecState(new_state); |
| break; |
| case ExecState::RUNNING: |
| // It is possible for FinishExecQueryOrDmlRequest() to attempt a transition to |
| // running, even after the query has been cancelled with an error status (and is |
| // thus in the ERROR ExecState). In this case, just ignore the transition attempt. |
| if (old_state != ExecState::ERROR) { |
| // DDL statements and metadata ops don't use the PENDING state, so a query can |
| // transition directly from the INITIALIZED to RUNNING state. |
| DCHECK(old_state == ExecState::INITIALIZED || old_state == ExecState::PENDING) |
| << Substitute(error_msg, ExecStateToString(old_state), |
| ExecStateToString(new_state), PrintId(query_id())); |
| UpdateExecState(new_state); |
| } |
| break; |
| case ExecState::FINISHED: |
| // Only transition to the FINISHED state if the query has not failed. It is not |
| // valid to transition from ERROR to FINISHED, so skip any attempt to do so. |
| if (old_state != ExecState::ERROR) { |
| // A query can transition from PENDING to FINISHED if it is cancelled by the |
| // client. |
| DCHECK(old_state == ExecState::PENDING || old_state == ExecState::RUNNING) |
| << Substitute(error_msg, ExecStateToString(old_state), |
| ExecStateToString(new_state), PrintId(query_id())); |
| UpdateExecState(new_state); |
| } |
| break; |
| default: |
| DCHECK(false) << "A non-error state expected but got: " |
| << ExecStateToString(new_state); |
| } |
| } |
| |
| void ClientRequestState::SetOriginalId(const TUniqueId& original_id) { |
| // Copy the TUniqueId query_id from the original query. |
| original_id_ = make_unique<TUniqueId>(original_id); |
| summary_profile_->AddInfoString("Original Query Id", PrintId(*original_id_)); |
| } |
| |
| void ClientRequestState::MarkAsRetrying(const Status& status) { |
| retry_state_.Store(RetryState::RETRYING); |
| summary_profile_->AddInfoString( |
| RETRY_STATUS_KEY, RetryStateToString(RetryState::RETRYING)); |
| |
| // Set the query status. |
| query_status_ = status; |
| summary_profile_->AddInfoStringRedacted(QUERY_STATUS_KEY, query_status_.GetDetail()); |
| // The Query Status might be overwritten later if the retry fails. "Retry Cause" |
| // preserves the original error that triggered the retry. |
| summary_profile_->AddInfoStringRedacted("Retry Cause", query_status_.GetDetail()); |
| } |
| |
| Status ClientRequestState::UpdateQueryStatus(const Status& status, bool log_error) { |
| // Preserve the first non-ok status |
| if (!status.ok() && query_status_.ok()) { |
| UpdateExecState(ExecState::ERROR); |
| query_status_ = status; |
| summary_profile_->AddInfoStringRedacted(QUERY_STATUS_KEY, query_status_.GetDetail()); |
| if (log_error) VLOG_QUERY << status.GetDetail(); |
| } |
| |
| return status; |
| } |
| |
| Status ClientRequestState::FetchRowsInternal(const int32_t max_rows, |
| QueryResultSet* fetched_rows, int64_t block_on_wait_time_us) { |
| // Wait() guarantees that we've transitioned at least to FINISHED state (and any |
| // state beyond that should have a non-OK query_status_ set). |
| DCHECK(exec_state() == ExecState::FINISHED); |
| |
| if (eos_.Load()) return Status::OK(); |
| |
| if (request_result_set_ != NULL) { |
| int num_rows = 0; |
| const vector<TResultRow>& all_rows = (*(request_result_set_.get())); |
| // max_rows <= 0 means no limit |
| while ((num_rows < max_rows || max_rows <= 0) |
| && num_rows_fetched_ < all_rows.size()) { |
| RETURN_IF_ERROR(fetched_rows->AddOneRow(all_rows[num_rows_fetched_])); |
| ++num_rows_fetched_; |
| ++num_rows; |
| } |
| eos_.Store(num_rows_fetched_ == all_rows.size()); |
| return Status::OK(); |
| } |
| |
| Coordinator* coordinator = GetCoordinator(); |
| if (coordinator == nullptr) { |
| return Status("Client tried to fetch rows on a query that produces no results."); |
| } |
| |
| int32_t num_rows_fetched_from_cache = 0; |
| if (result_cache_max_size_ > 0 && result_cache_ != NULL) { |
| // Satisfy the fetch from the result cache if possible. |
| int cache_fetch_size = (max_rows <= 0) ? result_cache_->size() : max_rows; |
| num_rows_fetched_from_cache = |
| fetched_rows->AddRows(result_cache_.get(), num_rows_fetched_, cache_fetch_size); |
| num_rows_fetched_ += num_rows_fetched_from_cache; |
| COUNTER_ADD(num_rows_fetched_from_cache_counter_, num_rows_fetched_from_cache); |
| if (num_rows_fetched_from_cache >= max_rows) return Status::OK(); |
| } |
| |
| // Maximum number of rows to be fetched from the coord. |
| int32_t max_coord_rows = max_rows; |
| if (max_rows > 0) { |
| DCHECK_LE(num_rows_fetched_from_cache, max_rows); |
| max_coord_rows = max_rows - num_rows_fetched_from_cache; |
| } |
| { |
| SCOPED_TIMER(row_materialization_timer_); |
| size_t before = fetched_rows->size(); |
| bool eos = false; |
| |
| // Temporarily release lock so calls to Cancel() are not blocked. fetch_rows_lock_ |
| // (already held) ensures that we do not call coord_->GetNext() multiple times |
| // concurrently. |
| // TODO: Simplify this. |
| lock_.unlock(); |
| Status status = |
| coordinator->GetNext(fetched_rows, max_coord_rows, &eos, block_on_wait_time_us); |
| lock_.lock(); |
| |
| if (eos) eos_.Store(true); |
| |
| int num_fetched = fetched_rows->size() - before; |
| DCHECK(max_coord_rows <= 0 || num_fetched <= max_coord_rows) << Substitute( |
| "Fetched more rows ($0) than asked for ($1)", num_fetched, max_coord_rows); |
| num_rows_fetched_ += num_fetched; |
| COUNTER_ADD(num_rows_fetched_counter_, num_fetched); |
| |
| RETURN_IF_ERROR(status); |
| // Check if query status has changed during GetNext() call |
| if (!query_status_.ok()) { |
| eos_.Store(true); |
| return query_status_; |
| } |
| } |
| |
| // Update the result cache if necessary. |
| if (result_cache_max_size_ > 0 && result_cache_.get() != NULL) { |
| int rows_fetched_from_coord = fetched_rows->size() - num_rows_fetched_from_cache; |
| if (result_cache_->size() + rows_fetched_from_coord > result_cache_max_size_) { |
| // Set the cache to NULL to indicate that adding the rows fetched from the coord |
| // would exceed the bound of the cache, and therefore, RestartFetch() should fail. |
| ClearResultCache(); |
| return Status::OK(); |
| } |
| |
| // We guess the size of the cache after adding fetched_rows by looking at the size of |
| // fetched_rows itself, and using this estimate to confirm that the memtracker will |
| // allow us to use this much extra memory. In fact, this might be an overestimate, as |
| // the size of two result sets combined into one is not always the size of both result |
| // sets added together (the best example is the null bitset for each column: it might |
| // have only one entry in each result set, and as a result consume two bytes, but when |
| // the result sets are combined, only one byte is needed). Therefore after we add the |
| // new result set into the cache, we need to fix up the memory consumption to the |
| // actual levels to ensure we don't 'leak' bytes that we aren't using. |
| int64_t before = result_cache_->ByteSize(); |
| |
| // Upper-bound on memory required to add fetched_rows to the cache. |
| int64_t delta_bytes = |
| fetched_rows->ByteSize(num_rows_fetched_from_cache, fetched_rows->size()); |
| MemTracker* query_mem_tracker = coordinator->query_mem_tracker(); |
| // Count the cached rows towards the mem limit. |
| if (UNLIKELY(!query_mem_tracker->TryConsume(delta_bytes))) { |
| string details("Failed to allocate memory for result cache."); |
| return query_mem_tracker->MemLimitExceeded(nullptr, details, delta_bytes); |
| } |
| // Append all rows fetched from the coordinator into the cache. |
| int num_rows_added = result_cache_->AddRows( |
| fetched_rows, num_rows_fetched_from_cache, fetched_rows->size()); |
| |
| int64_t after = result_cache_->ByteSize(); |
| |
| // Confirm that this was not an underestimate of the memory required. |
| DCHECK_GE(before + delta_bytes, after) |
| << "Combined result sets consume more memory than both individually " |
| << Substitute("(before: $0, delta_bytes: $1, after: $2)", |
| before, delta_bytes, after); |
| |
| // Fix up the tracked values |
| if (before + delta_bytes > after) { |
| query_mem_tracker->Release(before + delta_bytes - after); |
| delta_bytes = after - before; |
| } |
| |
| // Update result set cache metrics. |
| ImpaladMetrics::RESULTSET_CACHE_TOTAL_NUM_ROWS->Increment(num_rows_added); |
| ImpaladMetrics::RESULTSET_CACHE_TOTAL_BYTES->Increment(delta_bytes); |
| } |
| |
| return Status::OK(); |
| } |
| |
| void ClientRequestState::Cancel(const Status* cause, bool wait_until_finalized) { |
| // If planning is not done, attempt to cancel query in the frontend. |
| if (!is_planning_done_.load()) { |
| Status status = frontend_->CancelExecRequest(query_id()); |
| if (!status.ok()) { |
| LOG(ERROR) << "Error cancelling planning for query " << PrintId(query_id()) |
| << ": " << status; |
| } |
| } |
| |
| // Clean up completed RPCs before cancelling backends. |
| UnRegisterCompletedRPCs(); |
| |
| { |
| lock_guard<mutex> lock(lock_); |
| // If the query has reached a terminal state, no need to update the state. |
| bool already_done = eos_.Load() || exec_state() == ExecState::ERROR; |
| if (!already_done && cause != NULL) { |
| DCHECK(!cause->ok()); |
| discard_result(UpdateQueryStatus(*cause)); |
| query_events_->MarkEvent("Cancelled"); |
| DCHECK(exec_state() == ExecState::ERROR |
| || retry_state() == RetryState::RETRYING); |
| } |
| |
| // To avoid recalling RemoteAdmissionControlClient::CancelAdmission() since it will |
| // send extra RPC. |
| if (!is_cancelled_) { |
| admission_control_client_->CancelAdmission(); |
| is_cancelled_ = true; |
| } |
| } // Release lock_ before doing cancellation work. |
| |
| // Cancel and close child queries before cancelling parent. 'lock_' should not be held |
| // because a) ChildQuery::Cancel() calls back into ImpalaServer and b) cancellation |
| // involves RPCs and can take quite some time. |
| child_query_executor_->Cancel(); |
| |
| // Ensure the parent query is cancelled if execution has started (if the query was not |
| // started, cancellation is handled by the 'async-exec-thread' thread). 'lock_' should |
| // not be held because cancellation involves RPCs and can block for a long time. |
| if (GetCoordinator() != nullptr) GetCoordinator()->Cancel(wait_until_finalized); |
| } |
| |
| Status ClientRequestState::UpdateCatalog() { |
| const TExecRequest& exec_req = exec_request(); |
| if (!exec_req.__isset.query_exec_request || |
| exec_req.query_exec_request.stmt_type != TStmtType::DML) { |
| return Status::OK(); |
| } |
| |
| query_events_->MarkEvent("DML data written"); |
| SCOPED_TIMER(ADD_TIMER(server_profile_, "MetastoreUpdateTimer")); |
| |
| const TQueryExecRequest& query_exec_request = exec_req.query_exec_request; |
| if (query_exec_request.__isset.finalize_params) { |
| const TFinalizeParams& finalize_params = query_exec_request.finalize_params; |
| TUpdateCatalogRequest catalog_update; |
| catalog_update.__set_sync_ddl(exec_req.query_options.sync_ddl); |
| catalog_update.__set_header(GetCatalogServiceRequestHeader()); |
| if (exec_req.query_options.__isset.debug_action) { |
| catalog_update.__set_debug_action(exec_req.query_options.debug_action); |
| } |
| DmlExecState* dml_exec_state = GetCoordinator()->dml_exec_state(); |
| if (!dml_exec_state->PrepareCatalogUpdate(&catalog_update, finalize_params)) { |
| VLOG_QUERY << "No partitions altered, not updating metastore (query id: " |
| << PrintId(query_id()) << ")"; |
| } else { |
| // TODO: We track partitions written to, not created, which means |
| // that we do more work than is necessary, because written-to |
| // partitions don't always require a metastore change. |
| if (VLOG_IS_ON(1)) { |
| vector<string> part_list; |
| for (auto it : catalog_update.updated_partitions) part_list.push_back(it.first); |
| VLOG_QUERY << "Updating metastore with " |
| << catalog_update.updated_partitions.size() |
| << " altered partitions (" |
| << join (part_list, ", ") << ")"; |
| } |
| |
| catalog_update.target_table = finalize_params.table_name; |
| catalog_update.db_name = finalize_params.table_db; |
| catalog_update.is_overwrite = finalize_params.is_overwrite; |
| if (InTransaction()) { |
| catalog_update.__set_transaction_id(finalize_params.transaction_id); |
| catalog_update.__set_write_id(finalize_params.write_id); |
| } |
| if (finalize_params.__isset.iceberg_params) { |
| TIcebergOperationParam& cat_ice_op = catalog_update.iceberg_operation; |
| catalog_update.__isset.iceberg_operation = true; |
| if (!CreateIcebergCatalogOps(finalize_params, &cat_ice_op)) { |
| VLOG_QUERY << "No Iceberg partitions altered, not updating metastore " |
| << "(query id: " << PrintId(query_id()) << ")"; |
| return Status::OK(); |
| } |
| } |
| |
| Status cnxn_status; |
| CatalogServiceConnection client(ExecEnv::GetInstance()->catalogd_client_cache(), |
| *ExecEnv::GetInstance()->GetCatalogdAddress().get(), &cnxn_status); |
| RETURN_IF_ERROR(cnxn_status); |
| |
| VLOG_QUERY << "Executing FinalizeDml() using CatalogService"; |
| TUpdateCatalogResponse resp; |
| Status status = DebugAction(query_options(), "CLIENT_REQUEST_UPDATE_CATALOG"); |
| if (status.ok()) { |
| status = client.DoRpc( |
| &CatalogServiceClientWrapper::UpdateCatalog, catalog_update, &resp); |
| query_events_->MarkEvent("UpdateCatalog finished"); |
| } |
| if (resp.__isset.profile) { |
| for (const TEventSequence& catalog_timeline : resp.profile.event_sequences) { |
| string timeline_name = catalog_timeline.name; |
| // For CTAS, we already have a timeline for the CreateTable execution. |
| // Use another name for the INSERT timeline. |
| if (summary_profile_->GetEventSequence(timeline_name) != nullptr) { |
| timeline_name += " 2"; |
| } |
| summary_profile_->AddEventSequence(timeline_name, catalog_timeline); |
| } |
| } |
| if (status.ok()) status = Status(resp.result.status); |
| if (!status.ok()) { |
| if (InTransaction()) AbortTransaction(); |
| LOG(ERROR) << "ERROR Finalizing DML: " << status.GetDetail(); |
| return status; |
| } |
| if (InTransaction()) { |
| // UpdateCatalog() succeeded and already committed the transaction for us. |
| int64_t txn_id = GetTransactionId(); |
| if (!frontend_->UnregisterTransaction(txn_id).ok()) { |
| LOG(ERROR) << Substitute("Failed to unregister transaction $0", txn_id); |
| } |
| ClearTransactionState(); |
| query_events_->MarkEvent("Transaction committed"); |
| } |
| RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(resp.result, |
| exec_req.query_options.sync_ddl, query_options(), query_events_)); |
| } |
| } else if (InKuduTransaction()) { |
| // Commit the Kudu transaction. Clear transaction state if it's successful. |
| // Otherwise, abort the Kudu transaction and clear transaction state. |
| // Note that TQueryExecRequest.finalize_params is not set for inserting rows to Kudu |
| // table. |
| Status status = CommitKuduTransaction(); |
| if (UNLIKELY(!status.ok())) { |
| AbortKuduTransaction(); |
| LOG(ERROR) << "ERROR Finalizing DML: " << status.GetDetail(); |
| return status; |
| } |
| } |
| query_events_->MarkEvent("DML Metastore update finished"); |
| if (otel_trace_query()) { |
| otel_span_manager_->AddChildSpanEvent("MetastoreUpdateFinished"); |
| } |
| return Status::OK(); |
| } |
| |
| bool ClientRequestState::CreateIcebergCatalogOps( |
| const TFinalizeParams& finalize_params, TIcebergOperationParam* cat_ice_op) { |
| DCHECK(cat_ice_op != nullptr); |
| const TIcebergDmlFinalizeParams& ice_finalize_params = finalize_params.iceberg_params; |
| DmlExecState* dml_exec_state = GetCoordinator()->dml_exec_state(); |
| bool update_catalog = true; |
| cat_ice_op->__set_operation(ice_finalize_params.operation); |
| cat_ice_op->__set_initial_snapshot_id( |
| ice_finalize_params.initial_snapshot_id); |
| cat_ice_op->__set_spec_id(ice_finalize_params.spec_id); |
| if (ice_finalize_params.operation == TIcebergOperation::INSERT) { |
| cat_ice_op->__set_iceberg_data_files_fb( |
| dml_exec_state->CreateIcebergDataFilesVector()); |
| cat_ice_op->__set_is_overwrite(finalize_params.is_overwrite); |
| if (cat_ice_op->iceberg_data_files_fb.empty()) update_catalog = false; |
| } else if (ice_finalize_params.operation == TIcebergOperation::DELETE) { |
| cat_ice_op->__set_iceberg_delete_files_fb( |
| dml_exec_state->CreateIcebergDeleteFilesVector()); |
| cat_ice_op->__set_data_files_referenced_by_position_deletes( |
| dml_exec_state->DataFilesReferencedByPositionDeletes()); |
| if (cat_ice_op->iceberg_delete_files_fb.empty()) update_catalog = false; |
| } else if (ice_finalize_params.operation == TIcebergOperation::UPDATE) { |
| cat_ice_op->__set_iceberg_data_files_fb( |
| dml_exec_state->CreateIcebergDataFilesVector()); |
| cat_ice_op->__set_iceberg_delete_files_fb( |
| dml_exec_state->CreateIcebergDeleteFilesVector()); |
| cat_ice_op->__set_data_files_referenced_by_position_deletes( |
| dml_exec_state->DataFilesReferencedByPositionDeletes()); |
| if (cat_ice_op->iceberg_delete_files_fb.empty()) { |
| DCHECK(cat_ice_op->iceberg_data_files_fb.empty()); |
| update_catalog = false; |
| } |
| } else if (ice_finalize_params.operation == TIcebergOperation::OPTIMIZE) { |
| DCHECK(ice_finalize_params.__isset.optimize_params); |
| const TIcebergOptimizeParams& optimize_params = ice_finalize_params.optimize_params; |
| if (optimize_params.mode == TIcebergOptimizationMode::NOOP) { |
| update_catalog = false; |
| } else { |
| cat_ice_op->__set_iceberg_data_files_fb( |
| dml_exec_state->CreateIcebergDataFilesVector()); |
| if (optimize_params.mode == TIcebergOptimizationMode::PARTIAL) { |
| DCHECK(optimize_params.__isset.selected_data_files_without_deletes); |
| cat_ice_op->__set_replaced_data_files_without_deletes( |
| optimize_params.selected_data_files_without_deletes); |
| } |
| } |
| } else if (ice_finalize_params.operation == TIcebergOperation::MERGE) { |
| cat_ice_op->__set_iceberg_data_files_fb( |
| dml_exec_state->CreateIcebergDataFilesVector()); |
| cat_ice_op->__set_iceberg_delete_files_fb( |
| dml_exec_state->CreateIcebergDeleteFilesVector()); |
| cat_ice_op->__set_data_files_referenced_by_position_deletes( |
| dml_exec_state->DataFilesReferencedByPositionDeletes()); |
| if (cat_ice_op->iceberg_delete_files_fb.empty() |
| && cat_ice_op->iceberg_data_files_fb.empty()) { |
| update_catalog = false; |
| } |
| } |
| if (!update_catalog) query_events_->MarkEvent("No-op Iceberg DML statement"); |
| return update_catalog; |
| } |
| |
| void ClientRequestState::SetResultSet(const TDdlExecResponse* ddl_resp) { |
| if (ddl_resp != NULL && ddl_resp->__isset.result_set) { |
| result_metadata_ = ddl_resp->result_set.schema; |
| request_result_set_.reset(new vector<TResultRow>(ddl_resp->result_set.rows)); |
| } |
| } |
| |
| void ClientRequestState::SetResultSet(const vector<string>& results) { |
| request_result_set_.reset(new vector<TResultRow>); |
| request_result_set_->resize(results.size()); |
| for (int i = 0; i < results.size(); ++i) { |
| (*request_result_set_.get())[i].__isset.colVals = true; |
| (*request_result_set_.get())[i].colVals.resize(1); |
| (*request_result_set_.get())[i].colVals[0].__set_string_val(results[i]); |
| } |
| } |
| |
| void ClientRequestState::SetResultSet(const vector<string>& col1, |
| const vector<string>& col2) { |
| DCHECK_EQ(col1.size(), col2.size()); |
| |
| request_result_set_.reset(new vector<TResultRow>); |
| request_result_set_->resize(col1.size()); |
| for (int i = 0; i < col1.size(); ++i) { |
| (*request_result_set_.get())[i].__isset.colVals = true; |
| (*request_result_set_.get())[i].colVals.resize(2); |
| (*request_result_set_.get())[i].colVals[0].__set_string_val(col1[i]); |
| (*request_result_set_.get())[i].colVals[1].__set_string_val(col2[i]); |
| } |
| } |
| |
| void ClientRequestState::SetResultSet(const vector<string>& col1, |
| const vector<string>& col2, const vector<string>& col3) { |
| DCHECK_EQ(col1.size(), col2.size()); |
| DCHECK_EQ(col1.size(), col3.size()); |
| |
| request_result_set_.reset(new vector<TResultRow>); |
| request_result_set_->resize(col1.size()); |
| for (int i = 0; i < col1.size(); ++i) { |
| (*request_result_set_.get())[i].__isset.colVals = true; |
| (*request_result_set_.get())[i].colVals.resize(3); |
| (*request_result_set_.get())[i].colVals[0].__set_string_val(col1[i]); |
| (*request_result_set_.get())[i].colVals[1].__set_string_val(col2[i]); |
| (*request_result_set_.get())[i].colVals[2].__set_string_val(col3[i]); |
| } |
| } |
| |
| void ClientRequestState::SetResultSet(const vector<string>& col1, |
| const vector<string>& col2, const vector<string>& col3, const vector<string>& col4) { |
| DCHECK_EQ(col1.size(), col2.size()); |
| DCHECK_EQ(col1.size(), col3.size()); |
| DCHECK_EQ(col1.size(), col4.size()); |
| |
| request_result_set_.reset(new vector<TResultRow>); |
| request_result_set_->resize(col1.size()); |
| for (int i = 0; i < col1.size(); ++i) { |
| (*request_result_set_.get())[i].__isset.colVals = true; |
| (*request_result_set_.get())[i].colVals.resize(4); |
| (*request_result_set_.get())[i].colVals[0].__set_string_val(col1[i]); |
| (*request_result_set_.get())[i].colVals[1].__set_string_val(col2[i]); |
| (*request_result_set_.get())[i].colVals[2].__set_string_val(col3[i]); |
| (*request_result_set_.get())[i].colVals[3].__set_string_val(col4[i]); |
| } |
| } |
| |
| void ClientRequestState::SetCreateTableAsSelectResultSet() { |
| DCHECK(ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT); |
| int64_t total_num_rows_inserted = 0; |
| // There will only be rows inserted in the case a new table was created as part of this |
| // operation. |
| if (catalog_op_executor_->ddl_exec_response()->new_table_created) { |
| DCHECK(GetCoordinator()); |
| total_num_rows_inserted = GetCoordinator()->dml_exec_state()->GetNumModifiedRows(); |
| } |
| const string& summary_msg = Substitute("Inserted $0 row(s)", total_num_rows_inserted); |
| VLOG_QUERY << summary_msg; |
| vector<string> results(1, summary_msg); |
| SetResultSet(results); |
| } |
| |
| void ClientRequestState::MarkInactive() { |
| client_wait_sw_.Start(); |
| lock_guard<mutex> l(expiration_data_lock_); |
| last_active_time_ms_ = UnixMillis(); |
| DCHECK(ref_count_ > 0) << "Invalid MarkInactive()"; |
| --ref_count_; |
| } |
| |
| void ClientRequestState::MarkActive() { |
| client_wait_sw_.Stop(); |
| int64_t elapsed_time = client_wait_sw_.ElapsedTime(); |
| // If we have reached eos, then the query is already complete, |
| // and we should not accumulate more client wait time. This mostly |
| // impacts the finalization step, where the client is closing the |
| // query and does not need any more rows. Fetching may have already |
| // completed prior to this point, so finalization time should not |
| // count in that case. If the fetch was incomplete, then the client |
| // time should be counted for finalization as well. |
| if (!eos()) { |
| client_wait_timer_->Set(elapsed_time); |
| // The first call is before any MarkInactive() call has run and produces |
| // a zero-length sample. Skip this zero-length sample (but not any later |
| // zero-length samples). |
| if (elapsed_time != 0 || last_client_wait_time_ != 0) { |
| int64_t current_wait_time = elapsed_time - last_client_wait_time_; |
| client_wait_time_stats_->UpdateCounter(current_wait_time); |
| } |
| last_client_wait_time_ = elapsed_time; |
| } |
| lock_guard<mutex> l(expiration_data_lock_); |
| last_active_time_ms_ = UnixMillis(); |
| ++ref_count_; |
| } |
| |
| // Used by RETURN_IF_CANCELLED. |
| bool ClientRequestState::is_cancelled() { |
| lock_guard<mutex> l(lock_); |
| return is_cancelled_; |
| } |
| |
| std::optional<long> getIcebergSnapshotId(const TExecRequest& exec_req) { |
| DCHECK(exec_req.__isset.catalog_op_request); |
| DCHECK(exec_req.catalog_op_request.__isset.ddl_params); |
| DCHECK(exec_req.catalog_op_request.ddl_params.__isset.compute_stats_params); |
| |
| const TComputeStatsParams& compute_stats_params = |
| exec_req.catalog_op_request.ddl_params.compute_stats_params; |
| if (compute_stats_params.__isset.iceberg_snapshot_id) { |
| return std::optional<long>(compute_stats_params.iceberg_snapshot_id); |
| } else { |
| return {}; |
| } |
| } |
| |
| Status ClientRequestState::UpdateTableAndColumnStats( |
| const vector<ChildQuery*>& child_queries) { |
| DCHECK_GE(child_queries.size(), 1); |
| DCHECK_LE(child_queries.size(), 2); |
| catalog_op_executor_.reset( |
| new CatalogOpExecutor(ExecEnv::GetInstance(), frontend_, server_profile_)); |
| |
| // If there was no column stats query, pass in empty thrift structures to |
| // ExecComputeStats(). Otherwise pass in the column stats result. |
| TTableSchema col_stats_schema; |
| TRowSet col_stats_data; |
| if (child_queries.size() > 1) { |
| col_stats_schema = child_queries[1]->result_schema(); |
| col_stats_data = child_queries[1]->result_data(); |
| } |
| |
| const TExecRequest& exec_req = exec_request(); |
| std::optional<long> snapshot_id = getIcebergSnapshotId(exec_req); |
| Status status = catalog_op_executor_->ExecComputeStats( |
| GetCatalogServiceRequestHeader(), |
| exec_req.catalog_op_request, |
| child_queries[0]->result_schema(), |
| child_queries[0]->result_data(), |
| col_stats_schema, |
| col_stats_data, |
| snapshot_id); |
| AddCatalogTimeline(); |
| { |
| lock_guard<mutex> l(lock_); |
| RETURN_IF_ERROR(UpdateQueryStatus(status)); |
| } |
| RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult( |
| *catalog_op_executor_->update_catalog_result(), |
| exec_req.query_options.sync_ddl, query_options(), query_events_)); |
| |
| // Set the results to be reported to the client. |
| SetResultSet(catalog_op_executor_->ddl_exec_response()); |
| query_events_->MarkEvent("Metastore update finished"); |
| return Status::OK(); |
| } |
| |
| void ClientRequestState::ClearResultCache() { |
| if (result_cache_ == nullptr) return; |
| // Update result set cache metrics and mem limit accounting. |
| ImpaladMetrics::RESULTSET_CACHE_TOTAL_NUM_ROWS->Increment(-result_cache_->size()); |
| int64_t total_bytes = result_cache_->ByteSize(); |
| ImpaladMetrics::RESULTSET_CACHE_TOTAL_BYTES->Increment(-total_bytes); |
| Coordinator* coordinator = GetCoordinator(); |
| if (coordinator != nullptr) { |
| DCHECK(coordinator->query_mem_tracker() != nullptr); |
| coordinator->query_mem_tracker()->Release(total_bytes); |
| } |
| result_cache_.reset(); |
| } |
| |
| void ClientRequestState::UpdateExecState(ExecState exec_state) { |
| { |
| lock_guard<mutex> l(exec_state_lock_); |
| exec_state_.Store(exec_state); |
| summary_profile_->AddInfoString("Query State", PrintValue(BeeswaxQueryState())); |
| summary_profile_->AddInfoString("Impala Query State", ExecStateToString(exec_state)); |
| } |
| // Drop exec_state_lock_ before signalling |
| exec_state_cv_.NotifyAll(); |
| } |
| |
| void ClientRequestState::WaitForCompletionExecState() { |
| if (query_options().long_polling_time_ms <= 0) return; |
| int64_t timeout_us = query_options().long_polling_time_ms * MICROS_PER_MILLI; |
| unique_lock<mutex> l(exec_state_lock_); |
| timespec deadline; |
| TimeFromNowMicros(timeout_us, &deadline); |
| bool timed_out = false; |
| while (exec_state() != ExecState::FINISHED && |
| exec_state() != ExecState::ERROR && |
| !timed_out) { |
| timed_out = !exec_state_cv_.WaitUntil(l, deadline); |
| } |
| } |
| |
| TOperationState::type ClientRequestState::TOperationState() const { |
| switch (exec_state()) { |
| case ExecState::INITIALIZED: return TOperationState::INITIALIZED_STATE; |
| case ExecState::PENDING: return TOperationState::PENDING_STATE; |
| case ExecState::RUNNING: return TOperationState::RUNNING_STATE; |
| case ExecState::FINISHED: return TOperationState::FINISHED_STATE; |
| case ExecState::ERROR: return TOperationState::ERROR_STATE; |
| default: { |
| DCHECK(false) << "Add explicit translation for all used ExecState values"; |
| return TOperationState::ERROR_STATE; |
| } |
| } |
| } |
| |
| beeswax::QueryState::type ClientRequestState::BeeswaxQueryState() const { |
| switch (exec_state()) { |
| case ExecState::INITIALIZED: return beeswax::QueryState::CREATED; |
| case ExecState::PENDING: return beeswax::QueryState::COMPILED; |
| case ExecState::RUNNING: return beeswax::QueryState::RUNNING; |
| case ExecState::FINISHED: return beeswax::QueryState::FINISHED; |
| case ExecState::ERROR: return beeswax::QueryState::EXCEPTION; |
| default: { |
| DCHECK(false) << "Add explicit translation for all used ExecState values"; |
| return beeswax::QueryState::EXCEPTION; |
| } |
| } |
| } |
| |
| // It is safe to use 'coord_' directly for the following two methods since they are safe |
| // to call concurrently with Coordinator::Exec(). See comments for 'coord_' and |
| // 'coord_exec_called_' for more details. |
| Status ClientRequestState::UpdateBackendExecStatus( |
| const ReportExecStatusRequestPB& request, |
| const TRuntimeProfileForest& thrift_profiles) { |
| DCHECK(coord_.get()); |
| return coord_->UpdateBackendExecStatus(request, thrift_profiles); |
| } |
| |
| void ClientRequestState::UpdateFilter( |
| const UpdateFilterParamsPB& params, RpcContext* context) { |
| DCHECK(coord_.get()); |
| coord_->UpdateFilter(params, context); |
| } |
| |
| bool ClientRequestState::GetDmlStats(TDmlResult* dml_result, Status* query_status) { |
| lock_guard<mutex> l(lock_); |
| *query_status = query_status_; |
| if (!query_status->ok()) return false; |
| // Coord may be NULL for a SELECT with LIMIT 0. |
| // Note that when IMPALA-87 is fixed (INSERT without FROM clause) we might |
| // need to revisit this, since that might lead us to insert a row without a |
| // coordinator, depending on how we choose to drive the table sink. |
| Coordinator* coord = GetCoordinator(); |
| if (coord == nullptr) return false; |
| coord->dml_exec_state()->ToTDmlResult(dml_result); |
| return true; |
| } |
| |
| void ClientRequestState::WaitUntilRetried() { |
| unique_lock<mutex> l(lock_); |
| DCHECK(retry_state() != RetryState::NOT_RETRIED); |
| while (retry_state() == RetryState::RETRYING) { |
| block_until_retried_cv_.Wait(l); |
| } |
| DCHECK(retry_state() == RetryState::RETRIED |
| || exec_state() == ExecState::ERROR); |
| } |
| |
| void ClientRequestState::MarkAsRetried(const TUniqueId& retried_id) { |
| DCHECK(retry_state() == RetryState::RETRYING) |
| << Substitute("Illegal retry state transition: $0 -> RETRYING, query_id=$2", |
| RetryStateToString(retry_state()), PrintId(query_id())); |
| retry_state_.Store(RetryState::RETRIED); |
| summary_profile_->AddInfoStringRedacted( |
| RETRY_STATUS_KEY, RetryStateToString(RetryState::RETRIED)); |
| summary_profile_->AddInfoString("Retried Query Id", PrintId(retried_id)); |
| UpdateExecState(ExecState::ERROR); |
| block_until_retried_cv_.NotifyOne(); |
| retried_id_ = make_unique<TUniqueId>(retried_id); |
| } |
| |
| const string& ClientRequestState::effective_user() const { |
| return GetEffectiveUser(query_ctx_.session); |
| } |
| |
| void ClientRequestState::UpdateEndTime() { |
| // Update the query's end time only if it isn't set previously. |
| if (end_time_us_.CompareAndSwap(0, UnixMicros())) { |
| // Certain API clients expect Start Time and End Time to be date-time strings |
| // of nanosecond precision, so we explicitly specify the precision here. |
| summary_profile_->AddInfoString( |
| "End Time", ToStringFromUnixMicros(end_time_us(), TimePrecision::Nanosecond)); |
| int64_t duration = end_time_us() - start_time_us(); |
| summary_profile_->AddInfoString("Duration", Substitute("$0 ($1 us)", |
| PrettyPrinter::Print(duration, TUnit::TIME_US), duration)); |
| } |
| } |
| |
| int64_t ClientRequestState::GetTransactionId() const { |
| DCHECK(InTransaction()); |
| return exec_request().query_exec_request.finalize_params.transaction_id; |
| } |
| |
| bool ClientRequestState::InTransaction() const { |
| return exec_request().query_exec_request.finalize_params.__isset.transaction_id && |
| !transaction_closed_; |
| } |
| |
| void ClientRequestState::AbortTransaction() { |
| DCHECK(InTransaction()); |
| if (frontend_->AbortTransaction(GetTransactionId()).ok()) { |
| query_events_->MarkEvent("Transaction aborted"); |
| } else { |
| VLOG(1) << Substitute("Unable to abort transaction with id: $0", GetTransactionId()); |
| } |
| ClearTransactionState(); |
| } |
| |
| void ClientRequestState::ClearTransactionState() { |
| DCHECK(InTransaction()); |
| transaction_closed_ = true; |
| } |
| |
| bool ClientRequestState::InKuduTransaction() const { |
| // If Kudu transaction is opened, TQueryExecRequest.query_ctx.is_kudu_transactional |
| // is set as true by Frontend.doCreateExecRequest(). |
| return (exec_request().query_exec_request.query_ctx.is_kudu_transactional |
| && !transaction_closed_); |
| } |
| |
| void ClientRequestState::AbortKuduTransaction() { |
| DCHECK(InKuduTransaction()); |
| if (frontend_->AbortKuduTransaction(query_ctx_.query_id).ok()) { |
| query_events_->MarkEvent("Kudu transaction aborted"); |
| } else { |
| VLOG(1) << Substitute("Unable to abort Kudu transaction with query-id: $0", |
| PrintId(query_ctx_.query_id)); |
| } |
| transaction_closed_ = true; |
| } |
| |
| Status ClientRequestState::CommitKuduTransaction() { |
| DCHECK(InKuduTransaction()); |
| // Skip calling Commit() for Kudu Transaction with a debug action so that test code |
| // could explicitly control over calling Commit(). |
| Status status = DebugAction(exec_request().query_options, "CRS_NOT_COMMIT_KUDU_TXN"); |
| if (UNLIKELY(!status.ok())) { |
| VLOG(1) << Substitute("Skip to commit Kudu transaction with query-id: $0", |
| PrintId(query_ctx_.query_id)); |
| transaction_closed_ = true; |
| return Status::OK(); |
| } |
| |
| status = frontend_->CommitKuduTransaction(query_ctx_.query_id); |
| if (status.ok()) { |
| query_events_->MarkEvent("Kudu transaction committed"); |
| transaction_closed_ = true; |
| } else { |
| VLOG(1) << Substitute("Unable to commit Kudu transaction with query-id: $0", |
| PrintId(query_ctx_.query_id)); |
| } |
| return status; |
| } |
| |
| void ClientRequestState::LogQueryEvents() { |
| // Wait until the results are available. This guarantees the completion of non QUERY |
| // statements like DDL/DML etc. Query events are logged if the query reaches a FINISHED |
| // state. For certain query types, events are logged regardless of the query state. |
| Status status; |
| { |
| lock_guard<mutex> l(lock_); |
| status = query_status(); |
| } |
| bool log_events = true; |
| switch (stmt_type()) { |
| case TStmtType::QUERY: |
| case TStmtType::DML: |
| case TStmtType::DDL: |
| case TStmtType::UNKNOWN: |
| log_events = status.ok(); |
| break; |
| case TStmtType::EXPLAIN: |
| case TStmtType::LOAD: |
| case TStmtType::SET: |
| case TStmtType::ADMIN_FN: |
| default: |
| break; |
| } |
| |
| // Log audit events that are due to an AuthorizationException. |
| if (parent_server_->IsAuditEventLoggingEnabled() && |
| (Frontend::IsAuthorizationError(status) || log_events)) { |
| // TODO: deal with an error status |
| discard_result(LogAuditRecord(status)); |
| } |
| |
| if (log_events && (parent_server_->AreQueryHooksEnabled() || |
| parent_server_->IsLineageLoggingEnabled())) { |
| // TODO: deal with an error status |
| discard_result(LogLineageRecord()); |
| } |
| } |
| |
| Status ClientRequestState::LogAuditRecord(const Status& query_status) { |
| const TExecRequest& request = exec_request(); |
| stringstream ss; |
| rapidjson::StringBuffer buffer; |
| rapidjson::Writer<rapidjson::StringBuffer> writer(buffer); |
| |
| writer.StartObject(); |
| // Each log entry is a timestamp mapped to a JSON object |
| ss << UnixMillis(); |
| writer.String(ss.str().c_str()); |
| writer.StartObject(); |
| writer.String("query_id"); |
| writer.String(PrintId(query_id()).c_str()); |
| writer.String("session_id"); |
| writer.String(PrintId(session_id()).c_str()); |
| writer.String("start_time"); |
| writer.String(ToStringFromUnixMicros(start_time_us()).c_str()); |
| writer.String("authorization_failure"); |
| writer.Bool(Frontend::IsAuthorizationError(query_status)); |
| writer.String("status"); |
| writer.String(query_status.GetDetail().c_str()); |
| writer.String("user"); |
| writer.String(effective_user().c_str()); |
| writer.String("impersonator"); |
| if (do_as_user().empty()) { |
| // If there is no do_as_user() is empty, the "impersonator" field should be Null. |
| writer.Null(); |
| } else { |
| // Otherwise, the delegator is the current connected user. |
| writer.String(connected_user().c_str()); |
| } |
| writer.String("statement_type"); |
| if (request.stmt_type == TStmtType::DDL) { |
| if (request.catalog_op_request.op_type == TCatalogOpType::DDL) { |
| writer.String(PrintValue(request.catalog_op_request.ddl_params.ddl_type).c_str()); |
| } else { |
| writer.String(PrintValue(request.catalog_op_request.op_type).c_str()); |
| } |
| } else { |
| writer.String(PrintValue(request.stmt_type).c_str()); |
| } |
| writer.String("network_address"); |
| writer.String(TNetworkAddressToString( |
| session()->network_address).c_str()); |
| writer.String("sql_statement"); |
| string stmt = replace_all_copy(sql_stmt(), "\n", " "); |
| Redact(&stmt); |
| writer.String(stmt.c_str()); |
| writer.String("catalog_objects"); |
| |
| writer.StartArray(); |
| for (const TAccessEvent& event: request.access_events) { |
| writer.StartObject(); |
| writer.String("name"); |
| writer.String(event.name.c_str()); |
| writer.String("object_type"); |
| writer.String(PrintValue(event.object_type).c_str()); |
| writer.String("privilege"); |
| writer.String(event.privilege.c_str()); |
| writer.EndObject(); |
| } |
| writer.EndArray(); |
| writer.EndObject(); |
| writer.EndObject(); |
| Status status = parent_server_->AppendAuditEntry(buffer.GetString()); |
| if (!status.ok()) { |
| LOG(ERROR) << "Unable to record audit event record: " << status.GetDetail(); |
| if (FLAGS_abort_on_failed_audit_event) { |
| CLEAN_EXIT_WITH_ERROR("Shutting down Impala Server due to " |
| "abort_on_failed_audit_event=true"); |
| } |
| } |
| return status; |
| } |
| |
| Status ClientRequestState::LogLineageRecord() { |
| const TExecRequest& request = exec_request(); |
| if (request.stmt_type == TStmtType::EXPLAIN || (!request.__isset.query_exec_request && |
| !request.__isset.catalog_op_request)) { |
| return Status::OK(); |
| } |
| TLineageGraph lineage_graph; |
| if (request.__isset.query_exec_request && |
| request.query_exec_request.__isset.lineage_graph) { |
| lineage_graph = request.query_exec_request.lineage_graph; |
| } else if (request.__isset.catalog_op_request && |
| request.catalog_op_request.__isset.lineage_graph) { |
| lineage_graph = request.catalog_op_request.lineage_graph; |
| } else { |
| return Status::OK(); |
| } |
| |
| if (catalog_op_executor_ != nullptr && catalog_op_type() == TCatalogOpType::DDL) { |
| const TDdlExecResponse* response = ddl_exec_response(); |
| //Set table location in the lineage graph. Currently, this is only set for external |
| // tables in frontend. |
| if (response->__isset.table_location) { |
| lineage_graph.__set_table_location(response->table_location); |
| } |
| // Update vertices that have -1 table_create_time for a newly created table/view. |
| if (response->__isset.table_name && |
| response->__isset.table_create_time) { |
| for (auto &vertex: lineage_graph.vertices) { |
| if (!vertex.__isset.metadata) continue; |
| if (vertex.metadata.table_name == response->table_name && |
| vertex.metadata.table_create_time == -1) { |
| vertex.metadata.__set_table_create_time(response->table_create_time); |
| } |
| } |
| } |
| } |
| |
| // Set the query end time in TLineageGraph. Must use UNIX time directly rather than |
| // e.g. converting from end_time() (IMPALA-4440). |
| lineage_graph.__set_ended(UnixMillis() / 1000); |
| |
| string lineage_record; |
| LineageUtil::TLineageToJSON(lineage_graph, &lineage_record); |
| |
| if (parent_server_->AreQueryHooksEnabled()) { |
| // invoke QueryEventHooks |
| TQueryCompleteContext query_complete_context; |
| query_complete_context.__set_lineage_string(lineage_record); |
| const Status& status = ExecEnv::GetInstance()->frontend()->CallQueryCompleteHooks( |
| query_complete_context); |
| |
| if (!status.ok()) { |
| LOG(ERROR) << "Failed to send query lineage info to FE CallQueryCompleteHooks" |
| << status.GetDetail(); |
| if (FLAGS_abort_on_failed_lineage_event) { |
| CLEAN_EXIT_WITH_ERROR("Shutting down Impala Server due to " |
| "abort_on_failed_lineage_event=true"); |
| } |
| } |
| } |
| |
| // lineage logfile writing is deprecated in favor of the |
| // QueryEventHooks (see FE). this behavior is being retained |
| // for now but may be removed in the future. |
| if (parent_server_->IsLineageLoggingEnabled()) { |
| const Status& status = parent_server_->AppendLineageEntry(lineage_record); |
| if (!status.ok()) { |
| LOG(ERROR) << "Unable to record query lineage record: " << status.GetDetail(); |
| if (FLAGS_abort_on_failed_lineage_event) { |
| CLEAN_EXIT_WITH_ERROR("Shutting down Impala Server due to " |
| "abort_on_failed_lineage_event=true"); |
| } |
| } |
| return status; |
| } |
| return Status::OK(); |
| } |
| |
| string ClientRequestState::ExecStateToString(ExecState state) { |
| static const unordered_map<ClientRequestState::ExecState, const char*> |
| exec_state_strings{{ClientRequestState::ExecState::INITIALIZED, "INITIALIZED"}, |
| {ClientRequestState::ExecState::PENDING, "PENDING"}, |
| {ClientRequestState::ExecState::RUNNING, "RUNNING"}, |
| {ClientRequestState::ExecState::FINISHED, "FINISHED"}, |
| {ClientRequestState::ExecState::ERROR, "ERROR"}}; |
| return exec_state_strings.at(state); |
| } |
| |
| string ClientRequestState::RetryStateToString(RetryState state) { |
| static const unordered_map<ClientRequestState::RetryState, const char*> |
| retry_state_strings{{ClientRequestState::RetryState::NOT_RETRIED, "NOT_RETRIED"}, |
| {ClientRequestState::RetryState::RETRYING, "RETRYING"}, |
| {ClientRequestState::RetryState::RETRIED, "RETRIED"}}; |
| return retry_state_strings.at(state); |
| } |
| |
| TCatalogServiceRequestHeader ClientRequestState::GetCatalogServiceRequestHeader() { |
| TCatalogServiceRequestHeader header = TCatalogServiceRequestHeader(); |
| header.__set_requesting_user(effective_user()); |
| header.__set_client_ip(session()->network_address.hostname); |
| header.__set_want_minimal_response(FLAGS_use_local_catalog); |
| header.__set_redacted_sql_stmt( |
| query_ctx_.client_request.__isset.redacted_stmt ? |
| query_ctx_.client_request.redacted_stmt : query_ctx_.client_request.stmt); |
| header.__set_query_id(query_ctx_.query_id); |
| header.__set_coordinator_hostname(FLAGS_hostname); |
| return header; |
| } |
| |
| void ClientRequestState::RegisterRPC() { |
| RpcEventHandler::InvocationContext* rpc_context = |
| RpcEventHandler::GetThreadRPCContext(); |
| // The existence of rpc_context means that this is called from an RPC |
| if (rpc_context) { |
| lock_guard<mutex> l(lock_); |
| if (track_rpcs_ && pending_rpcs_.find(rpc_context) == pending_rpcs_.end()) { |
| rpc_context->Register(); |
| pending_rpcs_.insert(rpc_context); |
| rpc_count_->Add(1); |
| } |
| } |
| } |
| |
| void ClientRequestState::UnRegisterCompletedRPCs() { |
| lock_guard<mutex> l(lock_); |
| for (auto iter = pending_rpcs_.begin(); iter != pending_rpcs_.end();) { |
| RpcEventHandler::InvocationContext* rpc_context = *iter; |
| uint64_t read_ns = 0, write_ns = 0; |
| if (rpc_context->UnRegisterCompleted(read_ns, write_ns)) { |
| rpc_read_timer_->Add(read_ns); |
| rpc_write_timer_->Add(write_ns); |
| iter = pending_rpcs_.erase(iter); |
| } else { |
| ++iter; |
| } |
| } |
| } |
| |
| void ClientRequestState::UnRegisterRemainingRPCs() { |
| lock_guard<mutex> l(lock_); |
| for (auto rpc_context: pending_rpcs_) { |
| rpc_context->UnRegister(); |
| } |
| track_rpcs_ = false; |
| pending_rpcs_.clear(); |
| } |
| |
| void ClientRequestState::CopyRPCs(ClientRequestState& from_request) { |
| lock_guard<mutex> l_to(lock_); |
| lock_guard<mutex> l_from(from_request.lock_); |
| rpc_read_timer_->Add(from_request.rpc_read_timer_->value()); |
| rpc_write_timer_->Add(from_request.rpc_write_timer_->value()); |
| rpc_count_->Add(from_request.rpc_count_->value()); |
| for (auto rpc_context: from_request.pending_rpcs_) { |
| rpc_context->Register(); |
| pending_rpcs_.insert(rpc_context); |
| } |
| } |
| |
| Status ClientRequestState::ExecMigrateRequest() { |
| ExecMigrateRequestImpl(); |
| SetResultSet({"Table has been migrated."}); |
| return query_status_; |
| } |
| |
| void ClientRequestState::ExecMigrateRequestImpl() { |
| // A convert table request holds the query strings for the sub-queries. These are |
| // populated by ConvertTableToIcebergStmt in the Frontend during analysis. |
| const TConvertTableRequest& params = exec_request().convert_table_request; |
| { |
| RuntimeProfile* child_profile = |
| RuntimeProfile::Create(&profile_pool_, "Child Queries 1"); |
| profile_->AddChild(child_profile); |
| vector<ChildQuery> child_queries; |
| |
| // Prepare: SET some table properties for the original table. |
| RuntimeProfile* set_hdfs_table_profile = RuntimeProfile::Create( |
| &profile_pool_, "Set properties for HDFS table query"); |
| child_profile->AddChild(set_hdfs_table_profile); |
| child_queries.emplace_back(params.set_hdfs_table_properties_query, this, |
| parent_server_, set_hdfs_table_profile, &profile_pool_); |
| |
| // Prepare: RENAME the HDFS table to a temporary HDFS table. |
| RuntimeProfile* rename_hdfs_table_profile = RuntimeProfile::Create( |
| &profile_pool_, "Rename HDFS table query"); |
| child_profile->AddChild(rename_hdfs_table_profile); |
| child_queries.emplace_back(params.rename_hdfs_table_to_temporary_query, |
| this, parent_server_, rename_hdfs_table_profile, &profile_pool_); |
| |
| // Prepare: REFRESH the temporary HDFS table. |
| RuntimeProfile* refresh_hdfs_table_profile = RuntimeProfile::Create( |
| &profile_pool_, "Refresh temporary HDFS table query"); |
| child_profile->AddChild(refresh_hdfs_table_profile); |
| child_queries.emplace_back(params.refresh_temporary_hdfs_table_query, this, |
| parent_server_, refresh_hdfs_table_profile, &profile_pool_); |
| |
| // Execute child queries |
| unique_ptr<ChildQueryExecutor> query_executor(new ChildQueryExecutor()); |
| RETURN_VOID_IF_ERROR(query_executor->ExecAsync(move(child_queries))); |
| vector<ChildQuery*>* completed_queries = new vector<ChildQuery*>(); |
| Status query_status = query_executor->WaitForAll(completed_queries); |
| if (!query_status.ok()) AddTableResetHints(params, &query_status); |
| { |
| lock_guard<mutex> l(lock_); |
| RETURN_VOID_IF_ERROR(UpdateQueryStatus(query_status)); |
| } |
| } |
| // Create an external Iceberg table using the data of the HDFS table. |
| Status status = frontend_->Convert(exec_request()); |
| if (!status.ok()) AddTableResetHints(params, &status); |
| { |
| lock_guard<mutex> l(lock_); |
| RETURN_VOID_IF_ERROR(UpdateQueryStatus(status)); |
| } |
| { |
| RuntimeProfile* child_profile = |
| RuntimeProfile::Create(&profile_pool_, "Child Queries 2"); |
| profile_->AddChild(child_profile); |
| vector<ChildQuery> child_queries; |
| |
| if (params.__isset.create_iceberg_table_query) { |
| // Prepare: CREATE the Iceberg table that inherits HDFS table location. |
| RuntimeProfile* create_iceberg_table_profile = RuntimeProfile::Create( |
| &profile_pool_, "Create Iceberg table query"); |
| child_profile->AddChild(create_iceberg_table_profile); |
| child_queries.emplace_back(params.create_iceberg_table_query, this, |
| parent_server_, create_iceberg_table_profile, &profile_pool_); |
| } else { |
| // Prepare: Invalidate metadata for tables in Hive catalog to guarantee that it is |
| // propagated instantly to all coordinators. |
| RuntimeProfile* invalidate_metadata_profile = RuntimeProfile::Create( |
| &profile_pool_, "Invalidate metadata Iceberg table query"); |
| child_profile->AddChild(invalidate_metadata_profile); |
| child_queries.emplace_back(params.invalidate_metadata_query, this, |
| parent_server_, invalidate_metadata_profile, &profile_pool_); |
| } |
| |
| if (params.__isset.post_create_alter_table_query) { |
| // Prepare: ALTER TABLE query after creating the Iceberg table. |
| RuntimeProfile* post_create_alter_table_profile = RuntimeProfile::Create( |
| &profile_pool_, "ALTER TABLE after create Iceberg table query"); |
| child_profile->AddChild(post_create_alter_table_profile); |
| child_queries.emplace_back(params.post_create_alter_table_query, this, |
| parent_server_, post_create_alter_table_profile, &profile_pool_); |
| } |
| |
| // Prepare: DROP the temporary HDFS table. |
| RuntimeProfile* drop_tmp_hdfs_table_profile = RuntimeProfile::Create( |
| &profile_pool_, "Drop temporary HDFS table query"); |
| child_profile->AddChild(drop_tmp_hdfs_table_profile); |
| child_queries.emplace_back(params.drop_temporary_hdfs_table_query, this, |
| parent_server_, drop_tmp_hdfs_table_profile, &profile_pool_); |
| |
| // Execute queries |
| unique_ptr<ChildQueryExecutor> query_executor(new ChildQueryExecutor()); |
| RETURN_VOID_IF_ERROR(query_executor->ExecAsync(move(child_queries))); |
| vector<ChildQuery*>* completed_queries = new vector<ChildQuery*>(); |
| Status query_status = query_executor->WaitForAll(completed_queries); |
| { |
| lock_guard<mutex> l(lock_); |
| RETURN_VOID_IF_ERROR(UpdateQueryStatus(query_status)); |
| } |
| } |
| } |
| |
| Status ClientRequestState::TryKillQueryLocally( |
| const TUniqueId& query_id, const string& requesting_user, bool is_admin) { |
| Status status = ExecEnv::GetInstance()->impala_server()->KillQuery( |
| query_id, requesting_user, is_admin); |
| if (status.ok()) { |
| SetResultSet({Substitute("Query $0 is killed.", PrintId(query_id))}); |
| return query_status_; |
| } |
| return status; |
| } |
| |
| Status ClientRequestState::TryKillQueryRemotely( |
| const TUniqueId& query_id, const KillQueryRequestPB& request) { |
| // The initial status should be INVALID_QUERY_HANDLE so that if there is no other |
| // coordinator in the cluster, it will be the status to return. |
| Status status = Status::Expected(TErrorCode::INVALID_QUERY_HANDLE, PrintId(query_id)); |
| ExecutorGroup all_coordinators = |
| ExecEnv::GetInstance()->cluster_membership_mgr()->GetSnapshot()->all_coordinators; |
| // Skipping the current impalad. |
| unique_ptr<ExecutorGroup> other_coordinators{ExecutorGroup::GetFilteredExecutorGroup( |
| &all_coordinators, {ExecEnv::GetInstance()->krpc_address()})}; |
| // If we get an RPC error, instead of returning immediately, we record it and move |
| // on to the next coordinator. |
| Status rpc_errors = Status::OK(); |
| for (const auto& backend : other_coordinators->GetAllExecutorDescriptors()) { |
| // The logic here is similar to ExecShutdownRequest() |
| NetworkAddressPB krpc_addr = MakeNetworkAddressPB(backend.ip_address(), |
| backend.address().port(), backend.backend_id(), |
| ExecEnv::GetInstance()->rpc_mgr()->GetUdsAddressUniqueId()); |
| VLOG_QUERY << "Sending KillQuery() RPC to " << NetworkAddressPBToString(krpc_addr); |
| unique_ptr<ControlServiceProxy> proxy; |
| Status get_proxy_status = |
| ControlService::GetProxy(krpc_addr, backend.address().hostname(), &proxy); |
| if (!get_proxy_status.ok()) { |
| Status get_proxy_status_to_report{Substitute( |
| "KillQuery: Could not get Proxy to ControlService at $0 with error: $1.", |
| NetworkAddressPBToString(krpc_addr), get_proxy_status.msg().msg())}; |
| rpc_errors.MergeStatus(get_proxy_status_to_report); |
| LOG(ERROR) << get_proxy_status_to_report.GetDetail(); |
| continue; |
| } |
| KillQueryResponsePB response; |
| const int num_retries = 3; |
| const int64_t timeout_ms = 10 * MILLIS_PER_SEC; |
| const int64_t backoff_time_ms = 3 * MILLIS_PER_SEC; |
| // Currently, a KILL QUERY statement is not interruptible. |
| Status rpc_status = RpcMgr::DoRpcWithRetry(proxy, &ControlServiceProxy::KillQuery, |
| request, &response, query_ctx_, "KillQuery() RPC failed", num_retries, timeout_ms, |
| backoff_time_ms, "CRS_KILL_QUERY_RPC"); |
| if (!rpc_status.ok()) { |
| LOG(ERROR) << rpc_status.GetDetail(); |
| rpc_errors.MergeStatus(rpc_status); |
| continue; |
| } |
| // Currently, we only support killing one query in one KILL QUERY statement. |
| DCHECK_EQ(response.statuses_size(), 1); |
| status = Status(response.statuses(0)); |
| if (status.ok()) { |
| // Kill succeeded. |
| VLOG_QUERY << "KillQuery: Found the coordinator at " |
| << NetworkAddressPBToString(krpc_addr); |
| SetResultSet({Substitute("Query $0 is killed.", PrintId(query_id))}); |
| return query_status_; |
| } else if (status.code() != TErrorCode::INVALID_QUERY_HANDLE) { |
| LOG(ERROR) << "KillQuery: Found the coordinator at " |
| << NetworkAddressPBToString(krpc_addr) |
| << " but failed to kill the query: " |
| << status.GetDetail(); |
| // Kill failed, but we found the coordinator of the query. |
| return status; |
| } |
| } |
| // We did't find the coordinator of the query after trying all other coordinators. |
| // If there is any RPC error, return it. |
| if (!rpc_errors.ok()) { |
| return rpc_errors; |
| } |
| // If there is no RPC error, return INVALID_QUERY_HANDLE. |
| return status; |
| } |
| |
| Status ClientRequestState::ExecKillQueryRequest() { |
| TUniqueId query_id = exec_request().kill_query_request.query_id; |
| string requesting_user = exec_request().kill_query_request.requesting_user; |
| bool is_admin = exec_request().kill_query_request.is_admin; |
| |
| VLOG_QUERY << "Exec KillQuery: query_id=" << PrintId(query_id) |
| << ", requesting_user=" << requesting_user << ", is_admin=" << is_admin; |
| |
| // First try cancelling the query locally. |
| Status status = TryKillQueryLocally(query_id, requesting_user, is_admin); |
| if (status.code() != TErrorCode::INVALID_QUERY_HANDLE) { |
| return status; |
| } |
| |
| // The current impalad is NOT the coordinator of the query. Now we have to broadcast |
| // the kill request to all other coordinators. |
| UniqueIdPB query_id_pb; |
| TUniqueIdToUniqueIdPB(query_id, &query_id_pb); |
| KillQueryRequestPB request; |
| *request.add_query_ids() = query_id_pb; |
| *request.mutable_requesting_user() = requesting_user; |
| request.set_is_admin(is_admin); |
| status = TryKillQueryRemotely(query_id, request); |
| if (status.code() != TErrorCode::INVALID_QUERY_HANDLE) { |
| return status; |
| } |
| // All the error messages are "Invalid or unknown query handle". |
| return Status("Could not find query on any coordinator."); |
| } |
| |
| void ClientRequestState::AddTableResetHints(const TConvertTableRequest& params, |
| Status* status) const { |
| string table_reset_hint("Your table might have been renamed. To reset the name " |
| "try running:\n" + params.reset_table_name_query + ";"); |
| status->MergeStatus(Status(table_reset_hint)); |
| } |
| |
| int64_t ClientRequestState::num_rows_fetched_counter() const { |
| if (LIKELY(num_rows_fetched_counter_ != nullptr)) { |
| return num_rows_fetched_counter_->value(); |
| } |
| |
| return 0; |
| } |
| |
| int64_t ClientRequestState::row_materialization_rate() const { |
| if (LIKELY(row_materialization_rate_ != nullptr)) { |
| return row_materialization_rate_->value(); |
| } |
| |
| return 0; |
| } |
| |
| int64_t ClientRequestState::row_materialization_timer() const { |
| if (LIKELY(row_materialization_timer_ != nullptr)) { |
| return row_materialization_timer_->value(); |
| } |
| |
| return 0; |
| } |
| |
| void ClientRequestState::AddCatalogTimeline() { |
| if (catalog_op_executor_ != nullptr |
| && catalog_op_executor_->catalog_profile() != nullptr) { |
| for (const TEventSequence& catalog_timeline : |
| catalog_op_executor_->catalog_profile()->event_sequences) { |
| summary_profile_->AddEventSequence(catalog_timeline.name, catalog_timeline); |
| } |
| } |
| } |
| |
| } |