blob: 529ce2d6873dde8b07c355f5afcb0c0ac4e92408 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/coordinator.h"
#include <cerrno>
#include <iomanip>
#include <sstream>
#include <unordered_set>
#include <thrift/protocol/TDebugProtocol.h>
#include <boost/algorithm/string/join.hpp>
#include <boost/filesystem.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string.hpp>
#include <gutil/strings/substitute.h>
#include "common/hdfs.h"
#include "exec/buffered-plan-root-sink.h"
#include "exec/data-sink.h"
#include "exec/plan-root-sink.h"
#include "gen-cpp/ImpalaInternalService_constants.h"
#include "gen-cpp/admission_control_service.pb.h"
#include "kudu/rpc/rpc_context.h"
#include "kudu/rpc/rpc_sidecar.h"
#include "runtime/coordinator-backend-state.h"
#include "runtime/coordinator-filter-state.h"
#include "runtime/debug-options.h"
#include "runtime/exec-env.h"
#include "runtime/fragment-instance-state.h"
#include "runtime/hdfs-fs-cache.h"
#include "runtime/query-driver.h"
#include "runtime/query-exec-mgr.h"
#include "runtime/query-state.h"
#include "scheduling/admission-controller.h"
#include "scheduling/scheduler.h"
#include "service/client-request-state.h"
#include "util/bit-util.h"
#include "util/bloom-filter.h"
#include "util/hdfs-bulk-ops.h"
#include "util/hdfs-util.h"
#include "util/histogram-metric.h"
#include "util/kudu-status-util.h"
#include "util/min-max-filter.h"
#include "util/pretty-printer.h"
#include "util/table-printer.h"
#include "util/uid-util.h"
#include "common/names.h"
using kudu::rpc::RpcContext;
using kudu::rpc::RpcController;
using kudu::rpc::RpcSidecar;
using namespace apache::thrift;
using namespace rapidjson;
using boost::algorithm::iequals;
using boost::algorithm::is_any_of;
using boost::algorithm::join;
using boost::algorithm::token_compress_on;
using boost::algorithm::split;
using boost::filesystem::path;
DECLARE_string(hostname);
using namespace impala;
PROFILE_DEFINE_COUNTER(NumBackends, STABLE_HIGH, TUnit::UNIT,
"Number of backends running this query.");
PROFILE_DEFINE_COUNTER(TotalBytesRead, STABLE_HIGH, TUnit::BYTES,
"Total number of bytes read by a query.");
PROFILE_DEFINE_COUNTER(TotalCpuTime, STABLE_HIGH, TUnit::TIME_NS,
"Total CPU time (user + system) consumed by a query.");
PROFILE_DEFINE_COUNTER(FiltersReceived,STABLE_LOW, TUnit::UNIT,
"Total number of filter updates received (always 0 if filter mode is not "
"GLOBAL). Excludes repeated broadcast filter updates.");
PROFILE_DEFINE_COUNTER(NumFragments, STABLE_HIGH, TUnit::UNIT,
"Number of fragments in the plan of a query.");
PROFILE_DEFINE_COUNTER(NumFragmentInstances, STABLE_HIGH, TUnit::UNIT,
"Number of fragment instances executed by a query.");
PROFILE_DEFINE_COUNTER(TotalBytesSent, STABLE_LOW, TUnit::BYTES,"The total number"
" of bytes sent (across the network) by this query in exchange nodes. Does not "
"include remote reads, data written to disk, or data sent to the client.");
PROFILE_DEFINE_COUNTER(TotalScanBytesSent, STABLE_LOW, TUnit::BYTES,
"The total number of bytes sent (across the network) by fragment instances that "
"had a scan node in their plan.");
PROFILE_DEFINE_COUNTER(TotalInnerBytesSent, STABLE_LOW, TUnit::BYTES, "The total "
"number of bytes sent (across the network) by fragment instances that did not have a"
" scan node in their plan i.e. that received their input data from other instances"
" through exchange node.");
PROFILE_DEFINE_COUNTER(ExchangeScanRatio, STABLE_LOW, TUnit::DOUBLE_VALUE,
"The ratio between TotalScanByteSent and TotalBytesRead, i.e. the selectivity over "
"all fragment instances that had a scan node in their plan.");
PROFILE_DEFINE_COUNTER(InnerNodeSelectivityRatio, STABLE_LOW, TUnit::DOUBLE_VALUE,
"The ratio between bytes sent by instances with a scan node in their plan and "
"instances without a scan node in their plan. This indicates how well the inner "
"nodes of the execution plan reduced the data volume.");
PROFILE_DEFINE_COUNTER(NumCompletedBackends, STABLE_HIGH, TUnit::UNIT,"The number of "
"completed backends. Only valid after all backends have started executing. "
"Does not count the number of CANCELLED Backends.");
PROFILE_DEFINE_TIMER(FinalizationTimer, STABLE_LOW,
"Total time spent in finalization (typically 0 except for INSERT into hdfs tables).");
// Maximum number of fragment instances that can publish each broadcast filter.
static const int MAX_BROADCAST_FILTER_PRODUCERS = 3;
Coordinator::Coordinator(ClientRequestState* parent, const TExecRequest& exec_request,
const QuerySchedulePB& query_schedule, RuntimeProfile::EventSequence* events)
: parent_query_driver_(parent->parent_driver()),
parent_request_state_(parent),
exec_params_(exec_request, query_schedule),
filter_mode_(exec_params_.query_options().runtime_filter_mode),
obj_pool_(new ObjectPool()),
query_events_(events),
exec_rpcs_status_barrier_(query_schedule.backend_exec_params().size()),
backend_released_barrier_(query_schedule.backend_exec_params().size()),
filter_routing_table_(new FilterRoutingTable) {}
Coordinator::~Coordinator() {
// Must have entered a terminal exec state guaranteeing resources were released.
DCHECK(!IsExecuting());
DCHECK_LE(backend_exec_complete_barrier_->pending(), 0);
// Release the coordinator's reference to the query control structures.
if (query_state_ != nullptr) {
ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(query_state_);
}
}
Status Coordinator::Exec() {
const TQueryExecRequest& request = exec_params_.query_exec_request();
DCHECK(request.plan_exec_info.size() > 0);
VLOG_QUERY << "Exec() query_id=" << PrintId(query_id())
<< " stmt=" << request.query_ctx.client_request.stmt;
stmt_type_ = request.stmt_type;
query_profile_ =
RuntimeProfile::Create(obj_pool(), "Execution Profile " + PrintId(query_id()));
finalization_timer_ = PROFILE_FinalizationTimer.Instantiate(query_profile_);
filter_updates_received_ = PROFILE_FiltersReceived.Instantiate(query_profile_);
host_profiles_ = RuntimeProfile::Create(obj_pool(), "Per Node Profiles");
query_profile_->AddChild(host_profiles_);
SCOPED_TIMER(query_profile_->total_time_counter());
// initialize progress updater
const string& str = Substitute("Query $0", PrintId(query_id()));
progress_.Init(str, exec_params_.query_schedule().num_scan_ranges());
query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(
query_ctx(), exec_params_.query_schedule().coord_backend_mem_limit());
filter_mem_tracker_ = query_state_->obj_pool()->Add(new MemTracker(
-1, "Runtime Filter (Coordinator)", query_state_->query_mem_tracker(), false));
InitFragmentStats();
// create BackendStates and per-instance state, including profiles, and install
// the latter in the FragmentStats' root profile
InitBackendStates();
exec_summary_.Init(exec_params_);
if (filter_mode_ != TRuntimeFilterMode::OFF) {
// Populate the runtime filter routing table. This should happen before starting the
// fragment instances. This code anticipates the indices of the instance states
// created later on in ExecRemoteFragment()
InitFilterRoutingTable();
}
// At this point, all static setup is done and all structures are initialized. Only
// runtime-related state changes past this point (examples: fragment instance
// profiles, etc.)
RETURN_IF_ERROR(StartBackendExec());
RETURN_IF_ERROR(FinishBackendStartup());
// set coord_instance_ and coord_sink_
if (exec_params_.GetCoordFragment() != nullptr) {
// this blocks until all fragment instances have finished their Prepare phase
Status query_status = query_state_->GetFInstanceState(query_id(), &coord_instance_);
if (!query_status.ok()) return UpdateExecState(query_status, nullptr, FLAGS_hostname);
// We expected this query to have a coordinator instance.
DCHECK(coord_instance_ != nullptr);
// When GetFInstanceState() returns the coordinator instance, the Prepare phase is
// done and the FragmentInstanceState's root sink will be set up.
coord_sink_ = coord_instance_->GetRootSink();
DCHECK(coord_sink_ != nullptr);
}
return Status::OK();
}
void Coordinator::InitFragmentStats() {
const TPlanFragment* coord_fragment = exec_params_.GetCoordFragment();
int64_t total_num_finstances = 0;
DCHECK_GT(exec_params_.num_fragments(), 0);
for (const TPlanFragment* fragment : exec_params_.GetFragments()) {
string root_profile_name =
Substitute(fragment == coord_fragment ? "Coordinator Fragment $0" : "Fragment $0",
fragment->display_name);
string avg_profile_name = Substitute("Averaged Fragment $0", fragment->display_name);
int num_instances = exec_params_.query_schedule()
.fragment_exec_params(fragment->idx)
.instances_size();
total_num_finstances += num_instances;
// TODO: special-case the coordinator fragment?
FragmentStats* fragment_stats = obj_pool()->Add(
new FragmentStats(
avg_profile_name, root_profile_name, num_instances, obj_pool()));
fragment_stats_.push_back(fragment_stats);
query_profile_->AddChild(fragment_stats->avg_profile(), true);
query_profile_->AddChild(fragment_stats->root_profile());
}
COUNTER_SET(PROFILE_NumFragments.Instantiate(query_profile_),
static_cast<int64_t>(exec_params_.num_fragments()));
COUNTER_SET(PROFILE_NumFragmentInstances.Instantiate(query_profile_),
total_num_finstances);
}
void Coordinator::InitBackendStates() {
int num_backends = exec_params_.query_schedule().backend_exec_params().size();
DCHECK_GT(num_backends, 0);
lock_guard<SpinLock> l(backend_states_init_lock_);
backend_states_.resize(num_backends);
COUNTER_SET(PROFILE_NumBackends.Instantiate(query_profile_), num_backends);
// create BackendStates
int backend_idx = 0;
for (const BackendExecParamsPB& backend_exec_params :
exec_params_.query_schedule().backend_exec_params()) {
BackendState* backend_state = obj_pool()->Add(
new BackendState(exec_params_, backend_idx, filter_mode_, backend_exec_params));
backend_state->Init(fragment_stats_, host_profiles_, obj_pool());
backend_states_[backend_idx++] = backend_state;
// was_inserted is true if the pair was successfully inserted into the map, false
// otherwise.
bool was_inserted = addr_to_backend_state_
.emplace(backend_state->krpc_impalad_address(), backend_state)
.second;
if (UNLIKELY(!was_inserted)) {
DCHECK(false) << "Network address " << backend_state->krpc_impalad_address()
<< " associated with multiple BackendStates";
}
}
backend_resource_state_ = obj_pool()->Add(new BackendResourceState(backend_states_));
num_completed_backends_ = PROFILE_NumCompletedBackends.Instantiate(query_profile_);
}
void Coordinator::ExecSummary::Init(const QueryExecParams& exec_params) {
const TQueryExecRequest& request = exec_params.query_exec_request();
// init exec_summary_.{nodes, exch_to_sender_map}
thrift_exec_summary.__isset.nodes = true;
DCHECK(thrift_exec_summary.nodes.empty());
for (const TPlanExecInfo& plan_exec_info: request.plan_exec_info) {
for (const TPlanFragment& fragment: plan_exec_info.fragments) {
if (!fragment.__isset.plan) continue;
// eventual index of fragment's root node in exec_summary_.nodes
int root_node_idx = thrift_exec_summary.nodes.size();
const TPlan& plan = fragment.plan;
const TDataSink& output_sink = fragment.output_sink;
// Count the number of hosts and instances.
const FragmentExecParamsPB& fragment_exec_param =
exec_params.query_schedule().fragment_exec_params(fragment.idx);
int num_hosts = fragment_exec_param.num_hosts();
int num_instances = fragment_exec_param.instances_size();
// Add the data sink at the root of the fragment.
data_sink_id_to_idx_map[fragment.idx] = thrift_exec_summary.nodes.size();
thrift_exec_summary.nodes.emplace_back();
// Note that some clients like impala-shell depend on many of these fields being
// set, even if they are optional in the thrift.
TPlanNodeExecSummary& node_summary = thrift_exec_summary.nodes.back();
node_summary.__set_node_id(-1);
node_summary.__set_fragment_idx(fragment.idx);
node_summary.__set_label(output_sink.label);
node_summary.__set_label_detail("");
node_summary.__set_num_children(1);
DCHECK(output_sink.__isset.estimated_stats);
node_summary.__set_estimated_stats(output_sink.estimated_stats);
node_summary.__set_num_hosts(num_hosts);
node_summary.exec_stats.resize(num_instances);
// We don't track rows returned from sinks, but some clients like impala-shell
// expect it to be set in the thrift struct. Set it to -1 for compatibility
// with those tools.
node_summary.estimated_stats.__set_cardinality(-1);
for (TExecStats& instance_stats : node_summary.exec_stats) {
instance_stats.__set_cardinality(-1);
}
for (const TPlanNode& node : plan.nodes) {
node_id_to_idx_map[node.node_id] = thrift_exec_summary.nodes.size();
thrift_exec_summary.nodes.emplace_back();
TPlanNodeExecSummary& node_summary = thrift_exec_summary.nodes.back();
node_summary.__set_node_id(node.node_id);
node_summary.__set_fragment_idx(fragment.idx);
node_summary.__set_label(node.label);
node_summary.__set_label_detail(node.label_detail);
node_summary.__set_num_children(node.num_children);
DCHECK(node.__isset.estimated_stats);
node_summary.__set_estimated_stats(node.estimated_stats);
node_summary.__set_num_hosts(num_hosts);
node_summary.exec_stats.resize(num_instances);
}
if (fragment.__isset.output_sink &&
(fragment.output_sink.type == TDataSinkType::DATA_STREAM_SINK
|| IsJoinBuildSink(fragment.output_sink.type))) {
int dst_node_idx;
if (fragment.output_sink.type == TDataSinkType::DATA_STREAM_SINK) {
const TDataStreamSink& sink = fragment.output_sink.stream_sink;
dst_node_idx = node_id_to_idx_map[sink.dest_node_id];
if (sink.output_partition.type == TPartitionType::UNPARTITIONED) {
thrift_exec_summary.nodes[dst_node_idx].__set_is_broadcast(true);
}
} else {
DCHECK(IsJoinBuildSink(fragment.output_sink.type));
const TJoinBuildSink& sink = fragment.output_sink.join_build_sink;
dst_node_idx = node_id_to_idx_map[sink.dest_node_id];
}
thrift_exec_summary.__isset.exch_to_sender_map = true;
thrift_exec_summary.exch_to_sender_map[dst_node_idx] = root_node_idx;
}
}
}
}
void Coordinator::InitFilterRoutingTable() {
DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)
<< "InitFilterRoutingTable() called although runtime filters are disabled";
DCHECK(!filter_routing_table_->is_complete)
<< "InitFilterRoutingTable() called after table marked as complete";
lock_guard<shared_mutex> lock(filter_routing_table_->lock); // Exclusive lock.
for (const FragmentExecParamsPB& fragment_params :
exec_params_.query_schedule().fragment_exec_params()) {
int num_instances = fragment_params.instances_size();
DCHECK_GT(num_instances, 0);
int num_backends = fragment_params.num_hosts();
DCHECK_GT(num_backends, 0);
const TPlanFragment* fragment =
exec_params_.GetFragments()[fragment_params.fragment_idx()];
// Hash join build sinks can produce filters in mt_dop > 0 plans.
if (fragment->output_sink.__isset.join_build_sink) {
const TJoinBuildSink& join_sink = fragment->output_sink.join_build_sink;
for (const TRuntimeFilterDesc& filter: join_sink.runtime_filters) {
// The join node ID is used to identify the join that produces the filter, even
// though the builder is separate from the actual node.
DCHECK_EQ(filter.src_node_id, join_sink.dest_node_id);
AddFilterSource(
fragment_params, num_instances, num_backends, filter, filter.src_node_id);
}
}
for (const TPlanNode& plan_node : fragment->plan.nodes) {
if (!plan_node.__isset.runtime_filters) continue;
for (const TRuntimeFilterDesc& filter: plan_node.runtime_filters) {
DCHECK(filter_mode_ == TRuntimeFilterMode::GLOBAL || filter.has_local_targets);
// Currently hash joins are the only filter sources. Otherwise it must be
// a filter consumer.
if (plan_node.__isset.join_node &&
plan_node.join_node.__isset.hash_join_node) {
AddFilterSource(
fragment_params, num_instances, num_backends, filter, plan_node.node_id);
} else if (plan_node.__isset.hdfs_scan_node || plan_node.__isset.kudu_scan_node) {
FilterState* f = filter_routing_table_->GetOrCreateFilterState(filter);
auto it = filter.planid_to_target_ndx.find(plan_node.node_id);
DCHECK(it != filter.planid_to_target_ndx.end());
const TRuntimeFilterTargetDesc& t_target = filter.targets[it->second];
DCHECK(filter_mode_ == TRuntimeFilterMode::GLOBAL || t_target.is_local_target);
f->targets()->emplace_back(t_target, fragment->idx);
} else {
DCHECK(false) << "Unexpected plan node with runtime filters: "
<< ThriftDebugString(plan_node);
}
}
}
}
query_profile_->AddInfoString(
"Number of filters", Substitute("$0", filter_routing_table_->num_filters()));
query_profile_->AddInfoString("Filter routing table", FilterDebugString());
if (VLOG_IS_ON(2)) VLOG_QUERY << FilterDebugString();
filter_routing_table_->is_complete = true;
}
void Coordinator::AddFilterSource(const FragmentExecParamsPB& src_fragment_params,
int num_instances, int num_backends, const TRuntimeFilterDesc& filter,
int join_node_id) {
FilterState* f = filter_routing_table_->GetOrCreateFilterState(filter);
// Set the 'pending_count_' to zero to indicate that for a filter with
// local-only targets the coordinator does not expect to receive any filter
// updates. We expect to receive a single aggregated filter from each backend
// for partitioned joins.
int pending_count = filter.is_broadcast_join
? (filter.has_remote_targets ? 1 : 0) : num_backends;
f->set_pending_count(pending_count);
// Determine which instances will produce the filters.
// TODO: IMPALA-9333: having a shared RuntimeFilterBank between all fragments on
// a backend allows further optimizations to reduce the number of broadcast join
// filters sent over the network, by considering cross-fragment filters on
// the same backend as local filters:
// 1. Produce a local filter on any backend with a destination fragment.
// 2. Only produce one local filter per backend (although, this would be made
// redundant by IMPALA-4224 - sharing broadcast join hash tables).
// 3. Don't produce a global filter if all targets can be satisfied with
// local producers.
// This work was deferred from the IMPALA-4400 change because it provides only
// incremental performance benefits.
vector<int> src_idxs;
for (const UniqueIdPB& instance_id : src_fragment_params.instances()) {
src_idxs.push_back(GetInstanceIdx(instance_id));
}
// If this is a broadcast join with only non-local targets, build and publish it
// on MAX_BROADCAST_FILTER_PRODUCERS instances. If this is not a broadcast join
// or it is a broadcast join with local targets, it should be generated
// everywhere the join is executed.
if (filter.is_broadcast_join && !filter.has_local_targets
&& num_instances > MAX_BROADCAST_FILTER_PRODUCERS) {
random_shuffle(src_idxs.begin(), src_idxs.end());
src_idxs.resize(MAX_BROADCAST_FILTER_PRODUCERS);
}
for (int src_idx : src_idxs) {
TRuntimeFilterSource filter_src;
filter_src.src_node_id = join_node_id;
filter_src.filter_id = filter.filter_id;
filter_routing_table_->finstance_filters_produced[src_idx].emplace_back(
filter_src);
}
f->set_num_producers(src_idxs.size());
}
void Coordinator::WaitOnExecRpcs() {
if (exec_rpcs_complete_.Load()) return;
for (BackendState* backend_state : backend_states_) {
backend_state->WaitOnExecRpc();
}
exec_rpcs_complete_.Store(true);
}
Status Coordinator::StartBackendExec() {
int num_backends = backend_states_.size();
backend_exec_complete_barrier_.reset(new CountingBarrier(num_backends));
DebugOptions debug_options(exec_params_.query_options());
VLOG_QUERY << "starting execution on " << num_backends << " backends for query_id="
<< PrintId(query_id());
query_events_->MarkEvent(Substitute("Ready to start on $0 backends", num_backends));
// Serialize the TQueryCtx once and pass it to each backend. The serialized buffer must
// stay valid until WaitOnExecRpcs() has returned.
ThriftSerializer serializer(true);
uint8_t* serialized_buf = nullptr;
uint32_t serialized_len = 0;
Status serialize_status =
serializer.SerializeToBuffer(&query_ctx(), &serialized_len, &serialized_buf);
if (UNLIKELY(!serialize_status.ok())) {
return UpdateExecState(serialize_status, nullptr, FLAGS_hostname);
}
kudu::Slice query_ctx_slice(serialized_buf, serialized_len);
for (BackendState* backend_state: backend_states_) {
if (exec_rpcs_status_barrier_.pending() <= 0) {
// One of the backends has already indicated an error with Exec().
break;
}
DebugActionNoFail(exec_params_.query_options(), "COORD_BEFORE_EXEC_RPC");
// Safe for ExecAsync() to read 'filter_routing_table_' because it is complete
// at this point and won't be destroyed while this function is executing,
// because it won't be torn down until WaitOnExecRpcs() has returned.
DCHECK(filter_mode_ == TRuntimeFilterMode::OFF || filter_routing_table_->is_complete);
backend_state->ExecAsync(debug_options, *filter_routing_table_, query_ctx_slice,
&exec_rpcs_status_barrier_);
}
Status exec_rpc_status = exec_rpcs_status_barrier_.Wait();
if (!exec_rpc_status.ok()) {
// One of the backends failed to startup, so we cancel the other ones.
CancelBackends(/*fire_and_forget=*/ true);
WaitOnExecRpcs();
vector<BackendState*> failed_backend_states;
for (BackendState* backend_state : backend_states_) {
// If Exec() rpc failed for a reason besides being aborted, blacklist the executor
// and retry the query.
if (!backend_state->exec_rpc_status().ok()
&& !backend_state->exec_rpc_status().IsAborted()) {
failed_backend_states.push_back(backend_state);
LOG(INFO) << "Blacklisting " << backend_state->impalad_address()
<< " because an Exec() rpc to it failed.";
const UniqueIdPB& backend_id = backend_state->exec_params().backend_id();
ExecEnv::GetInstance()->cluster_membership_mgr()->BlacklistExecutor(backend_id,
FromKuduStatus(backend_state->exec_rpc_status(), "Exec() rpc failed"));
parent_request_state_->AddBlacklistedExecutorAddress(
backend_state->exec_params().address());
}
}
if (!failed_backend_states.empty()) {
HandleFailedExecRpcs(failed_backend_states);
}
VLOG_QUERY << "query startup cancelled due to a failed Exec() rpc: "
<< exec_rpc_status;
return UpdateExecState(exec_rpc_status, nullptr, FLAGS_hostname);
}
WaitOnExecRpcs();
VLOG_QUERY << "started execution on " << num_backends << " backends for query_id="
<< PrintId(query_id());
query_events_->MarkEvent(
Substitute("All $0 execution backends ($1 fragment instances) started",
num_backends, exec_params_.GetNumFragmentInstances()));
return Status::OK();
}
Status Coordinator::FinishBackendStartup() {
DCHECK(exec_rpcs_complete_.Load());
const TMetricDef& def =
MakeTMetricDef("backend-startup-latencies", TMetricKind::HISTOGRAM, TUnit::TIME_MS);
// Capture up to 30 minutes of start-up times, in ms, with 4 s.f. accuracy.
HistogramMetric latencies(def, 30 * 60 * 1000, 4);
Status status = Status::OK();
string error_hostname;
string max_latency_host;
int max_latency = 0;
for (BackendState* backend_state: backend_states_) {
// All of the Exec() rpcs must have completed successfully.
DCHECK(backend_state->exec_rpc_status().ok());
// preserve the first non-OK, if there is one
Status backend_status = backend_state->GetStatus();
if (!backend_status.ok() && status.ok()) {
status = backend_status;
error_hostname = backend_state->impalad_address().hostname();
}
if (backend_state->rpc_latency() > max_latency) {
// Find the backend that takes the most time to acknowledge to
// the ExecQueryFInstances() RPC.
max_latency = backend_state->rpc_latency();
max_latency_host = NetworkAddressPBToString(backend_state->impalad_address());
}
latencies.Update(backend_state->rpc_latency());
// Mark backend complete if no fragment instances were assigned to it.
if (backend_state->IsEmptyBackend()) {
backend_exec_complete_barrier_->Notify();
num_completed_backends_->Add(1);
}
}
query_profile_->AddInfoString(
"Backend startup latencies", latencies.ToHumanReadable());
query_profile_->AddInfoString("Slowest backend to start up", max_latency_host);
return UpdateExecState(status, nullptr, error_hostname);
}
string Coordinator::FilterDebugString() {
TablePrinter table_printer;
table_printer.AddColumn("ID", false);
table_printer.AddColumn("Src. Node", false);
table_printer.AddColumn("Tgt. Node(s)", false);
table_printer.AddColumn("Target type", false);
table_printer.AddColumn("Partition filter", false);
// Distribution metrics are only meaningful if the coordinator is routing the filter.
if (filter_mode_ == TRuntimeFilterMode::GLOBAL) {
table_printer.AddColumn("Pending (Expected)", false);
table_printer.AddColumn("First arrived", false);
table_printer.AddColumn("Completed", false);
}
table_printer.AddColumn("Enabled", false);
table_printer.AddColumn("Bloom Size", false);
table_printer.AddColumn("Est fpp", false);
for (auto& v: filter_routing_table_->id_to_filter) {
vector<string> row;
const FilterState& state = v.second;
row.push_back(lexical_cast<string>(v.first));
row.push_back(lexical_cast<string>(state.desc().src_node_id));
vector<string> target_ids;
vector<string> target_types;
vector<string> partition_filter;
for (const FilterTarget& target: state.targets()) {
target_ids.push_back(lexical_cast<string>(target.node_id));
target_types.push_back(target.is_local ? "LOCAL" : "REMOTE");
partition_filter.push_back(target.is_bound_by_partition_columns ? "true" : "false");
}
row.push_back(join(target_ids, ", "));
row.push_back(join(target_types, ", "));
row.push_back(join(partition_filter, ", "));
if (filter_mode_ == TRuntimeFilterMode::GLOBAL) {
int pending_count = state.completion_time() != 0L ? 0 : state.pending_count();
row.push_back(Substitute("$0 ($1)", pending_count, state.num_producers()));
if (state.first_arrival_time() == 0L) {
row.push_back("N/A");
} else {
row.push_back(PrettyPrinter::Print(state.first_arrival_time(), TUnit::TIME_NS));
}
if (state.completion_time() == 0L) {
row.push_back("N/A");
} else {
row.push_back(PrettyPrinter::Print(state.completion_time(), TUnit::TIME_NS));
}
}
// In case of remote filter, we might intentionally disable the filter upon
// completion to prevent further update. In such case, we should check if all filter
// updates have been successfully received.
row.push_back(state.enabled() || state.received_all_updates() ? "true" : "false");
// Add size and fpp for bloom filters, the filter type otherwise.
if (state.is_bloom_filter()) {
int64_t filter_size = state.desc().filter_size_bytes;
row.push_back(PrettyPrinter::Print(filter_size, TUnit::BYTES));
double fpp = BloomFilter::FalsePositiveProb(
state.desc().ndv_estimate, BitUtil::Log2Ceiling64(filter_size));
stringstream ss;
ss << setprecision(3) << fpp;
row.push_back(ss.str());
} else {
row.push_back(PrintThriftEnum(state.desc().type));
row.push_back("");
}
table_printer.AddRow(row);
}
// Add a line break, as in all contexts this is called we need to start a new line to
// print it correctly.
return Substitute("\n$0", table_printer.ToString());
}
const char* Coordinator::ExecStateToString(const ExecState state) {
static const unordered_map<ExecState, const char *> exec_state_to_str{
{ExecState::EXECUTING, "EXECUTING"},
{ExecState::RETURNED_RESULTS, "RETURNED_RESULTS"},
{ExecState::CANCELLED, "CANCELLED"},
{ExecState::ERROR, "ERROR"}};
return exec_state_to_str.at(state);
}
Status Coordinator::SetNonErrorTerminalState(const ExecState state) {
DCHECK(state == ExecState::RETURNED_RESULTS || state == ExecState::CANCELLED);
Status ret_status;
{
lock_guard<SpinLock> l(exec_state_lock_);
// May have already entered a terminal state, in which case nothing to do.
if (!IsExecuting()) return exec_status_;
DCHECK(exec_status_.ok()) << exec_status_;
exec_state_.Store(state);
if (state == ExecState::CANCELLED) exec_status_ = Status::CANCELLED;
ret_status = exec_status_;
}
VLOG_QUERY << Substitute("ExecState: query id=$0 execution $1", PrintId(query_id()),
state == ExecState::CANCELLED ? "cancelled" : "completed");
HandleExecStateTransition(ExecState::EXECUTING, state);
return ret_status;
}
Status Coordinator::UpdateExecState(const Status& status,
const TUniqueId* failed_finst, const string& instance_hostname) {
Status ret_status;
ExecState old_state, new_state;
{
lock_guard<SpinLock> l(exec_state_lock_);
old_state = exec_state_.Load();
if (old_state == ExecState::EXECUTING) {
DCHECK(exec_status_.ok()) << exec_status_;
if (!status.ok()) {
// Error while executing - go to ERROR state.
exec_status_ = status;
exec_state_.Store(ExecState::ERROR);
}
} else if (old_state == ExecState::RETURNED_RESULTS) {
// Already returned all results. Leave exec status as ok, stay in this state.
DCHECK(exec_status_.ok()) << exec_status_;
} else if (old_state == ExecState::CANCELLED) {
// Client requested cancellation already, stay in this state. Ignores errors
// after requested cancellations.
DCHECK(exec_status_.IsCancelled()) << exec_status_;
} else {
// Already in the ERROR state, stay in this state but update status to be the
// first non-cancelled status.
DCHECK_EQ(old_state, ExecState::ERROR);
DCHECK(!exec_status_.ok());
if (!status.ok() && !status.IsCancelled() && exec_status_.IsCancelled()) {
exec_status_ = status;
}
}
new_state = exec_state_.Load();
ret_status = exec_status_;
}
// Log interesting status: a non-cancelled error or a cancellation if was executing.
if (!status.ok() && (!status.IsCancelled() || old_state == ExecState::EXECUTING)) {
VLOG_QUERY << Substitute(
"ExecState: query id=$0 finstance=$1 on host=$2 ($3 -> $4) status=$5",
PrintId(query_id()), failed_finst != nullptr ? PrintId(*failed_finst) : "N/A",
instance_hostname, ExecStateToString(old_state), ExecStateToString(new_state),
status.GetDetail());
}
// After dropping the lock, apply the state transition (if any) side-effects.
HandleExecStateTransition(old_state, new_state);
return ret_status;
}
void Coordinator::HandleExecStateTransition(
const ExecState old_state, const ExecState new_state) {
static const unordered_map<ExecState, const char *> exec_state_to_event{
{ExecState::EXECUTING, "Executing"},
{ExecState::RETURNED_RESULTS, "Last row fetched"},
{ExecState::CANCELLED, "Execution cancelled"},
{ExecState::ERROR, "Execution error"}};
if (old_state == new_state) return;
// Once we enter a terminal state, we stay there, guaranteeing this code runs only once.
DCHECK_EQ(old_state, ExecState::EXECUTING);
// Should never transition to the initial state.
DCHECK_NE(new_state, ExecState::EXECUTING);
// Can't transition until the exec RPCs are no longer in progress. Otherwise, a
// cancel RPC could be missed, and resources freed before a backend has had a chance
// to take a resource reference.
DCHECK(exec_rpcs_complete_.Load()) << "exec rpcs not completed";
query_events_->MarkEvent(exec_state_to_event.at(new_state));
// This thread won the race to transitioning into a terminal state - terminate
// execution and release resources.
ReleaseExecResources();
if (new_state == ExecState::RETURNED_RESULTS) {
// TODO: IMPALA-6984: cancel all backends in this case too.
WaitForBackends();
} else {
CancelBackends(/*fire_and_forget=*/ true);
}
ReleaseQueryAdmissionControlResources();
// Once the query has released its admission control resources, update its end time.
parent_request_state_->UpdateEndTime();
// Can compute summary only after we stop accepting reports from the backends. Both
// WaitForBackends() and CancelBackends() ensures that.
// TODO: should move this off of the query execution path?
ComputeQuerySummary();
finalized_.Set(true);
}
Status Coordinator::FinalizeHdfsDml() {
// All instances must have reported their final statuses before finalization, which is a
// post-condition of Wait. If the query was not successful, still try to clean up the
// staging directory.
DCHECK(has_called_wait_.Load());
DCHECK(finalize_params() != nullptr);
bool is_transactional = finalize_params()->__isset.write_id;
VLOG_QUERY << "Finalizing query: " << PrintId(query_id());
SCOPED_TIMER(finalization_timer_);
Status return_status = UpdateExecState(Status::OK(), nullptr, FLAGS_hostname);
if (return_status.ok()) {
HdfsTableDescriptor* hdfs_table;
DCHECK(query_ctx().__isset.desc_tbl_serialized);
RETURN_IF_ERROR(DescriptorTbl::CreateHdfsTblDescriptor(
query_ctx().desc_tbl_serialized, finalize_params()->table_id, obj_pool(),
&hdfs_table));
DCHECK(hdfs_table != nullptr)
<< "INSERT target table not known in descriptor table: "
<< finalize_params()->table_id;
// There is no need for finalization for transactional inserts.
if (!is_transactional) {
// 'write_id' is NOT set, therefore we need to do some finalization, e.g. moving
// files or delete old files in case of INSERT OVERWRITE.
return_status = dml_exec_state_.FinalizeHdfsInsert(*finalize_params(),
query_ctx().client_request.query_options.s3_skip_insert_staging,
hdfs_table, query_profile_);
}
hdfs_table->ReleaseResources();
} else if (is_transactional) {
parent_request_state_->AbortTransaction();
}
if (is_transactional) {
DCHECK(!finalize_params()->__isset.staging_dir);
} else {
RETURN_IF_ERROR(DeleteQueryLevelStagingDir());
}
return return_status;
}
Status Coordinator::DeleteQueryLevelStagingDir() {
stringstream staging_dir;
DCHECK(finalize_params()->__isset.staging_dir);
staging_dir << finalize_params()->staging_dir << "/" << PrintId(query_id(),"_") << "/";
hdfsFS hdfs_conn;
RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(staging_dir.str(), &hdfs_conn));
VLOG_QUERY << "Removing staging directory: " << staging_dir.str();
hdfsDelete(hdfs_conn, staging_dir.str().c_str(), 1);
return Status::OK();
}
void Coordinator::WaitForBackends() {
int32_t num_remaining = backend_exec_complete_barrier_->pending();
if (num_remaining > 0) {
VLOG_QUERY << "Coordinator waiting for backends to finish, " << num_remaining
<< " remaining. query_id=" << PrintId(query_id());
backend_exec_complete_barrier_->Wait();
}
}
Status Coordinator::Wait() {
DCHECK(exec_rpcs_complete_.Load()) << "Exec() must be called first";
lock_guard<SpinLock> l(wait_lock_);
SCOPED_TIMER(query_profile_->total_time_counter());
if (has_called_wait_.Load()) return Status::OK();
has_called_wait_.Store(true);
if (stmt_type_ == TStmtType::QUERY) {
DCHECK(coord_instance_ != nullptr);
RETURN_IF_ERROR(UpdateExecState(coord_instance_->WaitForOpen(),
&coord_instance_->runtime_state()->fragment_instance_id(), FLAGS_hostname));
if (query_state_->query_options().retry_failed_queries
&& query_state_->query_options().spool_query_results
&& query_state_->query_options().spool_all_results_for_retries) {
// Wait until the BufferedPlanRootSink spooled all results or any errors stopping
// it, e.g. batch queue full, cancellation or failures.
auto sink = static_cast<BufferedPlanRootSink*>(coord_sink_);
if (sink->WaitForAllResultsSpooled()) {
VLOG_QUERY << "Cannot spool all results in the allocated result spooling space."
" Query retry will be skipped if any results have been returned.";
}
}
return Status::OK();
}
DCHECK_EQ(stmt_type_, TStmtType::DML);
// DML finalization can only happen when all backends have completed all side-effects
// and reported relevant state.
WaitForBackends();
if (finalize_params() != nullptr) {
RETURN_IF_ERROR(UpdateExecState(
FinalizeHdfsDml(), nullptr, FLAGS_hostname));
}
// DML requests are finished at this point.
RETURN_IF_ERROR(SetNonErrorTerminalState(ExecState::RETURNED_RESULTS));
query_profile_->AddInfoString(
"DML Stats", dml_exec_state_.OutputPartitionStats("\n"));
return Status::OK();
}
Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos,
int64_t block_on_wait_time_us) {
VLOG_ROW << "GetNext() query_id=" << PrintId(query_id());
DCHECK(has_called_wait_.Load());
SCOPED_TIMER(query_profile_->total_time_counter());
if (ReturnedAllResults()) {
// Nothing left to do: already in a terminal state and no more results.
*eos = true;
return Status::OK();
}
DCHECK(coord_instance_ != nullptr) << "Exec() should be called first";
DCHECK(coord_sink_ != nullptr) << "Exec() should be called first";
RuntimeState* runtime_state = coord_instance_->runtime_state();
// If FETCH_ROWS_TIMEOUT_MS is 0, then the timeout passed to PlanRootSink::GetNext()
// should be 0 as well so that the method waits for rows indefinitely.
// If the first row has been fetched, then set the timeout to FETCH_ROWS_TIMEOUT_MS. If
// the first row has not been fetched, then it is possible the client spent time
// waiting for the query to 'finish' before issuing a GetNext() request.
int64_t timeout_us;
if (parent_request_state_->fetch_rows_timeout_us() == 0) {
timeout_us = 0;
} else {
timeout_us = !first_row_fetched_ ?
max(static_cast<int64_t>(1),
parent_request_state_->fetch_rows_timeout_us() - block_on_wait_time_us) :
parent_request_state_->fetch_rows_timeout_us();
}
Status status = coord_sink_->GetNext(runtime_state, results, max_rows, eos, timeout_us);
if (!first_row_fetched_ && results->size() > 0) {
query_events_->MarkEvent("First row fetched");
first_row_fetched_ = true;
}
RETURN_IF_ERROR(UpdateExecState(
status, &runtime_state->fragment_instance_id(), FLAGS_hostname));
if (*eos) RETURN_IF_ERROR(SetNonErrorTerminalState(ExecState::RETURNED_RESULTS));
return Status::OK();
}
void Coordinator::Cancel(bool wait_until_finalized) {
// Illegal to call Cancel() before Exec() returns, so there's no danger of the cancel
// RPC passing the exec RPC.
DCHECK(exec_rpcs_complete_.Load()) << "Exec() must be called first";
discard_result(SetNonErrorTerminalState(ExecState::CANCELLED));
// CancelBackends() is called for all transitions into a terminal state.
// RETURNED_RESULTS, however, calls it with fire_and_forget=false and may be blocked
// waiting for cancellation. In that case, we want explicit cancellation to unblock
// backend_exec_complete_barrier_, which we do by forcing cancellation.
if (ReturnedAllResults()) CancelBackends(/*fire_and_forget=*/ true);
// IMPALA-5756: Wait until finalized, in case a different thread was handling the
// transition to the terminal state.
if (wait_until_finalized) finalized_.Get();
}
void Coordinator::CancelBackends(bool fire_and_forget) {
int num_cancelled = 0;
for (BackendState* backend_state: backend_states_) {
DCHECK(backend_state != nullptr);
BackendState::CancelResult cr = backend_state->Cancel(fire_and_forget);
if (cr.cancel_attempted) ++num_cancelled;
if (!fire_and_forget && cr.became_done) backend_exec_complete_barrier_->Notify();
}
if (fire_and_forget) backend_exec_complete_barrier_->NotifyRemaining();
VLOG_QUERY << Substitute(
"CancelBackends() query_id=$0, tried to cancel $1 backends",
PrintId(query_id()), num_cancelled);
}
Status Coordinator::UpdateBackendExecStatus(const ReportExecStatusRequestPB& request,
const TRuntimeProfileForest& thrift_profiles) {
const int32_t coord_state_idx = request.coord_state_idx();
VLOG_FILE << "UpdateBackendExecStatus() query_id=" << PrintId(query_id())
<< " backend_idx=" << coord_state_idx;
if (coord_state_idx >= backend_states_.size()) {
return Status(TErrorCode::INTERNAL_ERROR,
Substitute("Unknown backend index $0 (max known: $1)",
coord_state_idx, backend_states_.size() - 1));
}
BackendState* backend_state = backend_states_[coord_state_idx];
if (thrift_profiles.__isset.host_profile) {
backend_state->UpdateHostProfile(thrift_profiles.host_profile);
}
// Set by ApplyExecStatusReport, contains all the AuxErrorInfoPB objects in
// ReportExecStatusRequestPB.
vector<AuxErrorInfoPB> aux_error_info;
if (backend_state->ApplyExecStatusReport(request, thrift_profiles, &exec_summary_,
&progress_, &dml_exec_state_, &aux_error_info)) {
// This backend execution has completed.
if (VLOG_QUERY_IS_ON) {
// Don't log backend completion if the query has already been cancelled.
int pending_backends = backend_exec_complete_barrier_->pending();
if (pending_backends >= 1) {
VLOG_QUERY << "Backend completed:"
<< " host=" << backend_state->impalad_address()
<< " remaining=" << pending_backends
<< " query_id=" << PrintId(query_id());
BackendState::LogFirstInProgress(backend_states_);
}
}
bool is_fragment_failure;
TUniqueId failed_instance_id;
Status status = backend_state->GetStatus(&is_fragment_failure, &failed_instance_id);
// Iterate through all AuxErrorInfoPB objects, and use each one to possibly blacklist
// any "faulty" nodes.
Status retryable_status = UpdateBlacklistWithAuxErrorInfo(
&aux_error_info, status, backend_state);
// If any nodes were blacklisted, retry the query. This needs to be done before
// UpdateExecState is called with the error status to avoid exposing the error to any
// clients. If a retry is attempted, the ClientRequestState::query_status_ will be
// set by TryQueryRetry, which prevents the error status from being exposed to any
// clients.
if (!retryable_status.ok()) {
parent_query_driver_->TryQueryRetry(parent_request_state_, &retryable_status);
}
if (!status.ok()) {
// We may start receiving status reports before all exec rpcs are complete.
// Can't apply state transition until no more exec rpcs will be sent.
// We should stop issuing ExecQueryFInstance rpcs and cancel any inflight
// when this happens.
if (!exec_rpcs_complete_.Load()) {
if (!status.IsCancelled()) exec_rpcs_status_barrier_.NotifyRemaining(status);
WaitOnExecRpcs();
}
// Transition the status if we're not already in a terminal state. This won't block
// because either this transitions to an ERROR state or the query is already in
// a terminal state.
// If both 'retryable_status' and 'status' are errors, prefer 'retryable_status' as
// it includes 'status' as well as additional error log information from
// UpdateBlacklistWithAuxErrorInfo.
const Status& update_exec_state_status =
!retryable_status.ok() ? retryable_status : status;
discard_result(UpdateExecState(update_exec_state_status,
is_fragment_failure ? &failed_instance_id : nullptr,
NetworkAddressPBToString(backend_state->impalad_address())));
}
// We've applied all changes from the final status report - notify waiting threads.
discard_result(backend_exec_complete_barrier_->Notify());
// Mark backend_state as closed and release the backend_state's resources if
// necessary.
vector<BackendState*> releasable_backends;
backend_resource_state_->MarkBackendFinished(backend_state, &releasable_backends);
if (!releasable_backends.empty()) {
ReleaseBackendAdmissionControlResources(releasable_backends);
backend_resource_state_->BackendsReleased(releasable_backends);
for (int i = 0; i < releasable_backends.size(); ++i) {
backend_released_barrier_.Notify();
}
}
num_completed_backends_->Add(1);
} else {
// Iterate through all AuxErrorInfoPB objects, and use each one to possibly blacklist
// any "faulty" nodes.
Status retryable_status = UpdateBlacklistWithAuxErrorInfo(
&aux_error_info, Status::OK(), backend_state);
// If any nodes were blacklisted, retry the query.
if (!retryable_status.ok()) {
parent_query_driver_->TryQueryRetry(parent_request_state_, &retryable_status);
}
}
// If query execution has terminated, return a cancelled status to force the fragment
// instance to stop executing.
// After cancelling backend_state, it's possible that current exec_state is still
// EXECUTING but the backend status is not OK since execution status report is not
// applied to update the overall status. In such case, we should return a cancelled
// status to backend.
return (IsExecuting() && backend_state->GetStatus().ok()) ? Status::OK() :
Status::CANCELLED;
}
Status Coordinator::UpdateBlacklistWithAuxErrorInfo(
vector<AuxErrorInfoPB>* aux_error_info, const Status& status,
BackendState* backend_state) {
// If the Backend failed due to a RPC failure, blacklist the destination node of
// the failed RPC. Only blacklist one node per ReportExecStatusRequestPB to avoid
// blacklisting nodes too aggressively. Currently, only blacklist the first node
// that contains a valid RPCErrorInfoPB object.
for (auto aux_error : *aux_error_info) {
if (aux_error.has_rpc_error_info()) {
RPCErrorInfoPB rpc_error_info = aux_error.rpc_error_info();
DCHECK(rpc_error_info.has_dest_node());
DCHECK(rpc_error_info.has_posix_error_code());
const NetworkAddressPB& dest_node = rpc_error_info.dest_node();
auto dest_node_and_be_state = addr_to_backend_state_.find(dest_node);
// If the target address of the RPC is not known to the Coordinator, it cannot
// be blacklisted.
if (dest_node_and_be_state == addr_to_backend_state_.end()) {
string err_msg = "Query failed due to a failed RPC to an unknown target address "
+ NetworkAddressPBToString(dest_node);
DCHECK(false) << err_msg;
LOG(ERROR) << err_msg;
continue;
}
// The execution parameters of the destination node for the failed RPC.
const BackendExecParamsPB& dest_node_exec_params =
dest_node_and_be_state->second->exec_params();
// The Coordinator for the query should never be blacklisted.
if (dest_node_exec_params.is_coord_backend()) {
VLOG_QUERY << "Query failed due to a failed RPC to the Coordinator";
continue;
}
// A set of RPC related posix error codes that should cause the target node
// of the failed RPC to be blacklisted.
static const set<int32_t> blacklistable_rpc_error_codes = {
ECONNRESET, // 104: Connection reset by peer
ENOTCONN, // 107: Transport endpoint is not connected
ESHUTDOWN, // 108: Cannot send after transport endpoint shutdown
ECONNREFUSED // 111: Connection refused
};
// If the RPC error code matches any of the 'blacklistable' errors codes, blacklist
// the target executor of the RPC and return.
if (blacklistable_rpc_error_codes.find(rpc_error_info.posix_error_code())
!= blacklistable_rpc_error_codes.end()) {
string src_node_addr =
NetworkAddressPBToString(backend_state->krpc_impalad_address());
string dest_node_addr = NetworkAddressPBToString(dest_node);
VLOG_QUERY << Substitute(
"Blacklisting $0 because a RPC to it failed, query_id=$1", dest_node_addr,
PrintId(query_id()));
Status retryable_status = Status::Expected(
Substitute("RPC from $0 to $1 failed", src_node_addr, dest_node_addr));
retryable_status.MergeStatus(status);
ExecEnv::GetInstance()->cluster_membership_mgr()->BlacklistExecutor(
dest_node_exec_params.backend_id(), retryable_status);
parent_request_state_->AddBlacklistedExecutorAddress(
dest_node_exec_params.address());
// Only blacklist one node per report.
return retryable_status;
}
}
}
return Status::OK();
}
void Coordinator::HandleFailedExecRpcs(vector<BackendState*> failed_backend_states) {
DCHECK(!failed_backend_states.empty());
// Create an error based on the Exec RPC failure Status
vector<string> backend_addresses;
for (BackendState* backend_state : failed_backend_states) {
backend_addresses.push_back(
NetworkAddressPBToString(backend_state->krpc_impalad_address()));
}
Status retryable_status = Status::Expected(
Substitute("ExecFInstances RPC to $0 failed", join(backend_addresses, ",")));
for (BackendState* backend_state : failed_backend_states) {
retryable_status.MergeStatus(
FromKuduStatus(backend_state->exec_rpc_status(), "Exec() rpc failed"));
}
// Retry the query
parent_query_driver_->TryQueryRetry(parent_request_state_, &retryable_status);
}
int64_t Coordinator::GetMaxBackendStateLagMs(NetworkAddressPB* address) {
DCHECK(exec_rpcs_complete_.Load()) << "Exec() must be called first.";
DCHECK_GT(backend_states_.size(), 0);
int64_t current_time = BackendState::GenerateReportTimestamp();
int64_t min_last_report_time_ms = current_time;
BackendState* min_state = nullptr;
for (BackendState* backend_state : backend_states_) {
if (backend_state->IsDone()) continue;
int64_t last_report_time_ms = backend_state->last_report_time_ms();
DCHECK_GT(last_report_time_ms, 0);
if (last_report_time_ms < min_last_report_time_ms) {
min_last_report_time_ms = last_report_time_ms;
min_state = backend_state;
}
}
if (min_state == nullptr) return 0;
*address = min_state->krpc_impalad_address();
return current_time - min_last_report_time_ms;
}
// TODO: add histogram/percentile
void Coordinator::ComputeQuerySummary() {
DCHECK(exec_rpcs_complete_.Load()) << "Exec() must be called first";
// In this case, the query did not even get to start all fragment instances.
// Some of the state that is used below might be uninitialized. In this case,
// the query has made so little progress, reporting a summary is not very useful.
if (!has_called_wait_.Load()) return;
if (backend_states_.empty()) return;
// make sure fragment_stats_ are up-to-date
for (BackendState* backend_state: backend_states_) {
backend_state->UpdateExecStats(fragment_stats_);
}
for (FragmentStats* fragment_stats: fragment_stats_) {
fragment_stats->AddSplitStats();
// TODO: output the split info string and detailed stats to VLOG_FILE again?
fragment_stats->AddExecStats();
}
stringstream mem_info, cpu_user_info, cpu_system_info, bytes_read_info;
ResourceUtilization total_utilization;
for (BackendState* backend_state: backend_states_) {
ResourceUtilization utilization = backend_state->GetResourceUtilization();
total_utilization.Merge(utilization);
string network_address = NetworkAddressPBToString(backend_state->impalad_address());
mem_info << network_address << "("
<< PrettyPrinter::Print(utilization.peak_per_host_mem_consumption,
TUnit::BYTES) << ") ";
bytes_read_info << network_address << "("
<< PrettyPrinter::Print(utilization.bytes_read, TUnit::BYTES) << ") ";
cpu_user_info << network_address << "("
<< PrettyPrinter::Print(utilization.cpu_user_ns, TUnit::TIME_NS)
<< ") ";
cpu_system_info << network_address << "("
<< PrettyPrinter::Print(utilization.cpu_sys_ns, TUnit::TIME_NS)
<< ") ";
}
// The definitions of these counters are in the top of this file.
COUNTER_SET(PROFILE_TotalBytesRead.Instantiate(query_profile_),
total_utilization.bytes_read);
COUNTER_SET(PROFILE_TotalCpuTime.Instantiate(query_profile_),
total_utilization.cpu_user_ns + total_utilization.cpu_sys_ns);
COUNTER_SET(PROFILE_TotalBytesSent.Instantiate(query_profile_),
total_utilization.scan_bytes_sent + total_utilization.exchange_bytes_sent);
COUNTER_SET(PROFILE_TotalScanBytesSent.Instantiate(query_profile_),
total_utilization.scan_bytes_sent);
COUNTER_SET(PROFILE_TotalInnerBytesSent.Instantiate(query_profile_),
total_utilization.exchange_bytes_sent);
double xchg_scan_ratio = 0;
if (total_utilization.bytes_read > 0) {
xchg_scan_ratio =
(double)total_utilization.scan_bytes_sent / total_utilization.bytes_read;
}
COUNTER_SET(PROFILE_ExchangeScanRatio.Instantiate(query_profile_), xchg_scan_ratio);
double inner_node_ratio = 0;
if (total_utilization.scan_bytes_sent > 0) {
inner_node_ratio =
(double)total_utilization.exchange_bytes_sent / total_utilization.scan_bytes_sent;
}
COUNTER_SET(PROFILE_InnerNodeSelectivityRatio.Instantiate(query_profile_),
inner_node_ratio);
// TODO(IMPALA-8126): Move to host profiles
query_profile_->AddInfoString("Per Node Peak Memory Usage", mem_info.str());
query_profile_->AddInfoString("Per Node Bytes Read", bytes_read_info.str());
query_profile_->AddInfoString("Per Node User Time", cpu_user_info.str());
query_profile_->AddInfoString("Per Node System Time", cpu_system_info.str());
}
string Coordinator::GetErrorLog() {
DCHECK(exec_rpcs_complete_.Load()) << "Exec() must be called first";
ErrorLogMap merged;
{
lock_guard<SpinLock> l(backend_states_init_lock_);
for (BackendState* state: backend_states_) state->MergeErrorLog(&merged);
}
return PrintErrorMapToString(merged);
}
void Coordinator::ReleaseExecResources() {
lock_guard<shared_mutex> lock(filter_routing_table_->lock); // Exclusive lock.
if (filter_routing_table_->num_filters() > 0) {
query_profile_->AddInfoString("Final filter table", FilterDebugString());
}
for (auto& filter : filter_routing_table_->id_to_filter) {
unique_lock<SpinLock> l(filter.second.lock());
filter.second.WaitForPublishFilter();
filter.second.DisableAndRelease(
filter_mem_tracker_, filter.second.received_all_updates());
}
// This may be NULL while executing UDFs.
if (filter_mem_tracker_ != nullptr) filter_mem_tracker_->Close();
// At this point some tracked memory may still be used in the coordinator for result
// caching. The query MemTracker will be cleaned up later.
}
void Coordinator::ReleaseQueryAdmissionControlResources() {
DCHECK(exec_rpcs_complete_.Load()) << "Exec() must be called first";
vector<BackendState*> unreleased_backends =
backend_resource_state_->CloseAndGetUnreleasedBackends();
if (!unreleased_backends.empty()) {
ReleaseBackendAdmissionControlResources(unreleased_backends);
backend_resource_state_->BackendsReleased(unreleased_backends);
for (int i = 0; i < unreleased_backends.size(); ++i) {
backend_released_barrier_.Notify();
}
}
// Wait for all backends to be released before calling
// AdmissionController::ReleaseQuery.
backend_released_barrier_.Wait();
LOG(INFO) << "Release admission control resources for query_id=" << PrintId(query_id());
AdmissionController* admission_controller =
ExecEnv::GetInstance()->admission_controller();
DCHECK(admission_controller != nullptr);
admission_controller->ReleaseQuery(exec_params_.query_id(),
ComputeQueryResourceUtilization().peak_per_host_mem_consumption);
query_events_->MarkEvent("Released admission control resources");
}
void Coordinator::ReleaseBackendAdmissionControlResources(
const vector<BackendState*>& backend_states) {
AdmissionController* admission_controller =
ExecEnv::GetInstance()->admission_controller();
DCHECK(admission_controller != nullptr);
vector<NetworkAddressPB> host_addrs;
for (auto backend_state : backend_states) {
host_addrs.push_back(backend_state->impalad_address());
}
admission_controller->ReleaseQueryBackends(exec_params_.query_id(), host_addrs);
}
Coordinator::ResourceUtilization Coordinator::ComputeQueryResourceUtilization() {
DCHECK(exec_rpcs_complete_.Load()) << "Exec() must be called first";
ResourceUtilization query_resource_utilization;
for (BackendState* backend_state: backend_states_) {
query_resource_utilization.Merge(backend_state->GetResourceUtilization());
}
return query_resource_utilization;
}
vector<NetworkAddressPB> Coordinator::GetActiveBackends(
const vector<NetworkAddressPB>& candidates) {
// Build set from vector so that runtime of this function is O(backend_states.size()).
std::unordered_set<NetworkAddressPB> candidate_set(
candidates.begin(), candidates.end());
vector<NetworkAddressPB> result;
lock_guard<SpinLock> l(backend_states_init_lock_);
for (BackendState* backend_state : backend_states_) {
if (candidate_set.find(backend_state->impalad_address()) != candidate_set.end()
&& !backend_state->IsDone()) {
result.push_back(backend_state->impalad_address());
}
}
return result;
}
void Coordinator::UpdateFilter(const UpdateFilterParamsPB& params, RpcContext* context) {
VLOG(2) << "UpdateFilter(filter_id=" << params.filter_id() << ")";
shared_lock<shared_mutex> lock(filter_routing_table_->lock);
DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)
<< "UpdateFilter() called although runtime filters are disabled";
DCHECK(backend_exec_complete_barrier_.get() != nullptr)
<< "Filters received before fragments started!";
WaitOnExecRpcs();
DCHECK(filter_routing_table_->is_complete)
<< "Filter received before routing table complete";
PublishFilterParamsPB rpc_params;
std::unordered_set<int> target_fragment_idxs;
if (!IsExecuting()) {
LOG(INFO) << "Filter update received for non-executing query with id: "
<< query_id();
return;
}
auto it = filter_routing_table_->id_to_filter.find(params.filter_id());
if (it == filter_routing_table_.get()->id_to_filter.end()) {
// This should not be possible since 'id_to_filter' is never changed after
// InitFilterRoutingTable().
DCHECK(false);
LOG(INFO) << "Could not find filter with id: " << rpc_params.filter_id();
return;
}
FilterState* state = &it->second;
{
lock_guard<SpinLock> l(state->lock());
DCHECK(state->desc().has_remote_targets)
<< "Coordinator received filter that has only local targets";
// Check if the filter has already been sent, which could happen in five cases:
// * if one local filter had always_true set - no point waiting for other local
// filters that can't affect the aggregated global filter
// * if this is a broadcast join, and another local filter was already received
// * if the filter could not be allocated and so an always_true filter was sent
// immediately.
// * query execution finished and resources were released: filters do not need
// to be processed.
// * if the inbound sidecar for Bloom filter cannot be successfully retrieved.
if (state->disabled()) return;
if (filter_updates_received_->value() == 0) {
query_events_->MarkEvent("First dynamic filter received");
}
filter_updates_received_->Add(1);
state->ApplyUpdate(params, this, context);
if (state->pending_count() > 0 && state->enabled()) return;
// At this point, we either disabled this filter or aggregation is complete.
// No more updates are pending on this filter ID. Create a distribution payload and
// offer it to the queue.
for (const FilterTarget& target : *state->targets()) {
// Don't publish the filter to targets that are in the same fragment as the join
// that produced it.
if (target.is_local) continue;
target_fragment_idxs.insert(target.fragment_idx);
}
if (state->is_bloom_filter()) {
// Assign an outgoing bloom filter.
*rpc_params.mutable_bloom_filter() = state->bloom_filter();
DCHECK(rpc_params.bloom_filter().always_false()
|| rpc_params.bloom_filter().always_true()
|| !state->bloom_filter_directory().empty());
} else {
DCHECK(state->is_min_max_filter());
MinMaxFilter::Copy(state->min_max_filter(), rpc_params.mutable_min_max_filter());
}
// Filter is complete. We disable it so future UpdateFilter rpcs will be ignored,
// e.g., if it was a broadcast join. If filter is still enabled at this point, it
// means all filter updates have been successfully received and applied.
state->Disable(state->enabled());
TUniqueIdToUniqueIdPB(query_id(), rpc_params.mutable_dst_query_id());
rpc_params.set_filter_id(params.filter_id());
// Called WaitForExecRpcs() so backend_states_ is valid.
for (BackendState* bs : backend_states_) {
if (!IsExecuting()) {
if (rpc_params.has_bloom_filter()) {
filter_mem_tracker_->Release(state->bloom_filter_directory().size());
state->bloom_filter_directory().clear();
state->bloom_filter_directory().shrink_to_fit();
return;
}
}
if (bs->HasFragmentIdx(target_fragment_idxs)) {
rpc_params.set_filter_id(params.filter_id());
RpcController* controller = obj_pool()->Add(new RpcController);
PublishFilterResultPB* res = obj_pool()->Add(new PublishFilterResultPB);
if (rpc_params.has_bloom_filter() && !rpc_params.bloom_filter().always_false()
&& !rpc_params.bloom_filter().always_true()) {
BloomFilter::AddDirectorySidecar(rpc_params.mutable_bloom_filter(), controller,
state->bloom_filter_directory());
}
bs->PublishFilter(state, filter_mem_tracker_, rpc_params, *controller, *res);
}
}
}
}
void Coordinator::FilterState::ApplyUpdate(
const UpdateFilterParamsPB& params, Coordinator* coord, RpcContext* context) {
DCHECK(enabled());
DCHECK_GT(pending_count_, 0);
DCHECK_EQ(completion_time_, 0L);
if (first_arrival_time_ == 0L) {
first_arrival_time_ = coord->query_events_->ElapsedTime();
}
--pending_count_;
if (is_bloom_filter()) {
DCHECK(params.has_bloom_filter());
if (params.bloom_filter().always_true()) {
// An always_true filter is received. We don't need to wait for other pending
// backends.
DisableAndRelease(coord->filter_mem_tracker_, true);
} else if (params.bloom_filter().always_false()) {
if (!bloom_filter_.has_log_bufferpool_space()) {
bloom_filter_ = BloomFilterPB(params.bloom_filter());
}
} else {
// If the incoming Bloom filter is neither an always true filter nor an
// always false filter, then it must be the case that a non-empty sidecar slice
// has been received. Refer to BloomFilter::ToProtobuf() for further details.
DCHECK(params.bloom_filter().has_directory_sidecar_idx());
kudu::Slice sidecar_slice;
kudu::Status status = context->GetInboundSidecar(
params.bloom_filter().directory_sidecar_idx(), &sidecar_slice);
if (!status.ok()) {
LOG(ERROR) << "Cannot get inbound sidecar: " << status.message().ToString();
DisableAndRelease(coord->filter_mem_tracker_, false);
} else if (bloom_filter_.always_false()) {
int64_t heap_space = sidecar_slice.size();
if (!coord->filter_mem_tracker_->TryConsume(heap_space)) {
VLOG_QUERY << "Not enough memory to allocate filter: "
<< PrettyPrinter::Print(heap_space, TUnit::BYTES)
<< " (query_id=" << PrintId(coord->query_id()) << ")";
// Disable, as one missing update means a correct filter cannot be produced.
DisableAndRelease(coord->filter_mem_tracker_, false);
} else {
bloom_filter_ = params.bloom_filter();
bloom_filter_directory_ = sidecar_slice.ToString();
}
} else {
DCHECK_EQ(bloom_filter_directory_.size(), sidecar_slice.size());
BloomFilter::Or(params.bloom_filter(), sidecar_slice.data(), &bloom_filter_,
reinterpret_cast<uint8_t*>(const_cast<char*>(bloom_filter_directory_.data())),
sidecar_slice.size());
}
}
} else {
DCHECK(is_min_max_filter());
DCHECK(params.has_min_max_filter());
if (params.min_max_filter().always_true()) {
// An always_true filter is received. We don't need to wait for other pending
// backends.
DisableAndRelease(coord->filter_mem_tracker_, true);
} else if (min_max_filter_.always_false()) {
MinMaxFilter::Copy(params.min_max_filter(), &min_max_filter_);
} else {
MinMaxFilter::Or(params.min_max_filter(), &min_max_filter_,
ColumnType::FromThrift(desc_.src_expr.nodes[0].type));
}
}
if (pending_count_ == 0 || disabled()) {
completion_time_ = coord->query_events_->ElapsedTime();
}
}
void Coordinator::FilterState::DisableAndRelease(
MemTracker* tracker, const bool all_updates_received) {
Disable(all_updates_received);
if (is_bloom_filter()) {
tracker->Release(bloom_filter_directory_.size());
bloom_filter_directory_.clear();
bloom_filter_directory_.shrink_to_fit();
}
}
void Coordinator::FilterState::Disable(const bool all_updates_received) {
all_updates_received_ = all_updates_received;
if (is_bloom_filter()) {
bloom_filter_.set_always_true(true);
bloom_filter_.set_always_false(false);
} else {
DCHECK(is_min_max_filter());
min_max_filter_.set_always_true(true);
min_max_filter_.set_always_false(false);
}
}
void Coordinator::FilterState::WaitForPublishFilter() {
while (num_inflight_publish_filter_rpcs_ > 0) {
publish_filter_done_cv_.wait(lock_);
}
}
void Coordinator::GetTExecSummary(TExecSummary* exec_summary) {
lock_guard<SpinLock> l(exec_summary_.lock);
*exec_summary = exec_summary_.thrift_exec_summary;
}
MemTracker* Coordinator::query_mem_tracker() const {
return query_state_->query_mem_tracker();
}
void Coordinator::BackendsToJson(Document* doc) {
DCHECK(exec_rpcs_complete_.Load()) << "Exec() must be called first";
Value states(kArrayType);
{
lock_guard<SpinLock> l(backend_states_init_lock_);
for (BackendState* state : backend_states_) {
Value val(kObjectType);
state->ToJson(&val, doc);
states.PushBack(val, doc->GetAllocator());
}
}
doc->AddMember("backend_states", states, doc->GetAllocator());
}
void Coordinator::FInstanceStatsToJson(Document* doc) {
DCHECK(exec_rpcs_complete_.Load()) << "Exec() must be called first";
Value states(kArrayType);
{
lock_guard<SpinLock> l(backend_states_init_lock_);
for (BackendState* state : backend_states_) {
Value val(kObjectType);
state->InstanceStatsToJson(&val, doc);
states.PushBack(val, doc->GetAllocator());
}
}
doc->AddMember("backend_instances", states, doc->GetAllocator());
}
const TQueryCtx& Coordinator::query_ctx() const {
return exec_params_.query_exec_request().query_ctx;
}
const TUniqueId& Coordinator::query_id() const {
return query_ctx().query_id;
}
const TFinalizeParams* Coordinator::finalize_params() const {
return exec_params_.query_exec_request().__isset.finalize_params ?
&exec_params_.query_exec_request().finalize_params :
nullptr;
}
bool Coordinator::IsExecuting() {
ExecState current_state = exec_state_.Load();
return current_state == ExecState::EXECUTING;
}