// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/coordinator-backend-state.h"
#include <sstream>
#include <boost/thread/locks.hpp>
#include <boost/thread/lock_guard.hpp>
#include <boost/accumulators/accumulators.hpp>
#include "common/object-pool.h"
#include "exec/exec-node.h"
#include "exec/scan-node.h"
#include "scheduling/query-schedule.h"
#include "runtime/exec-env.h"
#include "runtime/fragment-instance-state.h"
#include "runtime/debug-options.h"
#include "runtime/client-cache.h"
#include "runtime/client-cache-types.h"
#include "runtime/backend-client.h"
#include "runtime/coordinator-filter-state.h"
#include "util/error-util.h"
#include "util/uid-util.h"
#include "util/network-util.h"
#include "util/counting-barrier.h"
#include "util/progress-updater.h"
#include "gen-cpp/Types_types.h"
#include "gen-cpp/ImpalaInternalService_types.h"
#include "gen-cpp/ImpalaInternalService_constants.h"
#include "common/names.h"
using namespace impala;
namespace accumulators = boost::accumulators;
Coordinator::BackendState::BackendState(
const TUniqueId& query_id, int state_idx, TRuntimeFilterMode::type filter_mode)
: query_id_(query_id),
state_idx_(state_idx),
filter_mode_(filter_mode),
rpc_latency_(0),
rpc_sent_(false),
peak_consumption_(0L) {
}
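
// Records the instance parameters for this backend and sets up per-instance
// stats. All instances must be scheduled on the same host, and all instances of
// a fragment must be contiguous in 'instance_params_list'.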
void Coordinator::BackendState::Init(
const vector<const FInstanceExecParams*>& instance_params_list,
const vector<FragmentStats*>& fragment_stats, ObjectPool* obj_pool) {
  instance_params_list_ = instance_params_list;
  DCHECK(!instance_params_list_.empty());
  host_ = instance_params_list_[0]->host;
num_remaining_instances_ = instance_params_list.size();
// populate instance_stats_map_ and install instance
// profiles as child profiles in fragment_stats' profile
int prev_fragment_idx = -1;
for (const FInstanceExecParams* instance_params: instance_params_list) {
DCHECK_EQ(host_, instance_params->host); // all hosts must be the same
int fragment_idx = instance_params->fragment().idx;
DCHECK_LT(fragment_idx, fragment_stats.size());
    if (prev_fragment_idx != -1 && fragment_idx != prev_fragment_idx) {
      // all instances of a fragment are contiguous
      DCHECK_EQ(fragments_.count(fragment_idx), 0);
    }
    prev_fragment_idx = fragment_idx;
    fragments_.insert(fragment_idx);
instance_stats_map_.emplace(
GetInstanceIdx(instance_params->instance_id),
obj_pool->Add(
new InstanceStats(*instance_params, fragment_stats[fragment_idx], obj_pool)));
}
}
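
// Serializes everything this backend needs to execute its fragment instances
// into 'rpc_params'. Unless filter_mode_ is OFF, also trims each plan node's
// runtime filters down to the ones selected during filter routing table
// construction.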
void Coordinator::BackendState::SetRpcParams(
const DebugOptions& debug_options, const FilterRoutingTable& filter_routing_table,
TExecQueryFInstancesParams* rpc_params) {
rpc_params->__set_protocol_version(ImpalaInternalServiceVersion::V1);
rpc_params->__set_coord_state_idx(state_idx_);
// set fragment_ctxs and fragment_instance_ctxs
rpc_params->fragment_instance_ctxs.resize(instance_params_list_.size());
for (int i = 0; i < instance_params_list_.size(); ++i) {
TPlanFragmentInstanceCtx& instance_ctx = rpc_params->fragment_instance_ctxs[i];
const FInstanceExecParams& params = *instance_params_list_[i];
int fragment_idx = params.fragment_exec_params.fragment.idx;
// add a TPlanFragmentCtx, if we don't already have it
if (rpc_params->fragment_ctxs.empty()
|| rpc_params->fragment_ctxs.back().fragment.idx != fragment_idx) {
rpc_params->fragment_ctxs.emplace_back();
TPlanFragmentCtx& fragment_ctx = rpc_params->fragment_ctxs.back();
fragment_ctx.__set_fragment(params.fragment_exec_params.fragment);
fragment_ctx.__set_destinations(params.fragment_exec_params.destinations);
}
instance_ctx.fragment_idx = fragment_idx;
instance_ctx.fragment_instance_id = params.instance_id;
instance_ctx.per_fragment_instance_idx = params.per_fragment_instance_idx;
instance_ctx.__set_per_node_scan_ranges(params.per_node_scan_ranges);
instance_ctx.__set_per_exch_num_senders(
params.fragment_exec_params.per_exch_num_senders);
instance_ctx.__set_sender_id(params.sender_id);
if (debug_options.node_id() != -1
&& (debug_options.instance_idx() == -1
|| debug_options.instance_idx() == GetInstanceIdx(params.instance_id))) {
instance_ctx.__set_debug_options(debug_options.ToThrift());
}
if (filter_mode_ == TRuntimeFilterMode::OFF) continue;
// Remove filters that weren't selected during filter routing table construction.
// TODO: do this more efficiently, we're looping over the entire plan for each
// instance separately
DCHECK_EQ(rpc_params->query_ctx.client_request.query_options.mt_dop, 0);
int instance_idx = GetInstanceIdx(params.instance_id);
for (TPlanNode& plan_node: rpc_params->fragment_ctxs.back().fragment.plan.nodes) {
if (!plan_node.__isset.hash_join_node) continue;
if (!plan_node.__isset.runtime_filters) continue;
vector<TRuntimeFilterDesc> required_filters;
for (const TRuntimeFilterDesc& desc: plan_node.runtime_filters) {
FilterRoutingTable::const_iterator filter_it =
filter_routing_table.find(desc.filter_id);
// filter was dropped in Coordinator::InitFilterRoutingTable()
if (filter_it == filter_routing_table.end()) continue;
const FilterState& f = filter_it->second;
if (f.src_fragment_instance_idxs().find(instance_idx)
== f.src_fragment_instance_idxs().end()) {
DCHECK(desc.is_broadcast_join);
continue;
}
// We don't need a target-side check here, because a filter is either sent to
// all its targets or none, and the none case is handled by checking if the
// filter is in the routing table.
required_filters.push_back(desc);
}
plan_node.__set_runtime_filters(required_filters);
}
}
}
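
// Issues the ExecQueryFInstances rpc to this backend and records the resulting
// status_ and rpc latency. lock_ is held for the duration of the rpc to guard
// against concurrent UpdateBackendExecStatus() reports arriving after the rpc
// returns. 'exec_complete_barrier' is notified on exit, even on failure.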
void Coordinator::BackendState::Exec(
const TQueryCtx& query_ctx, const DebugOptions& debug_options,
const FilterRoutingTable& filter_routing_table,
CountingBarrier* exec_complete_barrier) {
NotifyBarrierOnExit notifier(exec_complete_barrier);
TExecQueryFInstancesParams rpc_params;
rpc_params.__set_query_ctx(query_ctx);
SetRpcParams(debug_options, filter_routing_table, &rpc_params);
VLOG_FILE << "making rpc: ExecQueryFInstances"
<< " host=" << impalad_address() << " query_id=" << PrintId(query_id_);
// guard against concurrent UpdateBackendExecStatus() that may arrive after RPC returns
lock_guard<mutex> l(lock_);
int64_t start = MonotonicMillis();
ImpalaBackendConnection backend_client(
ExecEnv::GetInstance()->impalad_client_cache(), impalad_address(), &status_);
if (!status_.ok()) return;
TExecQueryFInstancesResult thrift_result;
Status rpc_status = backend_client.DoRpc(
&ImpalaBackendClient::ExecQueryFInstances, rpc_params, &thrift_result);
rpc_sent_ = true;
rpc_latency_ = MonotonicMillis() - start;
  const string ERR_TEMPLATE = "ExecQueryFInstances rpc query_id=$0 failed: $1";
if (!rpc_status.ok()) {
const string& err_msg =
Substitute(ERR_TEMPLATE, PrintId(query_id_), rpc_status.msg().msg());
VLOG_QUERY << err_msg;
status_ = Status(err_msg);
return;
}
Status exec_status = Status(thrift_result.status);
if (!exec_status.ok()) {
const string& err_msg = Substitute(ERR_TEMPLATE, PrintId(query_id_),
exec_status.msg().GetFullMessageDetails());
VLOG_QUERY << err_msg;
status_ = Status(err_msg);
return;
}
for (const auto& entry: instance_stats_map_) entry.second->stopwatch_.Start();
VLOG_FILE << "rpc succeeded: ExecQueryFInstances query_id=" << PrintId(query_id_);
}
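
// Returns the current status of this backend. If the status is not OK and
// 'failed_instance_id' is non-NULL, also sets *failed_instance_id to the id of
// the first fragment instance that failed.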
Status Coordinator::BackendState::GetStatus(TUniqueId* failed_instance_id) {
lock_guard<mutex> l(lock_);
if (!status_.ok() && failed_instance_id != nullptr) {
*failed_instance_id = failed_instance_id_;
}
return status_;
}

int64_t Coordinator::BackendState::GetPeakConsumption() {
lock_guard<mutex> l(lock_);
return peak_consumption_;
}

void Coordinator::BackendState::MergeErrorLog(ErrorLogMap* merged) {
lock_guard<mutex> l(lock_);
  if (!error_log_.empty()) MergeErrorMaps(error_log_, merged);
}

bool Coordinator::BackendState::IsDone() {
lock_guard<mutex> l(lock_);
return IsDoneInternal();
}

inline bool Coordinator::BackendState::IsDoneInternal() const {
return num_remaining_instances_ == 0 || !status_.ok();
}
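
// Merges one execution status report from this backend into the coordinator's
// state: updates per-instance stats and the exec summary, tracks peak memory
// consumption, preserves the first non-cancelled error status, and decrements
// num_remaining_instances_ for each instance that finished. Sets *done to true
// iff this backend is now done.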
void Coordinator::BackendState::ApplyExecStatusReport(
const TReportExecStatusParams& backend_exec_status, ExecSummary* exec_summary,
ProgressUpdater* scan_range_progress, bool* done) {
lock_guard<SpinLock> l1(exec_summary->lock);
lock_guard<mutex> l2(lock_);
for (const TFragmentInstanceExecStatus& instance_exec_status:
backend_exec_status.instance_exec_status) {
Status instance_status(instance_exec_status.status);
if (instance_status.ok()) {
int instance_idx = GetInstanceIdx(instance_exec_status.fragment_instance_id);
DCHECK_EQ(instance_stats_map_.count(instance_idx), 1);
InstanceStats* instance_stats = instance_stats_map_[instance_idx];
DCHECK_EQ(instance_stats->exec_params_.instance_id,
instance_exec_status.fragment_instance_id);
instance_stats->Update(instance_exec_status, exec_summary, scan_range_progress);
if (instance_stats->peak_mem_counter_ != nullptr) {
// protect against out-of-order status updates
peak_consumption_ =
max(peak_consumption_, instance_stats->peak_mem_counter_->value());
}
} else {
// if a query is aborted due to an error encountered by a single fragment instance,
// all other fragment instances will report a cancelled status; make sure not
// to mask the original error status
if (status_.ok() || status_.IsCancelled()) {
status_ = instance_status;
failed_instance_id_ = instance_exec_status.fragment_instance_id;
}
}
DCHECK_GT(num_remaining_instances_, 0);
if (instance_exec_status.done) --num_remaining_instances_;
    // TODO: clean up the ReportQuerySummary() mess
    // We can't update this backend's profile if ReportQuerySummary() is running,
    // because it depends on all profiles not changing during its execution (when it
    // calls SortChildren()). ReportQuerySummary() only gets called after
    // WaitForBackendCompletion() returns or at the end of CancelFragmentInstances().
    // WaitForBackendCompletion() only returns after all backends have completed (in
    // which case we wouldn't be in this function), or when there's an error, in
    // which case CancelFragmentInstances() is called. CancelFragmentInstances()
    // sets all exec_states' statuses to cancelled.
    // TODO: we're losing this profile information; call ReportQuerySummary() only
    // after all backends have completed.
}
// Log messages aggregated by type
  if (backend_exec_status.__isset.error_log && !backend_exec_status.error_log.empty()) {
// Append the log messages from each update with the global state of the query
// execution
MergeErrorMaps(backend_exec_status.error_log, &error_log_);
VLOG_FILE << "host=" << host_ << " error log: " << PrintErrorMapToString(error_log_);
}
*done = IsDoneInternal();
// TODO: keep backend-wide stopwatch?
}
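
// Folds this backend's per-instance completion times, execution rates and
// averaged profiles into the given per-fragment stats.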
void Coordinator::BackendState::UpdateExecStats(
const vector<FragmentStats*>& fragment_stats) {
lock_guard<mutex> l(lock_);
for (const auto& entry: instance_stats_map_) {
const InstanceStats& instance_stats = *entry.second;
int fragment_idx = instance_stats.exec_params_.fragment().idx;
DCHECK_LT(fragment_idx, fragment_stats.size());
FragmentStats* f = fragment_stats[fragment_idx];
int64_t completion_time = instance_stats.stopwatch_.ElapsedTime();
f->completion_times_(completion_time);
if (completion_time > 0) {
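      // stopwatch_.ElapsedTime() returns nanoseconds; the divisor converts the
      // completion time to seconds, making the rate bytes/sec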
f->rates_(instance_stats.total_split_size_
/ (completion_time / 1000.0 / 1000.0 / 1000.0));
}
f->avg_profile_->UpdateAverage(instance_stats.profile_);
}
}
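
// Sends a CancelQueryFInstances rpc to this backend unless the exec rpc was
// never sent or this backend already finished. Returns true iff a cancellation
// rpc was issued.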
bool Coordinator::BackendState::Cancel() {
lock_guard<mutex> l(lock_);
// Nothing to cancel if the exec rpc was not sent
if (!rpc_sent_) return false;
// don't cancel if it already finished (for any reason)
if (IsDoneInternal()) return false;
  // If the status is not OK, we still try to cancel: a !OK status might mean
  // communication failure between backend and coordinator, but fragment
  // instances might still be running.
  // Set an error status to make sure we only cancel this once.
if (status_.ok()) status_ = Status::CANCELLED;
Status status;
ImpalaBackendConnection backend_client(
ExecEnv::GetInstance()->impalad_client_cache(), impalad_address(), &status);
if (!status.ok()) return false;
TCancelQueryFInstancesParams params;
params.protocol_version = ImpalaInternalServiceVersion::V1;
params.__set_query_id(query_id_);
TCancelQueryFInstancesResult dummy;
VLOG_QUERY << "sending CancelQueryFInstances rpc for query_id="
<< query_id_ << " backend=" << impalad_address();
Status rpc_status;
// Try to send the RPC 3 times before failing.
bool retry_is_safe;
for (int i = 0; i < 3; ++i) {
rpc_status = backend_client.DoRpc(
&ImpalaBackendClient::CancelQueryFInstances, params, &dummy, &retry_is_safe);
if (rpc_status.ok() || !retry_is_safe) break;
}
if (!rpc_status.ok()) {
status_.MergeStatus(rpc_status);
stringstream msg;
msg << "CancelQueryFInstances rpc query_id=" << query_id_
<< " failed: " << rpc_status.msg().msg();
status_.AddDetail(msg.str());
return true;
}
return true;
}
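
// Forwards a filter publication to this backend, but only if one of its
// fragments is the filter's destination. The rpc status is deliberately not
// checked: runtime filters are an optimization, so a lost update is tolerable.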
void Coordinator::BackendState::PublishFilter(
shared_ptr<TPublishFilterParams> rpc_params) {
DCHECK_EQ(rpc_params->dst_query_id, query_id_);
if (fragments_.count(rpc_params->dst_fragment_idx) == 0) return;
Status status;
ImpalaBackendConnection backend_client(
ExecEnv::GetInstance()->impalad_client_cache(), host_, &status);
if (!status.ok()) return;
// Make a local copy of the shared 'master' set of parameters
TPublishFilterParams local_params(*rpc_params);
local_params.__set_bloom_filter(rpc_params->bloom_filter);
TPublishFilterResult res;
backend_client.DoRpc(&ImpalaBackendClient::PublishFilter, local_params, &res);
// TODO: switch back to the following once we fix the lifecycle
// problems of Coordinator
//std::cref(fragment_inst->impalad_address()),
//std::cref(fragment_inst->fragment_instance_id())));
}
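
// Registers the instance's profile as a child of 'fragment_stats' root profile
// and adds the instance's total scan range split size to the fragment's
// bytes_assigned accumulator.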
Coordinator::BackendState::InstanceStats::InstanceStats(
const FInstanceExecParams& exec_params, FragmentStats* fragment_stats,
ObjectPool* obj_pool)
: exec_params_(exec_params),
profile_(nullptr),
profile_created_(false),
total_split_size_(0),
total_ranges_complete_(0) {
const string& profile_name = Substitute("Instance $0 (host=$1)",
PrintId(exec_params.instance_id), lexical_cast<string>(exec_params.host));
profile_ = obj_pool->Add(new RuntimeProfile(obj_pool, profile_name));
fragment_stats->root_profile()->AddChild(profile_);
// add total split size to fragment_stats->bytes_assigned()
for (const PerNodeScanRanges::value_type& entry: exec_params_.per_node_scan_ranges) {
for (const TScanRangeParams& scan_range_params: entry.second) {
if (!scan_range_params.scan_range.__isset.hdfs_file_split) continue;
total_split_size_ += scan_range_params.scan_range.hdfs_file_split.length;
}
}
(*fragment_stats->bytes_assigned())(total_split_size_);
}
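
// Collects pointers to the scan-ranges-complete counters of all scan nodes in
// this instance's profile tree, plus the per-host peak memory counter. Called
// once, when the first profile update arrives.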
void Coordinator::BackendState::InstanceStats::InitCounters() {
vector<RuntimeProfile*> children;
profile_->GetAllChildren(&children);
for (RuntimeProfile* p: children) {
PlanNodeId id = ExecNode::GetNodeIdFromProfile(p);
// This profile is not for an exec node.
if (id == g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID) continue;
RuntimeProfile::Counter* c =
p->GetCounter(ScanNode::SCAN_RANGES_COMPLETE_COUNTER);
if (c != nullptr) scan_ranges_complete_counters_.push_back(c);
}
peak_mem_counter_ =
profile_->GetCounter(FragmentInstanceState::PER_HOST_PEAK_MEM_COUNTER);
}
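
// Merges a reported runtime profile into profile_, refreshes this instance's
// per-node entries in the thrift exec summary, and advances
// 'scan_range_progress' by the number of scan ranges completed since the last
// update.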
void Coordinator::BackendState::InstanceStats::Update(
const TFragmentInstanceExecStatus& exec_status,
ExecSummary* exec_summary, ProgressUpdater* scan_range_progress) {
DCHECK(Status(exec_status.status).ok());
if (exec_status.done) stopwatch_.Stop();
profile_->Update(exec_status.profile);
if (!profile_created_) {
profile_created_ = true;
InitCounters();
}
profile_->ComputeTimeInProfile();
// update exec_summary
// TODO: why do this every time we get an updated instance profile?
vector<RuntimeProfile*> children;
profile_->GetAllChildren(&children);
TExecSummary& thrift_exec_summary = exec_summary->thrift_exec_summary;
for (RuntimeProfile* child: children) {
int node_id = ExecNode::GetNodeIdFromProfile(child);
    // this profile is not for an exec node
    if (node_id == g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID) continue;
// TODO: create plan_node_id_to_summary_map_
TPlanNodeExecSummary& node_exec_summary =
thrift_exec_summary.nodes[exec_summary->node_id_to_idx_map[node_id]];
int per_fragment_instance_idx = exec_params_.per_fragment_instance_idx;
DCHECK_LT(per_fragment_instance_idx, node_exec_summary.exec_stats.size())
<< " node_id=" << node_id << " instance_id=" << PrintId(exec_params_.instance_id)
<< " fragment_idx=" << exec_params_.fragment().idx;
TExecStats& instance_stats =
node_exec_summary.exec_stats[per_fragment_instance_idx];
RuntimeProfile::Counter* rows_counter = child->GetCounter("RowsReturned");
RuntimeProfile::Counter* mem_counter = child->GetCounter("PeakMemoryUsage");
if (rows_counter != nullptr) instance_stats.__set_cardinality(rows_counter->value());
if (mem_counter != nullptr) instance_stats.__set_memory_used(mem_counter->value());
instance_stats.__set_latency_ns(child->local_time());
// TODO: track interesting per-node metrics
node_exec_summary.__isset.exec_stats = true;
}
// determine newly-completed scan ranges and update scan_range_progress
int64_t total = 0;
for (RuntimeProfile::Counter* c: scan_ranges_complete_counters_) total += c->value();
int64_t delta = total - total_ranges_complete_;
total_ranges_complete_ = total;
scan_range_progress->Update(delta);
}

Coordinator::FragmentStats::FragmentStats(const string& avg_profile_name,
const string& root_profile_name, int num_instances, ObjectPool* obj_pool)
: avg_profile_(
obj_pool->Add(new RuntimeProfile(obj_pool, avg_profile_name, true))),
root_profile_(
obj_pool->Add(new RuntimeProfile(obj_pool, root_profile_name))),
num_instances_(num_instances) {
}
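
// Records min/max/mean/stddev of the scan range bytes assigned to this
// fragment's instances as an info string in the averaged profile.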
void Coordinator::FragmentStats::AddSplitStats() {
double min = accumulators::min(bytes_assigned_);
double max = accumulators::max(bytes_assigned_);
double mean = accumulators::mean(bytes_assigned_);
double stddev = sqrt(accumulators::variance(bytes_assigned_));
stringstream ss;
ss << " min: " << PrettyPrinter::Print(min, TUnit::BYTES)
<< ", max: " << PrettyPrinter::Print(max, TUnit::BYTES)
<< ", avg: " << PrettyPrinter::Print(mean, TUnit::BYTES)
<< ", stddev: " << PrettyPrinter::Print(stddev, TUnit::BYTES);
avg_profile_->AddInfoString("split sizes", ss.str());
}

// Comparator to order RuntimeProfiles by descending total time
struct InstanceComparator {
  typedef pair<RuntimeProfile*, bool> Profile;
  bool operator()(const Profile& a, const Profile& b) const {
    // Reverse ordering: we want the longest first
    return
        a.first->total_time_counter()->value() > b.first->total_time_counter()->value();
  }
};
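
// Sorts the instance profiles by descending total time and records summary
// statistics of the instances' completion times and execution rates in the
// averaged profile.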
void Coordinator::FragmentStats::AddExecStats() {
InstanceComparator comparator;
root_profile_->SortChildren(comparator);
stringstream times_label;
times_label
<< "min:" << PrettyPrinter::Print(
accumulators::min(completion_times_), TUnit::TIME_NS)
<< " max:" << PrettyPrinter::Print(
accumulators::max(completion_times_), TUnit::TIME_NS)
<< " mean: " << PrettyPrinter::Print(
accumulators::mean(completion_times_), TUnit::TIME_NS)
<< " stddev:" << PrettyPrinter::Print(
sqrt(accumulators::variance(completion_times_)), TUnit::TIME_NS);
stringstream rates_label;
rates_label
<< "min:" << PrettyPrinter::Print(
accumulators::min(rates_), TUnit::BYTES_PER_SECOND)
<< " max:" << PrettyPrinter::Print(
accumulators::max(rates_), TUnit::BYTES_PER_SECOND)
<< " mean:" << PrettyPrinter::Print(
accumulators::mean(rates_), TUnit::BYTES_PER_SECOND)
<< " stddev:" << PrettyPrinter::Print(
sqrt(accumulators::variance(rates_)), TUnit::BYTES_PER_SECOND);
// why plural?
avg_profile_->AddInfoString("completion times", times_label.str());
// why plural?
avg_profile_->AddInfoString("execution rates", rates_label.str());
avg_profile_->AddInfoString("num instances", lexical_cast<string>(num_instances_));
}