blob: fdf21d44f8ecc6ad27886df330e41ad7dcb99d17 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/coordinator-backend-state.h"
#include <boost/lexical_cast.hpp>
#include "common/object-pool.h"
#include "exec/exec-node.h"
#include "exec/kudu-util.h"
#include "exec/scan-node.h"
#include "gen-cpp/data_stream_service.proxy.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/rpc/rpc_sidecar.h"
#include "kudu/util/monotime.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "rpc/rpc-mgr.inline.h"
#include "runtime/backend-client.h"
#include "runtime/client-cache.h"
#include "runtime/coordinator-filter-state.h"
#include "runtime/debug-options.h"
#include "runtime/exec-env.h"
#include "runtime/fragment-instance-state.h"
#include "runtime/krpc-data-stream-sender.h"
#include "runtime/mem-tracker.h"
#include "service/control-service.h"
#include "service/data-stream-service.h"
#include "util/counting-barrier.h"
#include "util/error-util-internal.h"
#include "util/network-util.h"
#include "util/pretty-printer.h"
#include "util/runtime-profile-counters.h"
#include "util/scope-exit-trigger.h"
#include "util/uid-util.h"
#include "common/names.h"
using kudu::MonoDelta;
using kudu::rpc::RpcController;
using kudu::rpc::RpcSidecar;
using namespace rapidjson;
namespace accumulators = boost::accumulators;
DECLARE_bool(gen_experimental_profile);
DECLARE_int32(backend_client_rpc_timeout_ms);
DECLARE_int64(rpc_max_message_size);
namespace impala {
PROFILE_DEFINE_COUNTER(BytesAssigned, STABLE_HIGH, TUnit::BYTES,
"Total number of bytes of filesystem scan ranges assigned to this fragment "
"instance.");
PROFILE_DEFINE_TIMER(
CompletionTime, STABLE_HIGH, "Completion time of this fragment instance");
PROFILE_DEFINE_COUNTER(ExecutionRate, STABLE_LOW, TUnit::BYTES_PER_SECOND,
"Rate at which the fragment instance processed its input scan ranges.");
const char* Coordinator::BackendState::InstanceStats::LAST_REPORT_TIME_DESC =
"Last report received time";
Coordinator::BackendState::BackendState(const QueryExecParams& exec_params, int state_idx,
TRuntimeFilterMode::type filter_mode, const BackendExecParamsPB& backend_exec_params)
: exec_params_(exec_params),
state_idx_(state_idx),
filter_mode_(filter_mode),
backend_exec_params_(backend_exec_params),
host_(backend_exec_params_.address()),
krpc_host_(backend_exec_params_.krpc_address()),
query_ctx_(exec_params_.query_exec_request().query_ctx),
query_id_(exec_params_.query_id()),
num_remaining_instances_(backend_exec_params_.instance_params().size()) {}
void Coordinator::BackendState::Init(const vector<FragmentStats*>& fragment_stats,
RuntimeProfile* host_profile_parent, ObjectPool* obj_pool) {
host_profile_ = RuntimeProfile::Create(obj_pool, NetworkAddressPBToString(host_));
host_profile_parent->AddChild(host_profile_);
RuntimeProfile::Counter* admission_slots =
ADD_COUNTER(host_profile_, "AdmissionSlots", TUnit::UNIT);
admission_slots->Set(backend_exec_params_.slots_to_use());
// populate instance_stats_map_ and install instance
// profiles as child profiles in fragment_stats' profile
int prev_fragment_idx = -1;
for (const FInstanceExecParamsPB& instance_params :
backend_exec_params_.instance_params()) {
int fragment_idx = instance_params.fragment_idx();
DCHECK_LT(fragment_idx, fragment_stats.size());
if (prev_fragment_idx != -1 && fragment_idx != prev_fragment_idx) {
// all instances of a fragment are contiguous
DCHECK_EQ(fragments_.count(fragment_idx), 0);
prev_fragment_idx = fragment_idx;
}
fragments_.insert(fragment_idx);
const TPlanFragment* fragment = exec_params_.GetFragments()[fragment_idx];
InstanceStats* instance_stats = obj_pool->Add(new InstanceStats(
instance_params, fragment, host_, fragment_stats[fragment_idx], obj_pool));
instance_stats_map_.emplace(
GetInstanceIdx(instance_params.instance_id()), instance_stats);
}
}
void Coordinator::BackendState::SetRpcParams(const DebugOptions& debug_options,
const FilterRoutingTable& filter_routing_table, ExecQueryFInstancesRequestPB* request,
TExecPlanFragmentInfo* fragment_info) {
request->set_coord_state_idx(state_idx_);
request->set_min_mem_reservation_bytes(
backend_exec_params_.min_mem_reservation_bytes());
request->set_initial_mem_reservation_total_claims(
backend_exec_params_.initial_mem_reservation_total_claims());
request->set_per_backend_mem_limit(
exec_params_.query_schedule().per_backend_mem_limit());
// set fragment_ctxs and fragment_instance_ctxs
fragment_info->__isset.fragments = true;
fragment_info->__isset.fragment_instance_ctxs = true;
fragment_info->fragment_instance_ctxs.resize(
backend_exec_params_.instance_params().size());
for (int i = 0; i < backend_exec_params_.instance_params().size(); ++i) {
TPlanFragmentInstanceCtx& instance_ctx = fragment_info->fragment_instance_ctxs[i];
PlanFragmentInstanceCtxPB* instance_ctx_pb = request->add_fragment_instance_ctxs();
const FInstanceExecParamsPB& params = backend_exec_params_.instance_params(i);
int fragment_idx = params.fragment_idx();
DCHECK_LT(fragment_idx, exec_params_.query_schedule().fragment_exec_params().size());
const FragmentExecParamsPB& fragment_exec_params =
exec_params_.query_schedule().fragment_exec_params(fragment_idx);
// add a TPlanFragment, if we don't already have it
if (fragment_info->fragments.empty()
|| fragment_info->fragments.back().idx != fragment_idx) {
const TPlanFragment* fragment = exec_params_.GetFragments()[fragment_idx];
fragment_info->fragments.push_back(*fragment);
PlanFragmentCtxPB* fragment_ctx = request->add_fragment_ctxs();
fragment_ctx->set_fragment_idx(fragment_idx);
*fragment_ctx->mutable_destinations() = fragment_exec_params.destinations();
}
instance_ctx.fragment_idx = fragment_idx;
instance_ctx_pb->set_fragment_idx(fragment_idx);
UniqueIdPBToTUniqueId(params.instance_id(), &instance_ctx.fragment_instance_id);
instance_ctx.per_fragment_instance_idx = params.per_fragment_instance_idx();
*instance_ctx_pb->mutable_per_node_scan_ranges() = params.per_node_scan_ranges();
for (const auto& entry : fragment_exec_params.per_exch_num_senders()) {
instance_ctx.per_exch_num_senders[entry.first] = entry.second;
}
instance_ctx.__set_sender_id(params.sender_id());
*instance_ctx_pb->mutable_join_build_inputs() = params.join_build_inputs();
if (params.num_join_build_outputs() != -1) {
instance_ctx.__set_num_join_build_outputs(params.num_join_build_outputs());
}
if (debug_options.enabled()
&& (debug_options.instance_idx() == -1
|| debug_options.instance_idx() == GetInstanceIdx(params.instance_id()))) {
instance_ctx.__set_debug_options(debug_options.ToThrift());
}
int num_backends = fragment_exec_params.num_hosts();
instance_ctx.__set_num_backends(num_backends);
if (filter_mode_ == TRuntimeFilterMode::OFF) continue;
int instance_idx = GetInstanceIdx(params.instance_id());
auto& produced_map = filter_routing_table.finstance_filters_produced;
auto produced_it = produced_map.find(instance_idx);
if (produced_it == produced_map.end()) continue;
// Finstance needs list of source filters that were selected during filter routing
// table construction.
instance_ctx.__set_filters_produced(produced_it->second);
}
}
void Coordinator::BackendState::SetExecError(
const Status& status, TypedCountingBarrier<Status>* exec_status_barrier) {
const string ERR_TEMPLATE = "ExecQueryFInstances rpc query_id=$0 failed: $1";
const string& err_msg =
Substitute(ERR_TEMPLATE, PrintId(query_id_), status.msg().GetFullMessageDetails());
LOG(ERROR) << err_msg;
status_ = Status::Expected(err_msg);
exec_done_ = true;
exec_status_barrier->NotifyRemaining(status);
}
void Coordinator::BackendState::WaitOnExecRpc() {
unique_lock<mutex> l(lock_);
WaitOnExecLocked(&l);
}
void Coordinator::BackendState::WaitOnExecLocked(unique_lock<mutex>* l) {
DCHECK(l->owns_lock());
while (!exec_done_) {
exec_done_cv_.Wait(*l);
}
}
void Coordinator::BackendState::ExecCompleteCb(
TypedCountingBarrier<Status>* exec_status_barrier, int64_t start_ms) {
{
lock_guard<mutex> l(lock_);
exec_rpc_status_ = exec_rpc_controller_.status();
rpc_latency_ = MonotonicMillis() - start_ms;
if (!exec_rpc_status_.ok()) {
SetExecError(
FromKuduStatus(exec_rpc_status_, "Exec() rpc failed"), exec_status_barrier);
goto done;
}
Status exec_status = Status(exec_response_.status());
if (!exec_status.ok()) {
SetExecError(exec_status, exec_status_barrier);
goto done;
}
for (const auto& entry : instance_stats_map_) entry.second->stopwatch_.Start();
VLOG_FILE << "rpc succeeded: ExecQueryFInstances query_id=" << PrintId(query_id_);
exec_done_ = true;
last_report_time_ms_ = GenerateReportTimestamp();
exec_status_barrier->Notify(Status::OK());
}
done:
// Notify after releasing 'lock_' so that we don't wake up a thread just to have it
// immediately block again.
exec_done_cv_.NotifyAll();
}
void Coordinator::BackendState::ExecAsync(const DebugOptions& debug_options,
const FilterRoutingTable& filter_routing_table,
const kudu::Slice& serialized_query_ctx,
TypedCountingBarrier<Status>* exec_status_barrier) {
{
lock_guard<mutex> l(lock_);
DCHECK(!exec_done_);
DCHECK(status_.ok());
// Do not issue an ExecQueryFInstances RPC if there are no fragment instances
// scheduled to run on this backend.
if (IsEmptyBackend()) {
DCHECK(backend_exec_params_.is_coord_backend());
exec_done_ = true;
exec_status_barrier->Notify(Status::OK());
goto done;
}
std::unique_ptr<ControlServiceProxy> proxy;
Status get_proxy_status = ControlService::GetProxy(
FromNetworkAddressPB(krpc_host_), host_.hostname(), &proxy);
if (!get_proxy_status.ok()) {
SetExecError(get_proxy_status, exec_status_barrier);
goto done;
}
ExecQueryFInstancesRequestPB request;
TExecPlanFragmentInfo fragment_info;
SetRpcParams(debug_options, filter_routing_table, &request, &fragment_info);
exec_rpc_controller_.set_timeout(
MonoDelta::FromMilliseconds(FLAGS_backend_client_rpc_timeout_ms));
// Serialize the sidecar and add it to the rpc controller. The serialized buffer is
// owned by 'serializer' and is freed when it is destructed.
ThriftSerializer serializer(true);
uint8_t* serialized_buf = nullptr;
uint32_t serialized_len = 0;
Status serialize_status =
DebugAction(exec_params_.query_options(), "EXEC_SERIALIZE_FRAGMENT_INFO");
if (LIKELY(serialize_status.ok())) {
serialize_status =
serializer.SerializeToBuffer(&fragment_info, &serialized_len, &serialized_buf);
}
if (UNLIKELY(!serialize_status.ok())) {
SetExecError(serialize_status, exec_status_barrier);
goto done;
} else if (serialized_len > FLAGS_rpc_max_message_size) {
SetExecError(
Status::Expected("Serialized Exec() request exceeds --rpc_max_message_size."),
exec_status_barrier);
goto done;
}
// TODO: eliminate the extra copy here by using a Slice
unique_ptr<kudu::faststring> sidecar_buf = make_unique<kudu::faststring>();
sidecar_buf->assign_copy(serialized_buf, serialized_len);
unique_ptr<RpcSidecar> rpc_sidecar = RpcSidecar::FromFaststring(move(sidecar_buf));
int sidecar_idx;
kudu::Status sidecar_status =
exec_rpc_controller_.AddOutboundSidecar(move(rpc_sidecar), &sidecar_idx);
if (!sidecar_status.ok()) {
SetExecError(
FromKuduStatus(sidecar_status, "Failed to add sidecar"), exec_status_barrier);
goto done;
}
request.set_plan_fragment_info_sidecar_idx(sidecar_idx);
// Add the serialized TQueryCtx as a sidecar.
unique_ptr<RpcSidecar> query_ctx_sidecar =
RpcSidecar::FromSlice(serialized_query_ctx);
int query_ctx_sidecar_idx;
kudu::Status query_ctx_sidecar_status = exec_rpc_controller_.AddOutboundSidecar(
move(query_ctx_sidecar), &query_ctx_sidecar_idx);
if (!query_ctx_sidecar_status.ok()) {
SetExecError(
FromKuduStatus(query_ctx_sidecar_status, "Failed to add TQueryCtx sidecar"),
exec_status_barrier);
goto done;
}
request.set_query_ctx_sidecar_idx(query_ctx_sidecar_idx);
VLOG_FILE << "making rpc: ExecQueryFInstances"
<< " host=" << impalad_address() << " query_id=" << PrintId(query_id_);
proxy->ExecQueryFInstancesAsync(request, &exec_response_, &exec_rpc_controller_,
std::bind(&Coordinator::BackendState::ExecCompleteCb, this, exec_status_barrier,
MonotonicMillis()));
exec_rpc_sent_ = true;
return;
}
done:
// Notify after releasing 'lock_' so that we don't wake up a thread just to have it
// immediately block again.
exec_done_cv_.NotifyAll();
}
Status Coordinator::BackendState::GetStatus(bool* is_fragment_failure,
TUniqueId* failed_instance_id) {
lock_guard<mutex> l(lock_);
DCHECK_EQ(is_fragment_failure == nullptr, failed_instance_id == nullptr);
if (!status_.ok() && failed_instance_id != nullptr) {
*is_fragment_failure = is_fragment_failure_;
*failed_instance_id = failed_instance_id_;
}
return status_;
}
Coordinator::ResourceUtilization Coordinator::BackendState::GetResourceUtilization() {
lock_guard<mutex> l(lock_);
DCHECK(exec_done_) << "May only be called after WaitOnExecRpc() completes.";
return GetResourceUtilizationLocked();
}
Coordinator::ResourceUtilization
Coordinator::BackendState::GetResourceUtilizationLocked() {
return backend_utilization_;
}
void Coordinator::BackendState::MergeErrorLog(ErrorLogMap* merged) {
lock_guard<mutex> l(lock_);
DCHECK(exec_done_) << "May only be called after WaitOnExecRpc() completes.";
if (error_log_.size() > 0) MergeErrorMaps(error_log_, merged);
}
bool Coordinator::BackendState::HasFragmentIdx(int fragment_idx) const {
return fragments_.count(fragment_idx) > 0;
}
bool Coordinator::BackendState::HasFragmentIdx(
const std::unordered_set<int>& fragment_idxs) const {
for (int fragment_idx : fragment_idxs) {
if (HasFragmentIdx(fragment_idx)) return true;
}
return false;
}
void Coordinator::BackendState::LogFirstInProgress(
std::vector<Coordinator::BackendState*> backend_states) {
for (Coordinator::BackendState* backend_state : backend_states) {
if (!backend_state->IsDone()) {
VLOG_QUERY << "query_id=" << PrintId(backend_state->query_id_)
<< ": first in-progress backend: " << backend_state->impalad_address();
break;
}
}
}
bool Coordinator::BackendState::IsDone() {
unique_lock<mutex> lock(lock_);
return IsDoneLocked(lock);
}
inline bool Coordinator::BackendState::IsDoneLocked(
const unique_lock<std::mutex>& lock) const {
DCHECK(lock.owns_lock() && lock.mutex() == &lock_);
return num_remaining_instances_ == 0 || !status_.ok();
}
bool Coordinator::BackendState::ApplyExecStatusReport(
const ReportExecStatusRequestPB& backend_exec_status,
const TRuntimeProfileForest& thrift_profiles, ExecSummary* exec_summary,
ProgressUpdater* scan_range_progress, DmlExecState* dml_exec_state,
vector<AuxErrorInfoPB>* aux_error_info) {
DCHECK(!IsEmptyBackend());
// Hold the exec_summary's lock to avoid exposing it half-way through
// the update loop below.
lock_guard<SpinLock> l1(exec_summary->lock);
unique_lock<mutex> lock(lock_);
last_report_time_ms_ = GenerateReportTimestamp();
// If this backend completed previously, don't apply the update. This ensures that
// the profile doesn't change during or after Coordinator::ComputeQuerySummary(), but
// can mean that we lose profile information for failed queries, since the Coordinator
// will call Cancel() on all BackendStates and we may stop accepting reports before some
// backends send their final report.
// TODO: revisit ComputeQuerySummary()
if (IsDoneLocked(lock)) return false;
// Use empty profile in case profile serialization/deserialization failed.
// 'thrift_profiles' and 'instance_exec_status' vectors have one-to-one correspondance.
vector<TRuntimeProfileTree> empty_profiles;
vector<TRuntimeProfileTree>::const_iterator profile_iter;
if (UNLIKELY(thrift_profiles.profile_trees.size() == 0)) {
empty_profiles.resize(backend_exec_status.instance_exec_status().size());
profile_iter = empty_profiles.begin();
} else {
DCHECK_EQ(thrift_profiles.profile_trees.size(),
backend_exec_status.instance_exec_status().size());
profile_iter = thrift_profiles.profile_trees.begin();
}
for (auto status_iter = backend_exec_status.instance_exec_status().begin();
status_iter != backend_exec_status.instance_exec_status().end();
++status_iter, ++profile_iter) {
const FragmentInstanceExecStatusPB& instance_exec_status = *status_iter;
int64_t report_seq_no = instance_exec_status.report_seq_no();
int instance_idx = GetInstanceIdx(instance_exec_status.fragment_instance_id());
DCHECK_EQ(instance_stats_map_.count(instance_idx), 1);
InstanceStats* instance_stats = instance_stats_map_[instance_idx];
int64_t last_report_seq_no = instance_stats->last_report_seq_no_;
DCHECK_EQ(instance_stats->exec_params_.instance_id(),
instance_exec_status.fragment_instance_id());
// Ignore duplicate or out-of-order messages.
if (report_seq_no <= last_report_seq_no) {
VLOG_QUERY << "Ignoring stale update for query instance "
<< instance_stats->exec_params_.instance_id() << " with seq no "
<< report_seq_no;
continue;
}
DCHECK(!instance_stats->done_);
instance_stats->Update(instance_exec_status, *profile_iter, exec_summary);
// Update DML stats
if (instance_exec_status.has_dml_exec_status()) {
dml_exec_state->Update(instance_exec_status.dml_exec_status());
}
// Handle the non-idempotent parts of the report for any sequence numbers that we
// haven't seen yet.
if (instance_exec_status.stateful_report_size() > 0) {
for (const auto& stateful_report : instance_exec_status.stateful_report()) {
DCHECK_LE(stateful_report.report_seq_no(), report_seq_no);
if (last_report_seq_no < stateful_report.report_seq_no()) {
// Append the log messages from each update with the global state of the query
// execution
MergeErrorMaps(stateful_report.error_log(), &error_log_);
VLOG_FILE << "host=" << host_
<< " error log: " << PrintErrorMapToString(error_log_);
if (stateful_report.has_aux_error_info()) {
aux_error_info->push_back(stateful_report.aux_error_info());
}
}
}
}
DCHECK_GT(num_remaining_instances_, 0);
if (instance_exec_status.done()) {
DCHECK(!instance_stats->done_);
instance_stats->done_ = true;
--num_remaining_instances_;
}
}
// Determine newly-completed scan ranges and update scan_range_progress.
int64_t scan_ranges_complete = backend_exec_status.scan_ranges_complete();
int64_t scan_range_delta = scan_ranges_complete - total_ranges_complete_;
DCHECK_GE(scan_range_delta, 0);
scan_range_progress->Update(scan_range_delta);
total_ranges_complete_ = scan_ranges_complete;
backend_utilization_.peak_per_host_mem_consumption =
backend_exec_status.peak_mem_consumption();
backend_utilization_.cpu_user_ns = backend_exec_status.cpu_user_ns();
backend_utilization_.cpu_sys_ns = backend_exec_status.cpu_sys_ns();
backend_utilization_.bytes_read = backend_exec_status.bytes_read();
backend_utilization_.exchange_bytes_sent = backend_exec_status.exchange_bytes_sent();
backend_utilization_.scan_bytes_sent = backend_exec_status.scan_bytes_sent();
// status_ has incorporated the status from all fragment instances. If the overall
// backend status is not OK, but no specific fragment instance reported an error, then
// this is a general backend error. Incorporate the general error into status_.
Status overall_status(backend_exec_status.overall_status());
if (!overall_status.ok() && (status_.ok() || status_.IsCancelled())) {
status_ = overall_status;
if (backend_exec_status.has_fragment_instance_id()) {
failed_instance_id_ = ProtoToQueryId(backend_exec_status.fragment_instance_id());
is_fragment_failure_ = true;
}
}
// TODO: keep backend-wide stopwatch?
return IsDoneLocked(lock);
}
void Coordinator::BackendState::UpdateHostProfile(
const TRuntimeProfileTree& thrift_profile) {
// We do not take 'lock_' here because RuntimeProfile::Update() is thread-safe.
DCHECK(!IsEmptyBackend());
host_profile_->Update(thrift_profile);
}
void Coordinator::BackendState::UpdateExecStats(
const vector<FragmentStats*>& fragment_stats) {
lock_guard<mutex> l(lock_);
DCHECK(exec_done_) << "May only be called after WaitOnExecRpc() completes.";
for (const auto& entry: instance_stats_map_) {
const InstanceStats& instance_stats = *entry.second;
int fragment_idx = instance_stats.exec_params_.fragment_idx();
DCHECK_LT(fragment_idx, fragment_stats.size());
FragmentStats* f = fragment_stats[fragment_idx];
int64_t completion_time = instance_stats.stopwatch_.ElapsedTime();
RuntimeProfile::Counter* completion_timer =
PROFILE_CompletionTime.Instantiate(instance_stats.profile_);
completion_timer->Set(completion_time);
if (!FLAGS_gen_experimental_profile) f->completion_times_(completion_time);
if (completion_time > 0) {
RuntimeProfile::Counter* execution_rate_counter =
PROFILE_ExecutionRate.Instantiate(instance_stats.profile_);
double rate =
instance_stats.total_split_size_ / (completion_time / 1000.0 / 1000.0 / 1000.0);
execution_rate_counter->Set(static_cast<int64_t>(rate));
if (!FLAGS_gen_experimental_profile) f->rates_(rate);
}
f->agg_profile_->Update(
instance_stats.profile_, instance_stats.per_fragment_instance_idx());
}
}
Coordinator::BackendState::CancelResult Coordinator::BackendState::Cancel(
bool fire_and_forget) {
// Update 'result' based on the actions we take in this function and/or errors we hit.
CancelResult result;
unique_lock<mutex> l(lock_);
// Nothing to cancel if the exec rpc was not sent.
if (!exec_rpc_sent_) {
if (status_.ok()) {
status_ = Status::CANCELLED;
result.became_done = true;
}
VLogForBackend("Not sending Cancel() rpc because nothing was started.");
exec_done_ = true;
// Notify after releasing 'lock_' so that we don't wake up a thread just to have it
// immediately block again.
l.unlock();
exec_done_cv_.NotifyAll();
return result;
}
// If the exec rpc was sent but the callback hasn't been executed, try to cancel the rpc
// and then wait for it to be done.
if (!exec_done_) {
VLogForBackend("Attempting to cancel Exec() rpc");
exec_rpc_controller_.Cancel();
WaitOnExecLocked(&l);
}
// Don't cancel if we're done or already sent an RPC. Note that its possible the
// backend is still running, eg. if the rpc layer reported that the Exec() rpc failed
// but it actually reached the backend. In that case, the backend will cancel itself
// the first time it tries to send a status report and the coordinator responds with
// an error.
if (IsDoneLocked(l)) {
VLogForBackend(Substitute(
"Not cancelling because the backend is already done: $0", status_.GetDetail()));
return result;
} else if (sent_cancel_rpc_) {
DCHECK(status_.ok());
// If we did a fire_and_forget=false followed by fire_and_forget=true.
if (fire_and_forget) {
status_ = Status::CANCELLED;
result.became_done = true;
}
VLogForBackend(Substitute(
"Not cancelling because cancel RPC already sent: $0", status_.GetDetail()));
return result;
}
// Avoid sending redundant cancel RPCs.
sent_cancel_rpc_ = true;
result.cancel_attempted = true;
// Set the status to CANCELLED if we are firing and forgetting.
if (fire_and_forget && status_.ok()) {
result.became_done = true;
status_ = Status::CANCELLED;
}
VLogForBackend("Sending CancelQueryFInstances rpc");
std::unique_ptr<ControlServiceProxy> proxy;
Status get_proxy_status = ControlService::GetProxy(
FromNetworkAddressPB(krpc_host_), host_.hostname(), &proxy);
if (!get_proxy_status.ok()) {
status_.MergeStatus(get_proxy_status);
result.became_done = true;
VLogForBackend(Substitute("Could not get proxy: $0", get_proxy_status.msg().msg()));
return result;
}
CancelQueryFInstancesRequestPB request;
*request.mutable_query_id() = query_id_;
CancelQueryFInstancesResponsePB response;
const int num_retries = 3;
const int64_t timeout_ms = 10 * MILLIS_PER_SEC;
const int64_t backoff_time_ms = 3 * MILLIS_PER_SEC;
Status rpc_status =
RpcMgr::DoRpcWithRetry(proxy, &ControlServiceProxy::CancelQueryFInstances, request,
&response, query_ctx_, "Cancel() RPC failed", num_retries, timeout_ms,
backoff_time_ms, "COORD_CANCEL_QUERY_FINSTANCES_RPC");
if (!rpc_status.ok()) {
status_.MergeStatus(rpc_status);
result.became_done = true;
VLogForBackend(
Substitute("CancelQueryFInstances rpc failed: $0", rpc_status.msg().msg()));
return result;
}
Status cancel_status = Status(response.status());
if (!cancel_status.ok()) {
status_.MergeStatus(cancel_status);
result.became_done = true;
VLogForBackend(
Substitute("CancelQueryFInstances failed: $0", cancel_status.msg().msg()));
return result;
}
return result;
}
void Coordinator::BackendState::PublishFilter(FilterState* state,
MemTracker* mem_tracker, const PublishFilterParamsPB& rpc_params,
RpcController& controller, PublishFilterResultPB& res) {
DCHECK_EQ(rpc_params.dst_query_id(), query_id_);
// If the backend is already done, it's not waiting for this filter, so we skip
// sending it in this case.
if (IsDone()) return;
VLOG(2) << "PublishFilter filter_id=" << rpc_params.filter_id() << " backend=" << host_;
Status status;
std::unique_ptr<DataStreamServiceProxy> proxy;
Status get_proxy_status = DataStreamService::GetProxy(
FromNetworkAddressPB(krpc_host_), host_.hostname(), &proxy);
if (!get_proxy_status.ok()) {
// Failing to send a filter is not a query-wide error - the remote fragment will
// continue regardless.
LOG(ERROR) << "Couldn't get proxy: " << get_proxy_status.msg().msg();
return;
}
state->IncrementNumInflightRpcs(1);
proxy->PublishFilterAsync(rpc_params, &res, &controller,
boost::bind(&Coordinator::BackendState::PublishFilterCompleteCb, this, &controller,
state, mem_tracker));
}
void Coordinator::BackendState::PublishFilterCompleteCb(
const kudu::rpc::RpcController* rpc_controller, FilterState* state,
MemTracker* mem_tracker) {
const kudu::Status controller_status = rpc_controller->status();
// In the case of an unsuccessful KRPC call, we only log this event w/o retrying.
// Failing to send a filter is not a query-wide error - the remote fragment will
// continue regardless.
if (!controller_status.ok()) {
LOG(ERROR) << "PublishFilter() failed: " << controller_status.message().ToString();
}
{
lock_guard<SpinLock> l(state->lock());
state->IncrementNumInflightRpcs(-1);
if (state->num_inflight_rpcs() == 0) {
// Since we disabled the filter once complete and held FilterState::lock_ while
// issuing all PublishFilter() rpcs, at this point there can't be any more
// PublishFilter() rpcs issued.
DCHECK(state->disabled());
if (state->is_bloom_filter() && state->bloom_filter_directory().size() > 0) {
mem_tracker->Release(state->bloom_filter_directory().size());
state->bloom_filter_directory().clear();
state->bloom_filter_directory().shrink_to_fit();
}
state->get_publish_filter_done_cv().notify_one();
}
}
}
Coordinator::BackendState::InstanceStats::InstanceStats(
const FInstanceExecParamsPB& exec_params, const TPlanFragment* fragment,
const NetworkAddressPB& address, FragmentStats* fragment_stats, ObjectPool* obj_pool)
: exec_params_(exec_params), fragment_(fragment), profile_(nullptr) {
const string& profile_name = Substitute("Instance $0 (host=$1)",
PrintId(exec_params.instance_id()), NetworkAddressPBToString(address));
profile_ = RuntimeProfile::Create(obj_pool, profile_name);
profile_->AddInfoString(LAST_REPORT_TIME_DESC, ToStringFromUnixMillis(UnixMillis()));
fragment_stats->root_profile()->AddChild(profile_);
// Compute total split size and add to profile as "BytesAssigned".
for (const auto& entry : exec_params_.per_node_scan_ranges()) {
for (const ScanRangeParamsPB& scan_range_params : entry.second.scan_ranges()) {
if (!scan_range_params.scan_range().has_hdfs_file_split()) continue;
total_split_size_ += scan_range_params.scan_range().hdfs_file_split().length();
}
}
RuntimeProfile::Counter* bytes_assigned_counter =
PROFILE_BytesAssigned.Instantiate(profile_);
bytes_assigned_counter->Set(total_split_size_);
if (!FLAGS_gen_experimental_profile) {
(*fragment_stats->bytes_assigned())(total_split_size_);
}
}
void Coordinator::BackendState::InstanceStats::Update(
const FragmentInstanceExecStatusPB& exec_status,
const TRuntimeProfileTree& thrift_profile, ExecSummary* exec_summary) {
last_report_time_ms_ = UnixMillis();
DCHECK_GT(exec_status.report_seq_no(), last_report_seq_no_);
last_report_seq_no_ = exec_status.report_seq_no();
if (exec_status.done()) stopwatch_.Stop();
profile_->UpdateInfoString(LAST_REPORT_TIME_DESC,
ToStringFromUnixMillis(last_report_time_ms_));
profile_->Update(thrift_profile);
profile_->ComputeTimeInProfile();
// update exec_summary
TExecSummary& thrift_exec_summary = exec_summary->thrift_exec_summary;
for (const ExecSummaryDataPB& exec_summary_entry : exec_status.exec_summary_data()) {
bool is_plan_node = exec_summary_entry.has_plan_node_id();
bool is_data_sink = exec_summary_entry.has_data_sink_id();
DCHECK(is_plan_node || is_data_sink) << "Invalid exec summary entry sent by executor";
int exec_summary_idx;
if (is_plan_node) {
exec_summary_idx =
exec_summary->node_id_to_idx_map[exec_summary_entry.plan_node_id()];
} else {
exec_summary_idx =
exec_summary->data_sink_id_to_idx_map[exec_summary_entry.data_sink_id()];
}
TPlanNodeExecSummary& node_exec_summary = thrift_exec_summary.nodes[exec_summary_idx];
DCHECK_EQ(node_exec_summary.fragment_idx, exec_params_.fragment_idx());
int per_fragment_instance_idx = exec_params_.per_fragment_instance_idx();
DCHECK_LT(per_fragment_instance_idx, node_exec_summary.exec_stats.size())
<< " instance_id=" << PrintId(exec_params_.instance_id())
<< " fragment_idx=" << exec_params_.fragment_idx();
TExecStats& instance_stats = node_exec_summary.exec_stats[per_fragment_instance_idx];
if (exec_summary_entry.has_rows_returned()) {
instance_stats.__set_cardinality(exec_summary_entry.rows_returned());
}
if (exec_summary_entry.has_peak_mem_usage()) {
instance_stats.__set_memory_used(exec_summary_entry.peak_mem_usage());
}
DCHECK(exec_summary_entry.has_local_time_ns());
instance_stats.__set_latency_ns(exec_summary_entry.local_time_ns());
node_exec_summary.__isset.exec_stats = true;
}
// extract the current execution state of this instance
current_state_ = exec_status.current_state();
}
void Coordinator::BackendState::InstanceStats::ToJson(Value* value, Document* document) {
Value instance_id_val(
PrintId(exec_params_.instance_id()).c_str(), document->GetAllocator());
value->AddMember("instance_id", instance_id_val, document->GetAllocator());
// We send 'done' explicitly so we don't have to infer it by comparison with a string
// constant in the debug page JS code.
value->AddMember("done", done_, document->GetAllocator());
Value state_val(FragmentInstanceState::ExecStateToString(current_state_).c_str(),
document->GetAllocator());
value->AddMember("current_state", state_val, document->GetAllocator());
Value fragment_name_val(fragment_->display_name.c_str(), document->GetAllocator());
value->AddMember("fragment_name", fragment_name_val, document->GetAllocator());
value->AddMember("first_status_update_received", last_report_time_ms_ > 0,
document->GetAllocator());
int64_t elapsed_time_ms =
std::max(static_cast<int64_t>(0), UnixMillis() - last_report_time_ms_);
value->AddMember("time_since_last_heard_from", elapsed_time_ms,
document->GetAllocator());
}
Coordinator::FragmentStats::FragmentStats(const string& agg_profile_name,
const string& root_profile_name, int num_instances, ObjectPool* obj_pool)
: agg_profile_(
AggregatedRuntimeProfile::Create(obj_pool, agg_profile_name, num_instances)),
root_profile_(RuntimeProfile::Create(obj_pool, root_profile_name)),
num_instances_(num_instances) {}
void Coordinator::FragmentStats::AddSplitStats() {
// These strings are not included in the transposed profile because we have counters.
if (FLAGS_gen_experimental_profile) return;
double min = accumulators::min(bytes_assigned_);
double max = accumulators::max(bytes_assigned_);
double mean = accumulators::mean(bytes_assigned_);
double stddev = sqrt(accumulators::variance(bytes_assigned_));
stringstream ss;
ss << " min: " << PrettyPrinter::Print(min, TUnit::BYTES)
<< ", max: " << PrettyPrinter::Print(max, TUnit::BYTES)
<< ", avg: " << PrettyPrinter::Print(mean, TUnit::BYTES)
<< ", stddev: " << PrettyPrinter::Print(stddev, TUnit::BYTES);
agg_profile_->AddInfoString("split sizes", ss.str());
}
void Coordinator::FragmentStats::AddExecStats() {
root_profile_->SortChildrenByTotalTime();
// These strings are not included in the transposed profile because we have counters.
if (FLAGS_gen_experimental_profile) return;
stringstream times_label;
times_label
<< "min:" << PrettyPrinter::Print(
accumulators::min(completion_times_), TUnit::TIME_NS)
<< " max:" << PrettyPrinter::Print(
accumulators::max(completion_times_), TUnit::TIME_NS)
<< " mean: " << PrettyPrinter::Print(
accumulators::mean(completion_times_), TUnit::TIME_NS)
<< " stddev:" << PrettyPrinter::Print(
sqrt(accumulators::variance(completion_times_)), TUnit::TIME_NS);
stringstream rates_label;
rates_label
<< "min:" << PrettyPrinter::Print(
accumulators::min(rates_), TUnit::BYTES_PER_SECOND)
<< " max:" << PrettyPrinter::Print(
accumulators::max(rates_), TUnit::BYTES_PER_SECOND)
<< " mean:" << PrettyPrinter::Print(
accumulators::mean(rates_), TUnit::BYTES_PER_SECOND)
<< " stddev:" << PrettyPrinter::Print(
sqrt(accumulators::variance(rates_)), TUnit::BYTES_PER_SECOND);
agg_profile_->AddInfoString("completion times", times_label.str());
agg_profile_->AddInfoString("execution rates", rates_label.str());
agg_profile_->AddInfoString("num instances", lexical_cast<string>(num_instances_));
}
void Coordinator::BackendState::ToJson(Value* value, Document* document) {
unique_lock<mutex> l(lock_);
DCHECK(exec_done_) << "May only be called after WaitOnExecRpc() completes.";
ResourceUtilization resource_utilization = GetResourceUtilizationLocked();
value->AddMember("num_instances", fragments_.size(), document->GetAllocator());
value->AddMember("done", IsDoneLocked(l), document->GetAllocator());
value->AddMember("peak_per_host_mem_consumption",
resource_utilization.peak_per_host_mem_consumption, document->GetAllocator());
value->AddMember("bytes_read", resource_utilization.bytes_read,
document->GetAllocator());
value->AddMember("cpu_user_s", resource_utilization.cpu_user_ns / 1e9,
document->GetAllocator());
value->AddMember("cpu_sys_s", resource_utilization.cpu_sys_ns / 1e9,
document->GetAllocator());
string host = NetworkAddressPBToString(impalad_address());
Value val(host.c_str(), document->GetAllocator());
value->AddMember("host", val, document->GetAllocator());
value->AddMember("rpc_latency", rpc_latency(), document->GetAllocator());
value->AddMember("time_since_last_heard_from", MonotonicMillis() - last_report_time_ms_,
document->GetAllocator());
string status_str = status_.ok() ? "OK" : status_.GetDetail();
Value status_val(status_str.c_str(), document->GetAllocator());
value->AddMember("status", status_val, document->GetAllocator());
value->AddMember(
"num_remaining_instances", num_remaining_instances_, document->GetAllocator());
}
void Coordinator::BackendState::InstanceStatsToJson(Value* value, Document* document) {
Value instance_stats(kArrayType);
{
lock_guard<mutex> l(lock_);
DCHECK(exec_done_) << "May only be called after WaitOnExecRpc() completes.";
for (const auto& elem : instance_stats_map_) {
Value val(kObjectType);
elem.second->ToJson(&val, document);
instance_stats.PushBack(val, document->GetAllocator());
}
}
value->AddMember("instance_stats", instance_stats, document->GetAllocator());
// impalad_address is not protected by lock_. The lifetime of the backend state is
// protected by Coordinator::lock_.
Value val(
NetworkAddressPBToString(impalad_address()).c_str(), document->GetAllocator());
value->AddMember("host", val, document->GetAllocator());
}
void Coordinator::BackendState::VLogForBackend(const string& msg) {
VLOG_QUERY << "query_id=" << PrintId(query_id_) << " target backend=" << krpc_host_
<< ": " << msg;
}
} // namespace impala