| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "runtime/query-state.h" |
| |
| #include <mutex> |
| |
| #include "codegen/llvm-codegen.h" |
| #include "common/thread-debug-info.h" |
| #include "exec/kudu-util.h" |
| #include "exprs/expr.h" |
| #include "kudu/rpc/rpc_context.h" |
| #include "kudu/rpc/rpc_controller.h" |
| #include "kudu/rpc/rpc_sidecar.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/status.h" |
| #include "rpc/rpc-mgr.h" |
| #include "runtime/backend-client.h" |
| #include "runtime/bufferpool/buffer-pool.h" |
| #include "runtime/bufferpool/reservation-tracker.h" |
| #include "runtime/bufferpool/reservation-util.h" |
| #include "runtime/exec-env.h" |
| #include "runtime/fragment-state.h" |
| #include "runtime/fragment-instance-state.h" |
| #include "runtime/initial-reservations.h" |
| #include "runtime/mem-tracker.h" |
| #include "runtime/query-exec-mgr.h" |
| #include "runtime/runtime-filter-bank.h" |
| #include "runtime/runtime-state.h" |
| #include "runtime/scanner-mem-limiter.h" |
| #include "runtime/tmp-file-mgr.h" |
| #include "service/control-service.h" |
| #include "service/data-stream-service.h" |
| #include "util/container-util.h" |
| #include "util/debug-util.h" |
| #include "util/impalad-metrics.h" |
| #include "util/memory-metrics.h" |
| #include "util/metrics.h" |
| #include "util/system-state-info.h" |
| #include "util/thread.h" |
| #include "util/uid-util.h" |
| |
| #include "gen-cpp/control_service.pb.h" |
| #include "gen-cpp/control_service.proxy.h" |
| |
| using kudu::MonoDelta; |
| using kudu::rpc::RpcSidecar; |
| |
| #include "common/names.h" |
| |
| static const int DEFAULT_REPORT_WAIT_TIME_MS = 5000; |
| |
| DECLARE_int32(backend_client_rpc_timeout_ms); |
| DECLARE_int64(rpc_max_message_size); |
| |
| DEFINE_int32_hidden(stress_status_report_delay_ms, 0, "Stress option to inject a delay " |
| "before status reports. Has no effect on release builds."); |
| |
| namespace impala { |
| |
| PROFILE_DEFINE_DERIVED_COUNTER(GcCount, STABLE_LOW, TUnit::UNIT, |
| "Per-Impalad Counter: The number of GC collections that have occurred in the Impala " |
| "process over the duration of the query. Reported by JMX."); |
| PROFILE_DEFINE_DERIVED_COUNTER(GcTimeMillis, STABLE_LOW, TUnit::TIME_MS, |
| "Per-Impalad Counter: The amount of time spent in GC in the Impala process over the " |
| "duration of the query. Reported by JMX."); |
| PROFILE_DEFINE_DERIVED_COUNTER(GcNumWarnThresholdExceeded, STABLE_LOW, |
| TUnit::UNIT, |
| "Per-Impalad Counter: The number of JVM process pauses that occurred in this Impala " |
| "process over the duration of the query. Tracks the number of pauses at the WARN " |
| "threshold. See JvmPauseMonitor for details. Reported by the JvmPauseMonitor."); |
| PROFILE_DEFINE_DERIVED_COUNTER(GcNumInfoThresholdExceeded, STABLE_LOW, |
| TUnit::UNIT, |
| "Per-Impalad Counter: The number of JVM process pauses that occurred in this Impala " |
| "process over the duration of the query. Tracks the number of pauses at the INFO " |
| "threshold. See JvmPauseMonitor for details. Reported by the JvmPauseMonitor."); |
| PROFILE_DEFINE_DERIVED_COUNTER(GcTotalExtraSleepTimeMillis, STABLE_LOW, TUnit::TIME_MS, |
| "Per-Impalad Counter: The amount of time the JVM process paused over the duration " |
| "of the query. See JvmPauseMonitor for details. Reported by the JvmPauseMonitor."); |
| |
| QueryState::ScopedRef::ScopedRef(const TUniqueId& query_id) { |
| DCHECK(ExecEnv::GetInstance()->query_exec_mgr() != nullptr); |
| query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->GetQueryState(query_id); |
| } |
| |
| QueryState::ScopedRef::~ScopedRef() { |
| if (query_state_ == nullptr) return; |
| ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(query_state_); |
| } |
| |
| QueryState::QueryState( |
| const TQueryCtx& query_ctx, int64_t mem_limit, const string& request_pool) |
| : query_ctx_(query_ctx), |
| backend_resource_refcnt_(0), |
| refcnt_(0), |
| is_cancelled_(0), |
| query_spilled_(0), |
| host_profile_(RuntimeProfile::Create(obj_pool(), "<track resource usage>")) { |
| if (query_ctx_.request_pool.empty()) { |
| // fix up pool name for tests |
| DCHECK(!request_pool.empty()); |
| const_cast<TQueryCtx&>(query_ctx_).request_pool = request_pool; |
| } |
| TQueryOptions& query_options = |
| const_cast<TQueryOptions&>(query_ctx_.client_request.query_options); |
| // max_errors does not indicate how many errors in total have been recorded, but rather |
| // how many are distinct. It is defined as the sum of the number of generic errors and |
| // the number of distinct other errors. |
| if (query_options.max_errors <= 0) { |
| query_options.max_errors = 100; |
| } |
| if (query_options.batch_size <= 0) { |
| query_options.__set_batch_size(DEFAULT_BATCH_SIZE); |
| } |
| query_mem_tracker_ = MemTracker::CreateQueryMemTracker( |
| query_id(), mem_limit, query_ctx_.request_pool, &obj_pool_); |
| } |
| |
| void QueryState::ReleaseBackendResources() { |
| DCHECK(!released_backend_resources_); |
| // Clean up temporary files. |
| if (file_group_ != nullptr) file_group_->Close(); |
| if (filter_bank_ != nullptr) filter_bank_->Close(); |
| // Release any remaining reservation. |
| if (initial_reservations_ != nullptr) initial_reservations_->ReleaseResources(); |
| if (buffer_reservation_ != nullptr) buffer_reservation_->Close(); |
| if (desc_tbl_ != nullptr) desc_tbl_->ReleaseResources(); |
| // Release any memory associated with codegen. |
| for (auto& elem : fragment_state_map_) { |
| elem.second->ReleaseResources(); |
| } |
| // Mark the query as finished on the query MemTracker so that admission control will |
| // not consider the whole query memory limit to be "reserved". |
| query_mem_tracker_->set_query_exec_finished(); |
| // At this point query execution should not be consuming any resources but some tracked |
| // memory may still be used by the ClientRequestState for result caching. The query |
| // MemTracker will be closed later when this QueryState is torn down. |
| released_backend_resources_ = true; |
| } |
| |
| QueryState::~QueryState() { |
| DCHECK_EQ(refcnt_.Load(), 0); |
| DCHECK_EQ(backend_resource_refcnt_.Load(), 0); |
| if (query_mem_tracker_ != nullptr) { |
| // Disconnect the query MemTracker hierarchy from the global hierarchy. After this |
| // point nothing must touch this query's MemTracker and all tracked memory associated |
| // with the query must be released. The whole query subtree of MemTrackers can |
| // therefore be safely destroyed. |
| query_mem_tracker_->CloseAndUnregisterFromParent(); |
| } |
| /// We started periodic counters that track the system resource usage in Init(). |
| host_profile_->StopPeriodicCounters(); |
| } |
| |
| Status QueryState::Init(const ExecQueryFInstancesRequestPB* exec_rpc_params, |
| const TExecPlanFragmentInfo& fragment_info) { |
| std::lock_guard<std::mutex> l(init_lock_); |
| // Decremented in QueryExecMgr::StartQueryHelper() on success or by the caller of |
| // Init() on failure. We need to do this before any returns because Init() always |
| // returns a resource refcount to its caller. |
| AcquireBackendResourceRefcount(); |
| |
| if (IsCancelled()) return Status::CANCELLED; |
| |
| RETURN_IF_ERROR(DebugAction(query_options(), "QUERY_STATE_INIT")); |
| |
| ExecEnv* exec_env = ExecEnv::GetInstance(); |
| |
| RuntimeProfile* jvm_host_profile = RuntimeProfile::Create(&obj_pool_, "JVM"); |
| host_profile_->AddChild(jvm_host_profile); |
| |
| int64_t gc_count = JvmMemoryCounterMetric::GC_COUNT->GetValue(); |
| PROFILE_GcCount.Instantiate(jvm_host_profile, |
| [gc_count]() { |
| return JvmMemoryCounterMetric::GC_COUNT->GetValue() - gc_count; |
| }); |
| |
| int64_t gc_time_millis = JvmMemoryCounterMetric::GC_TIME_MILLIS->GetValue(); |
| PROFILE_GcTimeMillis.Instantiate(jvm_host_profile, |
| [gc_time_millis]() { |
| return JvmMemoryCounterMetric::GC_TIME_MILLIS->GetValue() - gc_time_millis; |
| }); |
| |
| int64_t gc_num_warn_threshold_exceeded = |
| JvmMemoryCounterMetric::GC_NUM_WARN_THRESHOLD_EXCEEDED->GetValue(); |
| PROFILE_GcNumWarnThresholdExceeded.Instantiate(jvm_host_profile, |
| [gc_num_warn_threshold_exceeded]() { |
| return JvmMemoryCounterMetric::GC_NUM_WARN_THRESHOLD_EXCEEDED->GetValue() |
| - gc_num_warn_threshold_exceeded; |
| }); |
| |
| int64_t gc_num_info_threshold_exceeded = |
| JvmMemoryCounterMetric::GC_NUM_INFO_THRESHOLD_EXCEEDED->GetValue(); |
| PROFILE_GcNumInfoThresholdExceeded.Instantiate(jvm_host_profile, |
| [gc_num_info_threshold_exceeded]() { |
| return JvmMemoryCounterMetric::GC_NUM_INFO_THRESHOLD_EXCEEDED->GetValue() |
| - gc_num_info_threshold_exceeded; |
| }); |
| |
| int64_t gc_total_extra_sleep_time_millis = |
| JvmMemoryCounterMetric::GC_TOTAL_EXTRA_SLEEP_TIME_MILLIS->GetValue(); |
| PROFILE_GcTotalExtraSleepTimeMillis.Instantiate(jvm_host_profile, |
| [gc_total_extra_sleep_time_millis]() { |
| return JvmMemoryCounterMetric::GC_TOTAL_EXTRA_SLEEP_TIME_MILLIS->GetValue() |
| - gc_total_extra_sleep_time_millis; |
| }); |
| |
| // Initialize resource tracking counters. |
| if (query_ctx().trace_resource_usage) { |
| SystemStateInfo* system_state_info = exec_env->system_state_info(); |
| host_profile_->AddChunkedTimeSeriesCounter( |
| "HostCpuUserPercentage", TUnit::BASIS_POINTS, [system_state_info] () { |
| return system_state_info->GetCpuUsageRatios().user; |
| }); |
| host_profile_->AddChunkedTimeSeriesCounter( |
| "HostCpuSysPercentage", TUnit::BASIS_POINTS, [system_state_info] () { |
| return system_state_info->GetCpuUsageRatios().system; |
| }); |
| host_profile_->AddChunkedTimeSeriesCounter( |
| "HostCpuIoWaitPercentage", TUnit::BASIS_POINTS, [system_state_info] () { |
| return system_state_info->GetCpuUsageRatios().iowait; |
| }); |
| // Add network usage |
| host_profile_->AddChunkedTimeSeriesCounter( |
| "HostNetworkRx", TUnit::BYTES_PER_SECOND, [system_state_info] () { |
| return system_state_info->GetNetworkUsage().rx_rate; |
| }); |
| host_profile_->AddChunkedTimeSeriesCounter( |
| "HostNetworkTx", TUnit::BYTES_PER_SECOND, [system_state_info] () { |
| return system_state_info->GetNetworkUsage().tx_rate; |
| }); |
| // Add disk stats |
| host_profile_->AddChunkedTimeSeriesCounter( |
| "HostDiskReadThroughput", TUnit::BYTES_PER_SECOND, [system_state_info] () { |
| return system_state_info->GetDiskStats().read_rate; |
| }); |
| host_profile_->AddChunkedTimeSeriesCounter( |
| "HostDiskWriteThroughput", TUnit::BYTES_PER_SECOND, [system_state_info] () { |
| return system_state_info->GetDiskStats().write_rate; |
| }); |
| } |
| |
| // Starting a new query creates threads and consumes a non-trivial amount of memory. |
| // If we are already starved for memory, fail as early as possible to avoid consuming |
| // more resources. |
| MemTracker* process_mem_tracker = exec_env->process_mem_tracker(); |
| if (process_mem_tracker->LimitExceeded(MemLimit::HARD)) { |
| string msg = Substitute( |
| "Query $0 could not start because the backend Impala daemon " |
| "is over its memory limit", PrintId(query_id())); |
| RETURN_IF_ERROR(process_mem_tracker->MemLimitExceeded(NULL, msg, 0)); |
| } |
| |
| RETURN_IF_ERROR(InitBufferPoolState()); |
| |
| // Initialize the RPC proxy once and report any error. |
| RETURN_IF_ERROR(ControlService::GetProxy(query_ctx().coord_krpc_address, |
| query_ctx().coord_address.hostname, &proxy_)); |
| |
| // don't copy query_ctx, it's large and we already did that in the c'tor |
| exec_rpc_params_.set_coord_state_idx(exec_rpc_params->coord_state_idx()); |
| exec_rpc_params_.mutable_fragment_ctxs()->Swap( |
| const_cast<google::protobuf::RepeatedPtrField<impala::PlanFragmentCtxPB>*>( |
| &exec_rpc_params->fragment_ctxs())); |
| exec_rpc_params_.mutable_fragment_instance_ctxs()->Swap( |
| const_cast<google::protobuf::RepeatedPtrField<impala::PlanFragmentInstanceCtxPB>*>( |
| &exec_rpc_params->fragment_instance_ctxs())); |
| TExecPlanFragmentInfo& non_const_fragment_info = |
| const_cast<TExecPlanFragmentInfo&>(fragment_info); |
| fragment_info_.fragments.swap(non_const_fragment_info.fragments); |
| fragment_info_.__isset.fragments = true; |
| fragment_info_.fragment_instance_ctxs.swap( |
| non_const_fragment_info.fragment_instance_ctxs); |
| fragment_info_.__isset.fragment_instance_ctxs = true; |
| |
| // Claim the query-wide minimum reservation. Do this last so that we don't need |
| // to handle releasing it if a later step fails. |
| initial_reservations_ = |
| obj_pool_.Add(new InitialReservations(&obj_pool_, buffer_reservation_, |
| query_mem_tracker_, exec_rpc_params->initial_mem_reservation_total_claims())); |
| RETURN_IF_ERROR(initial_reservations_->Init( |
| query_id(), exec_rpc_params->min_mem_reservation_bytes())); |
| RETURN_IF_ERROR(InitFilterBank()); |
| scanner_mem_limiter_ = obj_pool_.Add(new ScannerMemLimiter); |
| |
| // Set barriers only for successful initialization. Otherwise the barriers |
| // never be notified. |
| instances_prepared_barrier_.reset( |
| new CountingBarrier(fragment_info_.fragment_instance_ctxs.size())); |
| instances_finished_barrier_.reset( |
| new CountingBarrier(fragment_info_.fragment_instance_ctxs.size())); |
| is_initialized_ = true; |
| return Status::OK(); |
| } |
| |
| UniqueIdPB QueryState::GetCoordinatorBackendId() const { |
| UniqueIdPB backend_id_pb; |
| TUniqueIdToUniqueIdPB(query_ctx_.coord_backend_id, &backend_id_pb); |
| return backend_id_pb; |
| } |
| |
| int64_t QueryState::GetMaxReservation() { |
| int64_t mem_limit = query_mem_tracker_->GetLowestLimit(MemLimit::HARD); |
| int64_t max_reservation; |
| if (query_options().__isset.buffer_pool_limit |
| && query_options().buffer_pool_limit > 0) { |
| max_reservation = query_options().buffer_pool_limit; |
| } else if (mem_limit == -1) { |
| // No query mem limit. The process-wide reservation limit is the only limit on |
| // reservations. |
| max_reservation = numeric_limits<int64_t>::max(); |
| } else { |
| DCHECK_GE(mem_limit, 0); |
| max_reservation = ReservationUtil::GetReservationLimitFromMemLimit(mem_limit); |
| } |
| return max_reservation; |
| } |
| |
| Status QueryState::InitBufferPoolState() { |
| ExecEnv* exec_env = ExecEnv::GetInstance(); |
| int64_t max_reservation = GetMaxReservation(); |
| VLOG(2) << "Buffer pool limit for " << PrintId(query_id()) << ": " << max_reservation; |
| |
| buffer_reservation_ = obj_pool_.Add(new ReservationTracker); |
| buffer_reservation_->InitChildTracker( |
| NULL, exec_env->buffer_reservation(), query_mem_tracker_, max_reservation); |
| |
| if (query_options().scratch_limit != 0 && !query_ctx_.disable_spilling) { |
| file_group_ = obj_pool_.Add( |
| new TmpFileGroup(exec_env->tmp_file_mgr(), exec_env->disk_io_mgr(), |
| host_profile_, query_id(), query_options().scratch_limit)); |
| } |
| return Status::OK(); |
| } |
| |
| // Verifies the filters produced by all instances on the same backend are the same. |
| bool VerifyFiltersProduced(const vector<TPlanFragmentInstanceCtx>& instance_ctxs) { |
| int fragment_idx = -1; |
| std::unordered_set<int> first_set; |
| for (const TPlanFragmentInstanceCtx& instance_ctx : instance_ctxs) { |
| bool first_instance_of_fragment = |
| fragment_idx == -1 || fragment_idx != instance_ctx.fragment_idx; |
| if (first_instance_of_fragment) { |
| fragment_idx = instance_ctx.fragment_idx; |
| first_set.clear(); |
| for (auto f : instance_ctx.filters_produced) first_set.insert(f.filter_id); |
| } |
| if (first_set.size() != instance_ctx.filters_produced.size()) return false; |
| for (auto f : instance_ctx.filters_produced) { |
| if (first_set.find(f.filter_id) == first_set.end()) return false; |
| } |
| } |
| return true; |
| } |
| |
| Status QueryState::InitFilterBank() { |
| int64_t runtime_filters_reservation_bytes = 0; |
| int fragment_ctx_idx = -1; |
| const vector<TPlanFragment>& fragments = fragment_info_.fragments; |
| const vector<TPlanFragmentInstanceCtx>& instance_ctxs = |
| fragment_info_.fragment_instance_ctxs; |
| // Add entries for all produced and consumed filters. |
| unordered_map<int32_t, FilterRegistration> filters; |
| for (const TPlanFragment& fragment : fragments) { |
| for (const TPlanNode& plan_node : fragment.plan.nodes) { |
| if (!plan_node.__isset.runtime_filters) continue; |
| for (const TRuntimeFilterDesc& filter : plan_node.runtime_filters) { |
| // Add filter if not already present. |
| auto it = filters.emplace(filter.filter_id, FilterRegistration(filter)).first; |
| // Currently hash joins are the only filter sources. Otherwise it must be a filter |
| // consumer. 'num_producers' is computed later, so don't update that here. |
| if (!plan_node.__isset.join_node) it->second.has_consumer = true; |
| } |
| } |
| if (fragment.output_sink.__isset.join_build_sink) { |
| const TJoinBuildSink& join_sink = fragment.output_sink.join_build_sink; |
| for (const TRuntimeFilterDesc& filter : join_sink.runtime_filters) { |
| // Add filter if not already present. |
| filters.emplace(filter.filter_id, FilterRegistration(filter)); |
| } |
| } |
| } |
| DCHECK(VerifyFiltersProduced(instance_ctxs)) |
| << "Filters produced by all instances on the same backend should be the same"; |
| for (const TPlanFragmentInstanceCtx& instance_ctx : instance_ctxs) { |
| bool first_instance_of_fragment = fragment_ctx_idx == -1 |
| || fragments[fragment_ctx_idx].idx != instance_ctx.fragment_idx; |
| if (first_instance_of_fragment) { |
| ++fragment_ctx_idx; |
| DCHECK_EQ(fragments[fragment_ctx_idx].idx, instance_ctx.fragment_idx); |
| } |
| // TODO: this over-reserves memory a bit in a couple of cases: |
| // * if different fragments on this backend consume or produce the same filter. |
| // * if a finstance was chosen not to produce a global broadcast filter. |
| const TPlanFragment& fragment = fragments[fragment_ctx_idx]; |
| runtime_filters_reservation_bytes += |
| fragment.produced_runtime_filters_reservation_bytes; |
| if (first_instance_of_fragment) { |
| // Consumed filters are shared between all instances. |
| runtime_filters_reservation_bytes += |
| fragment.consumed_runtime_filters_reservation_bytes; |
| } |
| for (const TRuntimeFilterSource& produced_filter : instance_ctx.filters_produced) { |
| auto it = filters.find(produced_filter.filter_id); |
| DCHECK(it != filters.end()); |
| ++it->second.num_producers; |
| } |
| } |
| filter_bank_.reset( |
| new RuntimeFilterBank(this, filters, runtime_filters_reservation_bytes)); |
| return filter_bank_->ClaimBufferReservation(); |
| } |
| |
| const char* QueryState::BackendExecStateToString(const BackendExecState& state) { |
| static const unordered_map<BackendExecState, const char*> exec_state_to_str{ |
| {BackendExecState::PREPARING, "PREPARING"}, |
| {BackendExecState::EXECUTING, "EXECUTING"}, |
| {BackendExecState::FINISHED, "FINISHED"}, |
| {BackendExecState::CANCELLED, "CANCELLED"}, |
| {BackendExecState::ERROR, "ERROR"}}; |
| |
| return exec_state_to_str.at(state); |
| } |
| |
| void QueryState::UpdateBackendExecState() { |
| DFAKE_SCOPED_LOCK(backend_exec_state_lock_); |
| { |
| BackendExecState cur_state = backend_exec_state_; |
| unique_lock<SpinLock> l(status_lock_); |
| // We shouldn't call this function if we're already in a terminal state. |
| DCHECK(cur_state == BackendExecState::PREPARING || |
| cur_state == BackendExecState::EXECUTING) |
| << " Current State: " << BackendExecStateToString(cur_state) |
| << " | Current Status: " << overall_status_.GetDetail(); |
| if (overall_status_.IsCancelled()) { |
| // Received cancellation - go to CANCELLED state. |
| backend_exec_state_ = BackendExecState::CANCELLED; |
| } else if (!overall_status_.ok()) { |
| // Error while executing - go to ERROR state. |
| backend_exec_state_ = BackendExecState::ERROR; |
| } else { |
| // Transition to the next state in the lifecycle. |
| backend_exec_state_ = cur_state == BackendExecState::PREPARING ? |
| BackendExecState::EXECUTING : BackendExecState::FINISHED; |
| } |
| } |
| // Send one last report if the query has reached the terminal state |
| // and the coordinator is active. |
| if (IsTerminalState()) { |
| VLOG_QUERY << "UpdateBackendExecState(): last report for " << PrintId(query_id()); |
| while (is_coord_active_.Load() && !ReportExecStatus()) { |
| SleepForMs(GetReportWaitTimeMs()); |
| } |
| } |
| } |
| |
| Status QueryState::GetFInstanceState( |
| const TUniqueId& instance_id, FragmentInstanceState** fi_state) { |
| VLOG_FILE << "GetFInstanceState(): instance_id=" << PrintId(instance_id); |
| RETURN_IF_ERROR(WaitForPrepare()); |
| auto it = fis_map_.find(instance_id); |
| *fi_state = it != fis_map_.end() ? it->second : nullptr; |
| return Status::OK(); |
| } |
| |
| int64_t QueryState::AsyncCodegenThreadHelper(const std::string& suffix) const { |
| int64_t res = 0; |
| vector<RuntimeProfile::Counter*> counters; |
| host_profile_->GetCounters( |
| LlvmCodeGen::ASYNC_CODEGEN_THREAD_COUNTERS_PREFIX + suffix, &counters); |
| |
| for (const RuntimeProfile::Counter* counter : counters) { |
| DCHECK(counter != nullptr); |
| res += counter->value(); |
| } |
| |
| return res; |
| } |
| |
| int64_t QueryState::AsyncCodegenThreadUserTime() const { |
| return AsyncCodegenThreadHelper("UserTime"); |
| } |
| |
| int64_t QueryState::AsyncCodegenThreadSysTime() const { |
| return AsyncCodegenThreadHelper("SysTime"); |
| } |
| |
| void QueryState::ConstructReport(bool instances_started, |
| ReportExecStatusRequestPB* report, TRuntimeProfileForest* profiles_forest) { |
| report->Clear(); |
| TUniqueIdToUniqueIdPB(query_id(), report->mutable_query_id()); |
| DCHECK(exec_rpc_params_.has_coord_state_idx()); |
| report->set_coord_state_idx(exec_rpc_params_.coord_state_idx()); |
| { |
| unique_lock<SpinLock> l(status_lock_); |
| |
| Status debug_action_status = |
| DebugAction(query_options(), "CONSTRUCT_QUERY_STATE_REPORT"); |
| if (UNLIKELY(!debug_action_status.ok())) overall_status_ = debug_action_status; |
| |
| overall_status_.ToProto(report->mutable_overall_status()); |
| if (IsValidFInstanceId(failed_finstance_id_)) { |
| TUniqueIdToUniqueIdPB(failed_finstance_id_, report->mutable_fragment_instance_id()); |
| } |
| } |
| |
| // Add profile to report |
| host_profile_->ToThrift(&profiles_forest->host_profile); |
| profiles_forest->__isset.host_profile = true; |
| |
| // Free resources in chunked counters in the profile |
| host_profile_->ClearChunkedTimeSeriesCounters(); |
| |
| if (instances_started) { |
| // Stats that we aggregate across the instances. |
| int64_t cpu_user_ns = AsyncCodegenThreadUserTime(); |
| int64_t cpu_sys_ns = AsyncCodegenThreadSysTime(); |
| int64_t bytes_read = 0; |
| int64_t scan_ranges_complete = 0; |
| int64_t exchange_bytes_sent = 0; |
| int64_t scan_bytes_sent = 0; |
| |
| for (const auto& entry : fis_map_) { |
| FragmentInstanceState* fis = entry.second; |
| |
| // If this fragment instance has already sent its last report, skip it. |
| if (fis->final_report_sent()) { |
| DCHECK(fis->IsDone()); |
| } else { |
| // Update the status and profiles of this fragment instance. |
| FragmentInstanceExecStatusPB* instance_status = |
| report->add_instance_exec_status(); |
| profiles_forest->profile_trees.emplace_back(); |
| fis->GetStatusReport(instance_status, &profiles_forest->profile_trees.back()); |
| } |
| |
| // Include these values for running and completed finstances in the status report. |
| cpu_user_ns += fis->cpu_user_ns(); |
| cpu_sys_ns += fis->cpu_sys_ns(); |
| bytes_read += fis->bytes_read(); |
| scan_ranges_complete += fis->scan_ranges_complete(); |
| // Determine whether this instance had a scan node in its plan. |
| // Note: this is hacky. E.g. it doesn't work for Kudu scans. |
| if (fis->bytes_read() > 0) { |
| scan_bytes_sent += fis->total_bytes_sent(); |
| } else { |
| exchange_bytes_sent += fis->total_bytes_sent(); |
| } |
| } |
| report->set_peak_mem_consumption(query_mem_tracker_->peak_consumption()); |
| report->set_cpu_user_ns(cpu_user_ns); |
| report->set_cpu_sys_ns(cpu_sys_ns); |
| report->set_bytes_read(bytes_read); |
| report->set_scan_ranges_complete(scan_ranges_complete); |
| report->set_exchange_bytes_sent(exchange_bytes_sent); |
| report->set_scan_bytes_sent(scan_bytes_sent); |
| } |
| } |
| |
| bool QueryState::ReportExecStatus() { |
| #ifndef NDEBUG |
| if (FLAGS_stress_status_report_delay_ms) { |
| LOG(INFO) << "Sleeping " << FLAGS_stress_status_report_delay_ms << "ms before " |
| << "reporting for query " << PrintId(query_id()); |
| SleepForMs(FLAGS_stress_status_report_delay_ms); |
| } |
| #endif |
| bool instances_started = fis_map_.size() > 0; |
| |
| // This will send a report even if we are cancelled. If the query completed correctly |
| // but fragments still need to be cancelled (e.g. limit reached), the coordinator will |
| // be waiting for a final report and profile. |
| ReportExecStatusRequestPB report; |
| |
| // Gather the statuses and profiles of the fragment instances. |
| TRuntimeProfileForest profiles_forest; |
| ConstructReport(instances_started, &report, &profiles_forest); |
| |
| // Serialize the runtime profile with Thrift to 'profile_buf'. Note that the |
| // serialization output is owned by 'serializer' so this must be alive until RPC |
| // is done. |
| ThriftSerializer serializer(true); |
| uint8_t* profile_buf = nullptr; |
| uint32_t profile_len = 0; |
| Status serialize_status = |
| serializer.SerializeToBuffer(&profiles_forest, &profile_len, &profile_buf); |
| if (UNLIKELY(!serialize_status.ok() || |
| profile_len > FLAGS_rpc_max_message_size || |
| !DebugAction(query_options(), "REPORT_EXEC_STATUS_PROFILE").ok())) { |
| profile_buf = nullptr; |
| LOG(ERROR) << Substitute("Failed to create $0profile for query $1: " |
| "status=$2 len=$3", IsTerminalState() ? "final " : "", PrintId(query_id()), |
| serialize_status.ok() ? "OK" : serialize_status.GetDetail(), profile_len); |
| } |
| |
| Status rpc_status; |
| Status result_status; |
| RpcController rpc_controller; |
| |
| // The profile is a thrift structure serialized to a string and sent as a sidecar. |
| // We keep the runtime profile as Thrift object as Impala client still communicates |
| // with Impala server with Thrift RPC. |
| // |
| // Note that the sidecar is created with faststring so the ownership of the Thrift |
| // profile buffer is transferred to RPC layer and it is freed after the RPC payload |
| // is sent. If serialization of the profile to RPC sidecar fails, we will proceed |
| // without the profile so that the coordinator can still get the status and won't |
| // conclude that the backend has hung and cancel the query. |
| if (profile_buf != nullptr) { |
| unique_ptr<kudu::faststring> sidecar_buf = make_unique<kudu::faststring>(); |
| sidecar_buf->assign_copy(profile_buf, profile_len); |
| unique_ptr<RpcSidecar> sidecar = RpcSidecar::FromFaststring(move(sidecar_buf)); |
| |
| int sidecar_idx; |
| kudu::Status sidecar_status = |
| rpc_controller.AddOutboundSidecar(move(sidecar), &sidecar_idx); |
| if (LIKELY(sidecar_status.ok())) { |
| report.set_thrift_profiles_sidecar_idx(sidecar_idx); |
| } else { |
| LOG(DFATAL) << FromKuduStatus(sidecar_status, "Failed to add sidecar").GetDetail(); |
| } |
| } |
| |
| // TODO: --backend_client_rpc_timeout_ms was originally intended as a socket timeout for |
| // Thrift. We should rethink how we set backend rpc timeouts for krpc. |
| rpc_controller.set_timeout( |
| MonoDelta::FromMilliseconds(FLAGS_backend_client_rpc_timeout_ms)); |
| ReportExecStatusResponsePB resp; |
| rpc_status = FromKuduStatus(proxy_->ReportExecStatus(report, &resp, &rpc_controller), |
| "ReportExecStatus() RPC failed"); |
| result_status = Status(resp.status()); |
| int64_t retry_time_ms = 0; |
| if (rpc_status.ok()) { |
| num_failed_reports_ = 0; |
| failed_report_time_ms_ = 0; |
| } else { |
| ++num_failed_reports_; |
| if (failed_report_time_ms_ == 0) failed_report_time_ms_ = MonotonicMillis(); |
| retry_time_ms = MonotonicMillis() - failed_report_time_ms_; |
| LOG(WARNING) << Substitute("Failed to send ReportExecStatus() RPC for query $0. " |
| "Consecutive failed reports = $1. Time spent retrying = $2ms.", |
| PrintId(query_id()), num_failed_reports_, retry_time_ms); |
| } |
| |
| // Notify the fragment instances of the report's status. |
| for (const FragmentInstanceExecStatusPB& instance_exec_status : |
| report.instance_exec_status()) { |
| const TUniqueId& id = ProtoToQueryId(instance_exec_status.fragment_instance_id()); |
| FragmentInstanceState* fis = fis_map_[id]; |
| if (rpc_status.ok()) { |
| fis->ReportSuccessful(instance_exec_status); |
| } else { |
| fis->ReportFailed(instance_exec_status); |
| } |
| } |
| |
| if (((!rpc_status.ok() && retry_time_ms >= query_ctx().status_report_max_retry_s * 1000) |
| || !result_status.ok()) |
| && instances_started) { |
| // TODO: should we try to keep rpc_status for the final report? (but the final |
| // report, following this Cancel(), may not succeed anyway.) |
| // TODO: not keeping an error status here means that all instances might |
| // abort with CANCELLED status, despite there being an error |
| if (!rpc_status.ok()) { |
| LOG(ERROR) << "Cancelling fragment instances due to failure to reach the " |
| << "coordinator. (" << rpc_status.GetDetail() << ")."; |
| is_coord_active_.Store(false); |
| } else if (!result_status.ok()) { |
| // If the ReportExecStatus RPC succeeded in reaching the coordinator and we get |
| // back a non-OK status, it means that the coordinator expects us to cancel the |
| // fragment instances for this query. |
| LOG(INFO) << "Cancelling fragment instances as directed by the coordinator. " |
| << "Returned status: " << result_status.GetDetail(); |
| } |
| Cancel(); |
| return true; |
| } |
| |
| return rpc_status.ok(); |
| } |
| |
| int64_t QueryState::GetReportWaitTimeMs() const { |
| int64_t report_interval = query_ctx().status_report_interval_ms > 0 ? |
| query_ctx().status_report_interval_ms : |
| DEFAULT_REPORT_WAIT_TIME_MS; |
| if (num_failed_reports_ == 0) { |
| return report_interval; |
| } else { |
| // Generate a random number between 0 and 1 - we'll retry sometime evenly distributed |
| // between 'report_interval' and 'report_interval * (num_failed_reports_ + 1)', so we |
| // won't hit the "thundering herd" problem. |
| float jitter = static_cast<float>(rand()) / static_cast<float>(RAND_MAX); |
| return report_interval * (num_failed_reports_ * jitter + 1); |
| } |
| } |
| |
| void QueryState::ErrorDuringFragmentCodegen(const Status& status) { |
| unique_lock<SpinLock> l(status_lock_); |
| if (!HasErrorStatus()) { |
| overall_status_ = status; |
| failed_finstance_id_ = TUniqueId(); |
| } |
| } |
| |
| void QueryState::ErrorDuringPrepare(const Status& status, const TUniqueId& finst_id) { |
| { |
| unique_lock<SpinLock> l(status_lock_); |
| if (!HasErrorStatus()) { |
| overall_status_ = status; |
| failed_finstance_id_ = finst_id; |
| } |
| } |
| discard_result(instances_prepared_barrier_->Notify()); |
| } |
| |
| void QueryState::ErrorDuringExecute(const Status& status, const TUniqueId& finst_id) { |
| { |
| unique_lock<SpinLock> l(status_lock_); |
| if (!HasErrorStatus()) { |
| overall_status_ = status; |
| failed_finstance_id_ = finst_id; |
| } |
| } |
| instances_finished_barrier_->NotifyRemaining(); |
| } |
| |
| Status QueryState::WaitForPrepare() { |
| instances_prepared_barrier_->Wait(); |
| unique_lock<SpinLock> l(status_lock_); |
| return overall_status_; |
| } |
| |
| void QueryState::WaitForFinish() { |
| instances_finished_barrier_->Wait(); |
| } |
| |
| bool QueryState::WaitForFinishOrTimeout(int32_t timeout_ms) { |
| bool timed_out = false; |
| instances_finished_barrier_->Wait(timeout_ms, &timed_out); |
| return !timed_out; |
| } |
| |
| bool QueryState::StartFInstances() { |
| VLOG(2) << "StartFInstances(): query_id=" << PrintId(query_id()) |
| << " #instances=" << fragment_info_.fragment_instance_ctxs.size(); |
| DCHECK_GT(refcnt_.Load(), 0); |
| DCHECK_GT(backend_resource_refcnt_.Load(), 0) << "Should have been taken in Init()"; |
| |
| DCHECK_GT(fragment_info_.fragments.size(), 0); |
| vector<unique_ptr<Thread>> codegen_threads; |
| int num_unstarted_instances = fragment_info_.fragment_instance_ctxs.size(); |
| |
| // set up desc tbl |
| DCHECK(query_ctx().__isset.desc_tbl_serialized); |
| Status start_finstances_status = |
| DescriptorTbl::Create(&obj_pool_, query_ctx().desc_tbl_serialized, &desc_tbl_); |
| if (UNLIKELY(!start_finstances_status.ok())) goto error; |
| VLOG(2) << "descriptor table for query=" << PrintId(query_id()) |
| << "\n" << desc_tbl_->DebugString(); |
| |
| start_finstances_status = FragmentState::CreateFragmentStateMap( |
| fragment_info_, exec_rpc_params_, this, fragment_state_map_); |
| if (UNLIKELY(!start_finstances_status.ok())) goto error; |
| |
| fragment_events_start_time_ = MonotonicStopWatch::Now(); |
| for (auto& fragment : fragment_state_map_) { |
| FragmentState* fragment_state = fragment.second; |
| for (int i = 0; i < fragment_state->instance_ctxs().size(); ++i) { |
| const TPlanFragmentInstanceCtx* instance_ctx = fragment_state->instance_ctxs()[i]; |
| const PlanFragmentInstanceCtxPB* instance_ctx_pb = |
| fragment_state->instance_ctx_pbs()[i]; |
| DCHECK_EQ(instance_ctx->fragment_idx, instance_ctx_pb->fragment_idx()); |
| FragmentInstanceState* fis = obj_pool_.Add(new FragmentInstanceState( |
| this, fragment_state, *instance_ctx, *instance_ctx_pb)); |
| |
| // start new thread to execute instance |
| refcnt_.Add(1); // decremented in ExecFInstance() |
| AcquireBackendResourceRefcount(); // decremented in ExecFInstance() |
| |
| // Add the fragment instance ID to the 'fis_map_'. Has to happen before the thread |
| // is spawned or we may race with users of 'fis_map_'. |
| fis_map_.emplace(fis->instance_id(), fis); |
| |
| string thread_name = |
| Substitute("$0 (finst:$1)", FragmentInstanceState::FINST_THREAD_NAME_PREFIX, |
| PrintId(instance_ctx->fragment_instance_id)); |
| unique_ptr<Thread> t; |
| |
| // Inject thread creation failures through debug actions if enabled. |
| Status debug_action_status = |
| DebugAction(query_options(), "FIS_FAIL_THREAD_CREATION"); |
| start_finstances_status = !debug_action_status.ok() ? |
| debug_action_status : |
| Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME, thread_name, |
| [this, fis]() { this->ExecFInstance(fis); }, &t, true); |
| if (!start_finstances_status.ok()) { |
| fis_map_.erase(fis->instance_id()); |
| // Undo refcnt increments done immediately prior to Thread::Create(). The |
| // reference counts were both greater than zero before the increments, so |
| // neither of these decrements will free any structures. |
| ReleaseBackendResourceRefcount(); |
| ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(this); |
| goto error; |
| } |
| t->Detach(); |
| --num_unstarted_instances; |
| } |
| } |
| return true; |
| |
| error: |
| // This point is reached if there were general errors to start query fragment instances. |
| // Wait for all running fragment instances to finish preparing and report status to the |
| // coordinator to start query cancellation. |
| { |
| // Prioritize general errors as a query killing error, even over an error |
| // during Prepare() for a FIS. Overwrite any existing value in 'overall_status_'. |
| std::unique_lock<SpinLock> l(status_lock_); |
| overall_status_ = start_finstances_status; |
| failed_finstance_id_ = TUniqueId(); |
| } |
| // Updates the barrier for all unstarted fragment instances. |
| for (int i = 0; i < num_unstarted_instances; ++i) { |
| DonePreparing(); |
| } |
| // Block until all the already started fragment instances finish Prepare()-ing before |
| // reporting the error. |
| discard_result(WaitForPrepare()); |
| UpdateBackendExecState(); |
| DCHECK(IsTerminalState()); |
| return false; |
| } |
| |
| void QueryState::MonitorFInstances() { |
| // Wait for all fragment instances to finish preparing. |
| discard_result(WaitForPrepare()); |
| UpdateBackendExecState(); |
| if (IsTerminalState()) goto done; |
| |
| // Once all fragment instances finished preparing successfully, start periodic |
| // reporting back to the coordinator. |
| DCHECK(backend_exec_state_ == BackendExecState::EXECUTING) |
| << BackendExecStateToString(backend_exec_state_); |
| if (query_ctx().status_report_interval_ms > 0) { |
| while (!WaitForFinishOrTimeout(GetReportWaitTimeMs())) { |
| ReportExecStatus(); |
| } |
| } else { |
| WaitForFinish(); |
| } |
| UpdateBackendExecState(); |
| DCHECK(IsTerminalState()); |
| |
| done: |
| if (backend_exec_state_ == BackendExecState::FINISHED) { |
| for (const auto& entry : fis_map_) { |
| DCHECK(entry.second->IsDone()); |
| } |
| } else { |
| // If the query execution hit an error, when the final status report is sent, the |
| // coordinator's response will instruct the QueryState to cancel itself, so Cancel() |
| // should have always been called by this point. |
| DCHECK_EQ(is_cancelled_.Load(), 1); |
| } |
| } |
| |
| void QueryState::AcquireBackendResourceRefcount() { |
| DCHECK(!released_backend_resources_); |
| backend_resource_refcnt_.Add(1); |
| } |
| |
| void QueryState::ReleaseBackendResourceRefcount() { |
| int32_t new_val = backend_resource_refcnt_.Add(-1); |
| DCHECK_GE(new_val, 0); |
| if (new_val == 0) ReleaseBackendResources(); |
| } |
| |
| void QueryState::ExecFInstance(FragmentInstanceState* fis) { |
| ScopedThreadContext debugctx(GetThreadDebugInfo(), fis->query_id(), fis->instance_id()); |
| |
| ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->Increment(1L); |
| ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->Increment(1L); |
| VLOG_QUERY << "Executing instance. instance_id=" << PrintId(fis->instance_id()) |
| << " fragment_idx=" << fis->instance_ctx().fragment_idx |
| << " per_fragment_instance_idx=" |
| << fis->instance_ctx().per_fragment_instance_idx |
| << " coord_state_idx=" << exec_rpc_params_.coord_state_idx() |
| << " #in-flight=" |
| << ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->GetValue(); |
| Status status = fis->Exec(); |
| ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->Increment(-1L); |
| VLOG_QUERY << "Instance completed. instance_id=" << PrintId(fis->instance_id()) |
| << " #in-flight=" |
| << ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->GetValue() |
| << " status=" << status; |
| |
| // Don't cancel other fragments here as the final report for "fis" may not have been |
| // sent yet. Cancellation will happen in ReportExecStatus() after sending the final |
| // report to the coordinator. Otherwise, the coordinator fragment may mark the status |
| // of this backend as "CANCELLED", masking the original error. |
| |
| // decrement refcount taken in StartFInstances() |
| ReleaseBackendResourceRefcount(); |
| // decrement refcount taken in StartFInstances() |
| ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(this); |
| } |
| |
| void QueryState::Cancel() { |
| VLOG_QUERY << "Cancel: query_id=" << PrintId(query_id()); |
| { |
| std::lock_guard<std::mutex> l(init_lock_); |
| if (!is_initialized_) { |
| discard_result(is_cancelled_.CompareAndSwap(0, 1)); |
| return; |
| } |
| } |
| discard_result(WaitForPrepare()); |
| if (!is_cancelled_.CompareAndSwap(0, 1)) return; |
| if (filter_bank_ != nullptr) filter_bank_->Cancel(); |
| for (auto entry: fis_map_) entry.second->Cancel(); |
| } |
| |
| void QueryState::PublishFilter(const PublishFilterParamsPB& params, RpcContext* context) { |
| if (!WaitForPrepare().ok()) return; |
| filter_bank_->PublishGlobalFilter(params, context); |
| } |
| |
| Status QueryState::StartSpilling(RuntimeState* runtime_state, MemTracker* mem_tracker) { |
| // Return an error message with the root cause of why spilling is disabled. |
| if (query_options().scratch_limit == 0) { |
| return mem_tracker->MemLimitExceeded( |
| runtime_state, "Could not free memory by spilling to disk: scratch_limit is 0"); |
| } else if (query_ctx_.disable_spilling) { |
| return mem_tracker->MemLimitExceeded(runtime_state, |
| "Could not free memory by spilling to disk: spilling was disabled by planner. " |
| "Re-enable spilling by setting the query option DISABLE_UNSAFE_SPILLS=false"); |
| } |
| // 'file_group_' must be non-NULL for spilling to be enabled. |
| DCHECK(file_group_ != nullptr); |
| if (query_spilled_.CompareAndSwap(0, 1)) { |
| ImpaladMetrics::NUM_QUERIES_SPILLED->Increment(1); |
| } |
| return Status::OK(); |
| } |
| } |