| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "runtime/coordinator.h" |
| |
| #include <limits> |
| #include <map> |
| #include <memory> |
| #include <thrift/protocol/TDebugProtocol.h> |
| #include <boost/algorithm/string/join.hpp> |
| #include <boost/accumulators/accumulators.hpp> |
| #include <boost/accumulators/statistics/stats.hpp> |
| #include <boost/accumulators/statistics/min.hpp> |
| #include <boost/accumulators/statistics/mean.hpp> |
| #include <boost/accumulators/statistics/median.hpp> |
| #include <boost/accumulators/statistics/max.hpp> |
| #include <boost/accumulators/statistics/variance.hpp> |
| #include <boost/bind.hpp> |
| #include <boost/filesystem.hpp> |
| #include <boost/lexical_cast.hpp> |
| #include <boost/unordered_set.hpp> |
| #include <boost/algorithm/string/split.hpp> |
| #include <boost/algorithm/string.hpp> |
| #include <gutil/strings/substitute.h> |
| #include <errno.h> |
| |
| #include "common/logging.h" |
| #include "exec/data-sink.h" |
| #include "exec/plan-root-sink.h" |
| #include "exec/scan-node.h" |
| #include "gen-cpp/Frontend_types.h" |
| #include "gen-cpp/ImpalaInternalService.h" |
| #include "gen-cpp/ImpalaInternalService_constants.h" |
| #include "gen-cpp/ImpalaInternalService_types.h" |
| #include "gen-cpp/Partitions_types.h" |
| #include "gen-cpp/PlanNodes_types.h" |
| #include "runtime/backend-client.h" |
| #include "runtime/client-cache.h" |
| #include "runtime/data-stream-mgr.h" |
| #include "runtime/data-stream-sender.h" |
| #include "runtime/exec-env.h" |
| #include "runtime/hdfs-fs-cache.h" |
| #include "runtime/mem-tracker.h" |
| #include "runtime/parallel-executor.h" |
| #include "runtime/plan-fragment-executor.h" |
| #include "runtime/row-batch.h" |
| #include "runtime/query-exec-mgr.h" |
| #include "runtime/query-state.h" |
| #include "runtime/fragment-instance-state.h" |
| #include "runtime/tuple-row.h" |
| #include "scheduling/scheduler.h" |
| #include "util/bloom-filter.h" |
| #include "util/container-util.h" |
| #include "util/counting-barrier.h" |
| #include "util/debug-util.h" |
| #include "util/error-util.h" |
| #include "util/hdfs-bulk-ops.h" |
| #include "util/hdfs-util.h" |
| #include "util/network-util.h" |
| #include "util/pretty-printer.h" |
| #include "util/summary-util.h" |
| #include "util/table-printer.h" |
| #include "util/uid-util.h" |
| |
| #include "common/names.h" |
| |
| using namespace apache::thrift; |
| using namespace strings; |
| namespace accumulators = boost::accumulators; |
| using boost::algorithm::iequals; |
| using boost::algorithm::is_any_of; |
| using boost::algorithm::join; |
| using boost::algorithm::token_compress_on; |
| using boost::algorithm::split; |
| using boost::filesystem::path; |
| using std::unique_ptr; |
| |
| DECLARE_int32(be_port); |
| DECLARE_string(hostname); |
| |
| DEFINE_bool(insert_inherit_permissions, false, "If true, new directories created by " |
| "INSERTs will inherit the permissions of their parent directories"); |
| |
| namespace impala { |
| |
| // Maximum number of fragment instances that can publish each broadcast filter. |
| static const int MAX_BROADCAST_FILTER_PRODUCERS = 3; |
| |
| // container for debug options in TPlanFragmentInstanceCtx (debug_node, debug_action, |
| // debug_phase) |
| struct DebugOptions { |
| int instance_state_idx; |
| int node_id; |
| TDebugAction::type action; |
| TExecNodePhase::type phase; // INVALID: debug options invalid |
| |
| DebugOptions() |
| : instance_state_idx(-1), node_id(-1), action(TDebugAction::WAIT), |
| phase(TExecNodePhase::INVALID) {} |
| |
| // If these debug options apply to the candidate fragment instance, returns true |
| // otherwise returns false. |
| bool IsApplicable(int candidate_instance_state_idx) { |
| if (phase == TExecNodePhase::INVALID) return false; |
| return (instance_state_idx == -1 || |
| instance_state_idx == candidate_instance_state_idx); |
| } |
| }; |
| |
| /// Execution state of a particular fragment instance. |
| /// |
| /// Concurrent accesses: |
| /// - updates through UpdateFragmentExecStatus() |
| class Coordinator::InstanceState { |
| public: |
| InstanceState(const FInstanceExecParams& params, ObjectPool* obj_pool) |
| : exec_params_(params), |
| total_split_size_(0), |
| profile_(nullptr), |
| total_ranges_complete_(0), |
| rpc_latency_(0), |
| rpc_sent_(false), |
| done_(false), |
| profile_created_(false) { |
| const string& profile_name = Substitute("Instance $0 (host=$1)", |
| PrintId(params.instance_id), lexical_cast<string>(params.host)); |
| profile_ = obj_pool->Add(new RuntimeProfile(obj_pool, profile_name)); |
| } |
| |
| /// Called to set the initial status of the fragment instance after the |
| /// ExecRemoteFragment() RPC has returned. If 'rpc_sent' is true, |
| /// CancelFragmentInstances() will include this instance in the set of potential |
| /// fragment instances to cancel. |
| void SetInitialStatus(const Status& status, bool rpc_sent) { |
| DCHECK(!rpc_sent_); |
| rpc_sent_ = rpc_sent; |
| status_ = status; |
| if (!status_.ok()) return; |
| stopwatch_.Start(); |
| } |
| |
| /// Computes sum of split sizes of leftmost scan. |
| void ComputeTotalSplitSize(const PerNodeScanRanges& per_node_scan_ranges); |
| |
| /// Updates the total number of scan ranges complete for this fragment. Returns the |
| /// delta since the last time this was called. Not thread-safe without lock() being |
| /// acquired by the caller. |
| int64_t UpdateNumScanRangesCompleted(); |
| |
| // The following getters do not require lock() to be held. |
| const TUniqueId& fragment_instance_id() const { return exec_params_.instance_id; } |
| FragmentIdx fragment_idx() const { return exec_params_.fragment().idx; } |
| MonotonicStopWatch* stopwatch() { return &stopwatch_; } |
| const TNetworkAddress& impalad_address() const { return exec_params_.host; } |
| int64_t total_split_size() const { return total_split_size_; } |
| bool done() const { return done_; } |
| int per_fragment_instance_idx() const { return exec_params_.per_fragment_instance_idx; } |
| bool rpc_sent() const { return rpc_sent_; } |
| int64_t rpc_latency() const { return rpc_latency_; } |
| |
| mutex* lock() { return &lock_; } |
| |
| void set_status(const Status& status) { status_ = status; } |
| void set_done(bool done) { done_ = done; } |
| void set_rpc_latency(int64_t millis) { |
| DCHECK_EQ(rpc_latency_, 0); |
| rpc_latency_ = millis; |
| } |
| |
| // Return values of the following functions must be accessed with lock() held |
| RuntimeProfile* profile() const { return profile_; } |
| void set_profile(RuntimeProfile* profile) { profile_ = profile; } |
| FragmentInstanceCounters* aggregate_counters() { return &aggregate_counters_; } |
| ErrorLogMap* error_log() { return &error_log_; } |
| Status* status() { return &status_; } |
| |
| /// Registers that the fragment instance's profile has been created and initially |
| /// populated. Returns whether the profile had already been initialised so that callers |
| /// can tell if they are the first to do so. Not thread-safe. |
| bool SetProfileCreated() { |
| bool cur = profile_created_; |
| profile_created_ = true; |
| return cur; |
| } |
| |
| private: |
| const FInstanceExecParams& exec_params_; |
| |
| /// Wall clock timer for this fragment. |
| MonotonicStopWatch stopwatch_; |
| |
| /// Summed across all splits; in bytes. |
| int64_t total_split_size_; |
| |
| /// Protects fields below. Can be held while doing an RPC, so SpinLock is a bad idea. |
| /// lock ordering: Coordinator::lock_ must only be obtained *prior* to lock_ |
| mutex lock_; |
| |
| /// If the status indicates an error status, execution of this fragment has either been |
| /// aborted by the executing impalad (which then reported the error) or cancellation has |
| /// been initiated; either way, execution must not be cancelled. |
| Status status_; |
| |
| /// Owned by coordinator object pool provided in the c'tor |
| RuntimeProfile* profile_; |
| |
| /// Errors reported by this fragment instance. |
| ErrorLogMap error_log_; |
| |
| /// Total scan ranges complete across all scan nodes. |
| int64_t total_ranges_complete_; |
| |
| /// Summary counters aggregated across the duration of execution. |
| FragmentInstanceCounters aggregate_counters_; |
| |
| /// Time, in ms, that it took to execute the ExecRemoteFragment() RPC. |
| int64_t rpc_latency_; |
| |
| /// If true, ExecPlanFragment() rpc has been sent - even if it was not determined to be |
| /// successful. |
| bool rpc_sent_; |
| |
| /// If true, execution terminated; do not cancel in that case. |
| bool done_; |
| |
| /// True after the first call to profile->Update() |
| bool profile_created_; |
| }; |
| |
| /// Represents a runtime filter target. |
| struct Coordinator::FilterTarget { |
| TPlanNodeId node_id; |
| bool is_local; |
| bool is_bound_by_partition_columns; |
| |
| // indices into fragment_instance_states_ |
| unordered_set<int> fragment_instance_state_idxs; |
| |
| FilterTarget(const TRuntimeFilterTargetDesc& tFilterTarget) { |
| node_id = tFilterTarget.node_id; |
| is_bound_by_partition_columns = tFilterTarget.is_bound_by_partition_columns; |
| is_local = tFilterTarget.is_local_target; |
| } |
| }; |
| |
| |
| /// State of filters that are received for aggregation. |
| /// |
| /// A broadcast join filter is published as soon as the first update is received for it |
| /// and subsequent updates are ignored (as they will be the same). |
| /// Updates for a partitioned join filter are aggregated in 'bloom_filter' and this is |
| /// published once 'pending_count' reaches 0 and if the filter was not disabled before |
| /// that. |
| /// |
| /// A filter is disabled if an always_true filter update is received, an OOM is hit, |
| /// filter aggregation is complete or if the query is complete. |
| /// Once a filter is disabled, subsequent updates for that filter are ignored. |
| class Coordinator::FilterState { |
| public: |
| FilterState(const TRuntimeFilterDesc& desc, const TPlanNodeId& src) : desc_(desc), |
| src_(src), pending_count_(0), first_arrival_time_(0L), completion_time_(0L), |
| disabled_(false) { } |
| |
| TBloomFilter* bloom_filter() { return bloom_filter_.get(); } |
| boost::unordered_set<int>* src_fragment_instance_state_idxs() { |
| return &src_fragment_instance_state_idxs_; |
| } |
| const boost::unordered_set<int>& src_fragment_instance_state_idxs() const { |
| return src_fragment_instance_state_idxs_; |
| } |
| std::vector<FilterTarget>* targets() { return &targets_; } |
| const std::vector<FilterTarget>& targets() const { return targets_; } |
| int64_t first_arrival_time() const { return first_arrival_time_; } |
| int64_t completion_time() const { return completion_time_; } |
| const TPlanNodeId& src() const { return src_; } |
| const TRuntimeFilterDesc& desc() const { return desc_; } |
| int pending_count() const { return pending_count_; } |
| void set_pending_count(int pending_count) { pending_count_ = pending_count; } |
| bool disabled() const { return disabled_; } |
| |
| /// Aggregates partitioned join filters and updates memory consumption. |
| /// Disables filter if always_true filter is received or OOM is hit. |
| void ApplyUpdate(const TUpdateFilterParams& params, Coordinator* coord); |
| |
| /// Disables a filter. A disabled filter consumes no memory. |
| void Disable(MemTracker* tracker); |
| |
| private: |
| /// Contains the specification of the runtime filter. |
| TRuntimeFilterDesc desc_; |
| |
| TPlanNodeId src_; |
| std::vector<FilterTarget> targets_; |
| |
| // Index into fragment_instance_states_ for source fragment instances. |
| boost::unordered_set<int> src_fragment_instance_state_idxs_; |
| |
| /// Number of remaining backends to hear from before filter is complete. |
| int pending_count_; |
| |
| /// BloomFilter aggregated from all source plan nodes, to be broadcast to all |
| /// destination plan fragment instances. Owned by this object so that it can be |
| /// deallocated once finished with. Only set for partitioned joins (broadcast joins |
| /// need no aggregation). |
| /// In order to avoid memory spikes, an incoming filter is moved (vs. copied) to the |
| /// output structure in the case of a broadcast join. Similarly, for partitioned joins, |
| /// the filter is moved from the following member to the output structure. |
| std::unique_ptr<TBloomFilter> bloom_filter_; |
| |
| /// Time at which first local filter arrived. |
| int64_t first_arrival_time_; |
| |
| /// Time at which all local filters arrived. |
| int64_t completion_time_; |
| |
| /// True if the filter is permanently disabled for this query. |
| bool disabled_; |
| |
| /// TODO: Add a per-object lock so that we can avoid holding the global filter_lock_ |
| /// for every filter update. |
| |
| }; |
| |
| void Coordinator::InstanceState::ComputeTotalSplitSize( |
| const PerNodeScanRanges& per_node_scan_ranges) { |
| total_split_size_ = 0; |
| |
| for (const PerNodeScanRanges::value_type& entry: per_node_scan_ranges) { |
| for (const TScanRangeParams& scan_range_params: entry.second) { |
| if (!scan_range_params.scan_range.__isset.hdfs_file_split) continue; |
| total_split_size_ += scan_range_params.scan_range.hdfs_file_split.length; |
| } |
| } |
| } |
| |
| int64_t Coordinator::InstanceState::UpdateNumScanRangesCompleted() { |
| int64_t total = 0; |
| CounterMap& complete = aggregate_counters_.scan_ranges_complete_counters; |
| for (CounterMap::iterator i = complete.begin(); i != complete.end(); ++i) { |
| total += i->second->value(); |
| } |
| int64_t delta = total - total_ranges_complete_; |
| total_ranges_complete_ = total; |
| DCHECK_GE(delta, 0); |
| return delta; |
| } |
| |
| Coordinator::Coordinator(const QuerySchedule& schedule, ExecEnv* exec_env, |
| RuntimeProfile::EventSequence* events) |
| : schedule_(schedule), |
| exec_env_(exec_env), |
| has_called_wait_(false), |
| returned_all_results_(false), |
| query_mem_tracker_(), // Set in Exec() |
| num_remaining_fragment_instances_(0), |
| obj_pool_(new ObjectPool()), |
| query_events_(events), |
| filter_routing_table_complete_(false), |
| filter_mode_(schedule.query_options().runtime_filter_mode), |
| torn_down_(false) {} |
| |
| Coordinator::~Coordinator() { |
| DCHECK(torn_down_) << "TearDown() must be called before Coordinator is destroyed"; |
| |
| // This may be NULL while executing UDFs. |
| if (filter_mem_tracker_.get() != nullptr) { |
| filter_mem_tracker_->UnregisterFromParent(); |
| } |
| filter_mem_tracker_.reset(); |
| query_mem_tracker_.reset(); |
| } |
| |
| PlanFragmentExecutor* Coordinator::executor() { |
| return coord_instance_->executor(); |
| } |
| |
| TExecNodePhase::type GetExecNodePhase(const string& key) { |
| map<int, const char*>::const_iterator entry = |
| _TExecNodePhase_VALUES_TO_NAMES.begin(); |
| for (; entry != _TExecNodePhase_VALUES_TO_NAMES.end(); ++entry) { |
| if (iequals(key, (*entry).second)) { |
| return static_cast<TExecNodePhase::type>(entry->first); |
| } |
| } |
| return TExecNodePhase::INVALID; |
| } |
| |
| TDebugAction::type GetDebugAction(const string& key) { |
| map<int, const char*>::const_iterator entry = |
| _TDebugAction_VALUES_TO_NAMES.begin(); |
| for (; entry != _TDebugAction_VALUES_TO_NAMES.end(); ++entry) { |
| if (iequals(key, (*entry).second)) { |
| return static_cast<TDebugAction::type>(entry->first); |
| } |
| } |
| return TDebugAction::WAIT; |
| } |
| |
| static void ProcessQueryOptions( |
| const TQueryOptions& query_options, DebugOptions* debug_options) { |
| DCHECK(debug_options != NULL); |
| if (!query_options.__isset.debug_action || query_options.debug_action.empty()) { |
| debug_options->phase = TExecNodePhase::INVALID; // signal not set |
| return; |
| } |
| vector<string> components; |
| split(components, query_options.debug_action, is_any_of(":"), token_compress_on); |
| if (components.size() < 3 || components.size() > 4) return; |
| if (components.size() == 3) { |
| debug_options->instance_state_idx = -1; |
| debug_options->node_id = atoi(components[0].c_str()); |
| debug_options->phase = GetExecNodePhase(components[1]); |
| debug_options->action = GetDebugAction(components[2]); |
| } else { |
| debug_options->instance_state_idx = atoi(components[0].c_str()); |
| debug_options->node_id = atoi(components[1].c_str()); |
| debug_options->phase = GetExecNodePhase(components[2]); |
| debug_options->action = GetDebugAction(components[3]); |
| } |
| DCHECK(!(debug_options->phase == TExecNodePhase::CLOSE && |
| debug_options->action == TDebugAction::WAIT)) |
| << "Do not use CLOSE:WAIT debug actions " |
| << "because nodes cannot be cancelled in Close()"; |
| } |
| |
| Status Coordinator::Exec() { |
| const TQueryExecRequest& request = schedule_.request(); |
| DCHECK(request.plan_exec_info.size() > 0); |
| |
| needs_finalization_ = request.__isset.finalize_params; |
| if (needs_finalization_) finalize_params_ = request.finalize_params; |
| |
| VLOG_QUERY << "Exec() query_id=" << schedule_.query_id() |
| << " stmt=" << request.query_ctx.client_request.stmt; |
| stmt_type_ = request.stmt_type; |
| query_id_ = schedule_.query_id(); |
| desc_tbl_ = request.desc_tbl; |
| query_ctx_ = request.query_ctx; |
| |
| query_profile_.reset( |
| new RuntimeProfile(obj_pool(), "Execution Profile " + PrintId(query_id_))); |
| finalization_timer_ = ADD_TIMER(query_profile_, "FinalizationTimer"); |
| filter_updates_received_ = ADD_COUNTER(query_profile_, "FiltersReceived", TUnit::UNIT); |
| |
| SCOPED_TIMER(query_profile_->total_time_counter()); |
| |
| // initialize progress updater |
| const string& str = Substitute("Query $0", PrintId(query_id_)); |
| progress_.Init(str, schedule_.num_scan_ranges()); |
| |
| // runtime filters not yet supported for mt execution |
| bool is_mt_execution = request.query_ctx.client_request.query_options.mt_dop > 0; |
| if (is_mt_execution) filter_mode_ = TRuntimeFilterMode::OFF; |
| |
| // to keep things simple, make async Cancel() calls wait until plan fragment |
| // execution has been initiated, otherwise we might try to cancel fragment |
| // execution at Impala daemons where it hasn't even started |
| lock_guard<mutex> l(lock_); |
| |
| // The coordinator may require a query mem tracker for result-caching, which tracks |
| // memory via the query mem tracker. |
| int64_t query_limit = -1; |
| if (query_ctx_.client_request.query_options.__isset.mem_limit |
| && query_ctx_.client_request.query_options.mem_limit > 0) { |
| query_limit = query_ctx_.client_request.query_options.mem_limit; |
| } |
| MemTracker* pool_tracker = MemTracker::GetRequestPoolMemTracker( |
| schedule_.request_pool(), exec_env_->process_mem_tracker()); |
| query_mem_tracker_ = |
| MemTracker::GetQueryMemTracker(query_id_, query_limit, pool_tracker); |
| DCHECK(query_mem_tracker() != nullptr); |
| filter_mem_tracker_.reset( |
| new MemTracker(-1, "Runtime Filter (Coordinator)", query_mem_tracker(), false)); |
| |
| InitExecProfiles(); |
| InitExecSummary(); |
| StartFInstances(); |
| |
| // In the error case, it's safe to return and not to get coord_sink_ here to close - if |
| // there was an error, but the coordinator fragment was successfully started, it should |
| // cancel itself when it receives an error status after reporting its profile. |
| RETURN_IF_ERROR(FinishInstanceStartup()); |
| |
| // Grab executor and wait until Prepare() has finished so that runtime state etc. will |
| // be set up. Must do this here in order to get a reference to coord_instance_ |
| // so that coord_sink_ remains valid throughout query lifetime. |
| if (schedule_.GetCoordFragment() != nullptr) { |
| QueryState* qs = ExecEnv::GetInstance()->query_exec_mgr()->GetQueryState(query_id_); |
| if (qs != nullptr) coord_instance_ = qs->GetFInstanceState(query_id_); |
| if (coord_instance_ == nullptr) { |
| // Coordinator instance might have failed and unregistered itself even |
| // though it was successfully started (e.g. Prepare() might have failed). |
| if (qs != nullptr) { |
| ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(qs); |
| qs = nullptr; |
| } |
| InstanceState* coord_state = fragment_instance_states_[0]; |
| DCHECK(coord_state != nullptr); |
| lock_guard<mutex> instance_state_lock(*coord_state->lock()); |
| // Try and return the fragment instance status if it was already set. |
| // TODO: Consider waiting for coord_state->done() here. |
| RETURN_IF_ERROR(*coord_state->status()); |
| return Status( |
| Substitute("Coordinator fragment instance ($0) failed", PrintId(query_id_))); |
| } |
| |
| // When WaitForPrepare() returns OK(), the executor's root sink will be set up. At |
| // that point, the coordinator must be sure to call root_sink()->CloseConsumer(); the |
| // fragment instance's executor will not complete until that point. |
| // TODO: Consider moving this to Wait(). |
| Status prepare_status = executor()->WaitForPrepare(); |
| coord_sink_ = executor()->root_sink(); |
| RETURN_IF_ERROR(prepare_status); |
| DCHECK(coord_sink_ != nullptr); |
| } |
| |
| PrintFragmentInstanceInfo(); |
| return Status::OK(); |
| } |
| |
| void Coordinator::UpdateFilterRoutingTable(const FragmentExecParams& fragment_params) { |
| DCHECK(schedule_.request().query_ctx.client_request.query_options.mt_dop == 0); |
| int num_hosts = fragment_params.instance_exec_params.size(); |
| DCHECK_GT(num_hosts, 0); |
| DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF) |
| << "UpdateFilterRoutingTable() called although runtime filters are disabled"; |
| DCHECK(!filter_routing_table_complete_) |
| << "UpdateFilterRoutingTable() called after setting filter_routing_table_complete_"; |
| |
| for (const TPlanNode& plan_node: fragment_params.fragment.plan.nodes) { |
| if (!plan_node.__isset.runtime_filters) continue; |
| for (const TRuntimeFilterDesc& filter: plan_node.runtime_filters) { |
| if (filter_mode_ == TRuntimeFilterMode::LOCAL && !filter.has_local_targets) { |
| continue; |
| } |
| FilterRoutingTable::iterator i = filter_routing_table_.emplace( |
| filter.filter_id, FilterState(filter, plan_node.node_id)).first; |
| FilterState* f = &(i->second); |
| if (plan_node.__isset.hash_join_node) { |
| // Set the 'pending_count_' to zero to indicate that for a filter with local-only |
| // targets the coordinator does not expect to receive any filter updates. |
| int pending_count = filter.is_broadcast_join ? |
| (filter.has_remote_targets ? 1 : 0) : num_hosts; |
| f->set_pending_count(pending_count); |
| vector<int> src_idxs = fragment_params.GetInstanceIdxs(); |
| |
| // If this is a broadcast join with only non-local targets, build and publish it |
| // on MAX_BROADCAST_FILTER_PRODUCERS instances. If this is not a broadcast join |
| // or it is a broadcast join with local targets, it should be generated |
| // everywhere the join is executed. |
| if (filter.is_broadcast_join && !filter.has_local_targets |
| && num_hosts > MAX_BROADCAST_FILTER_PRODUCERS) { |
| random_shuffle(src_idxs.begin(), src_idxs.end()); |
| src_idxs.resize(MAX_BROADCAST_FILTER_PRODUCERS); |
| } |
| f->src_fragment_instance_state_idxs()->insert(src_idxs.begin(), src_idxs.end()); |
| } else if (plan_node.__isset.hdfs_scan_node) { |
| auto it = filter.planid_to_target_ndx.find(plan_node.node_id); |
| DCHECK(it != filter.planid_to_target_ndx.end()); |
| const TRuntimeFilterTargetDesc& tFilterTarget = filter.targets[it->second]; |
| if (filter_mode_ == TRuntimeFilterMode::LOCAL && !tFilterTarget.is_local_target) { |
| continue; |
| } |
| vector<int> idxs = fragment_params.GetInstanceIdxs(); |
| FilterTarget target(tFilterTarget); |
| target.fragment_instance_state_idxs.insert(idxs.begin(), idxs.end()); |
| f->targets()->push_back(target); |
| } else { |
| DCHECK(false) << "Unexpected plan node with runtime filters: " |
| << ThriftDebugString(plan_node); |
| } |
| } |
| } |
| } |
| |
| void Coordinator::StartFInstances() { |
| int num_fragment_instances = schedule_.GetNumFragmentInstances(); |
| DCHECK_GT(num_fragment_instances, 0); |
| |
| fragment_instance_states_.resize(num_fragment_instances); |
| exec_complete_barrier_.reset(new CountingBarrier(num_fragment_instances)); |
| num_remaining_fragment_instances_ = num_fragment_instances; |
| |
| DebugOptions debug_options; |
| ProcessQueryOptions(schedule_.query_options(), &debug_options); |
| const TQueryExecRequest& request = schedule_.request(); |
| |
| VLOG_QUERY << "starting " << num_fragment_instances << " fragment instances for query " |
| << query_id_; |
| query_events_->MarkEvent( |
| Substitute("Ready to start $0 fragment instances", num_fragment_instances)); |
| |
| // TODO-MT: populate the runtime filter routing table |
| // This requires local aggregation of filters prior to sending |
| // for broadcast joins in order to avoid more complicated merge logic here. |
| |
| if (filter_mode_ != TRuntimeFilterMode::OFF) { |
| DCHECK_EQ(request.plan_exec_info.size(), 1); |
| // Populate the runtime filter routing table. This should happen before starting the |
| // fragment instances. This code anticipates the indices of the instance states |
| // created later on in ExecRemoteFragment() |
| for (const FragmentExecParams& fragment_params: schedule_.fragment_exec_params()) { |
| UpdateFilterRoutingTable(fragment_params); |
| } |
| MarkFilterRoutingTableComplete(); |
| } |
| |
| int num_instances = 0; |
| for (const FragmentExecParams& fragment_params: schedule_.fragment_exec_params()) { |
| num_instances += fragment_params.instance_exec_params.size(); |
| for (const FInstanceExecParams& instance_params: |
| fragment_params.instance_exec_params) { |
| InstanceState* exec_state = obj_pool()->Add( |
| new InstanceState(instance_params, obj_pool())); |
| int instance_state_idx = GetInstanceIdx(instance_params.instance_id); |
| fragment_instance_states_[instance_state_idx] = exec_state; |
| |
| DebugOptions* instance_debug_options = |
| debug_options.IsApplicable(instance_state_idx) ? &debug_options : NULL; |
| exec_env_->fragment_exec_thread_pool()->Offer( |
| std::bind(&Coordinator::ExecRemoteFInstance, |
| this, std::cref(instance_params), instance_debug_options)); |
| } |
| } |
| exec_complete_barrier_->Wait(); |
| VLOG_QUERY << "started " << num_fragment_instances << " fragment instances for query " |
| << query_id_; |
| query_events_->MarkEvent( |
| Substitute("All $0 fragment instances started", num_instances)); |
| } |
| |
| Status Coordinator::FinishInstanceStartup() { |
| Status status = Status::OK(); |
| const TMetricDef& def = |
| MakeTMetricDef("fragment-latencies", TMetricKind::HISTOGRAM, TUnit::TIME_MS); |
| HistogramMetric latencies(def, 20000, 3); |
| for (InstanceState* exec_state: fragment_instance_states_) { |
| lock_guard<mutex> l(*exec_state->lock()); |
| // Preserve the first non-OK status, if there is one |
| if (status.ok()) status = *exec_state->status(); |
| latencies.Update(exec_state->rpc_latency()); |
| } |
| |
| query_profile_->AddInfoString( |
| "Fragment instance start latencies", latencies.ToHumanReadable()); |
| |
| if (!status.ok()) { |
| DCHECK(query_status_.ok()); // nobody should have been able to cancel |
| query_status_ = status; |
| CancelInternal(); |
| } |
| return status; |
| } |
| |
| string Coordinator::FilterDebugString() { |
| TablePrinter table_printer; |
| table_printer.AddColumn("ID", false); |
| table_printer.AddColumn("Src. Node", false); |
| table_printer.AddColumn("Tgt. Node(s)", false); |
| table_printer.AddColumn("Targets", false); |
| table_printer.AddColumn("Target type", false); |
| table_printer.AddColumn("Partition filter", false); |
| |
| // Distribution metrics are only meaningful if the coordinator is routing the filter. |
| if (filter_mode_ == TRuntimeFilterMode::GLOBAL) { |
| table_printer.AddColumn("Pending (Expected)", false); |
| table_printer.AddColumn("First arrived", false); |
| table_printer.AddColumn("Completed", false); |
| } |
| table_printer.AddColumn("Enabled", false); |
| lock_guard<SpinLock> l(filter_lock_); |
| for (FilterRoutingTable::value_type& v: filter_routing_table_) { |
| vector<string> row; |
| const FilterState& state = v.second; |
| row.push_back(lexical_cast<string>(v.first)); |
| row.push_back(lexical_cast<string>(state.src())); |
| vector<string> target_ids; |
| vector<string> num_target_instances; |
| vector<string> target_types; |
| vector<string> partition_filter; |
| for (const FilterTarget& target: state.targets()) { |
| target_ids.push_back(lexical_cast<string>(target.node_id)); |
| num_target_instances.push_back( |
| lexical_cast<string>(target.fragment_instance_state_idxs.size())); |
| target_types.push_back(target.is_local ? "LOCAL" : "REMOTE"); |
| partition_filter.push_back(target.is_bound_by_partition_columns ? "true" : "false"); |
| } |
| row.push_back(join(target_ids, ", ")); |
| row.push_back(join(num_target_instances, ", ")); |
| row.push_back(join(target_types, ", ")); |
| row.push_back(join(partition_filter, ", ")); |
| |
| if (filter_mode_ == TRuntimeFilterMode::GLOBAL) { |
| int pending_count = state.completion_time() != 0L ? 0 : state.pending_count(); |
| row.push_back(Substitute("$0 ($1)", pending_count, |
| state.src_fragment_instance_state_idxs().size())); |
| if (state.first_arrival_time() == 0L) { |
| row.push_back("N/A"); |
| } else { |
| row.push_back(PrettyPrinter::Print(state.first_arrival_time(), TUnit::TIME_NS)); |
| } |
| if (state.completion_time() == 0L) { |
| row.push_back("N/A"); |
| } else { |
| row.push_back(PrettyPrinter::Print(state.completion_time(), TUnit::TIME_NS)); |
| } |
| } |
| |
| row.push_back(!state.disabled() ? "true" : "false"); |
| table_printer.AddRow(row); |
| } |
| // Add a line break, as in all contexts this is called we need to start a new line to |
| // print it correctly. |
| return Substitute("\n$0", table_printer.ToString()); |
| } |
| |
| void Coordinator::MarkFilterRoutingTableComplete() { |
| DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF) |
| << "MarkFilterRoutingTableComplete() called although runtime filters are disabled"; |
| query_profile_->AddInfoString( |
| "Number of filters", Substitute("$0", filter_routing_table_.size())); |
| query_profile_->AddInfoString("Filter routing table", FilterDebugString()); |
| if (VLOG_IS_ON(2)) VLOG_QUERY << FilterDebugString(); |
| filter_routing_table_complete_ = true; |
| } |
| |
| Status Coordinator::GetStatus() { |
| lock_guard<mutex> l(lock_); |
| return query_status_; |
| } |
| |
| Status Coordinator::UpdateStatus(const Status& status, const TUniqueId& instance_id, |
| const string& instance_hostname) { |
| { |
| lock_guard<mutex> l(lock_); |
| |
| // The query is done and we are just waiting for fragment instances to clean up. |
| // Ignore their cancelled updates. |
| if (returned_all_results_ && status.IsCancelled()) return query_status_; |
| |
| // nothing to update |
| if (status.ok()) return query_status_; |
| |
| // don't override an error status; also, cancellation has already started |
| if (!query_status_.ok()) return query_status_; |
| |
| query_status_ = status; |
| CancelInternal(); |
| } |
| |
| // Log the id of the fragment that first failed so we can track it down easier. |
| VLOG_QUERY << "Query id=" << query_id_ << " failed because fragment id=" |
| << instance_id << " on host=" << instance_hostname << " failed."; |
| |
| return query_status_; |
| } |
| |
| void Coordinator::PopulatePathPermissionCache(hdfsFS fs, const string& path_str, |
| PermissionCache* permissions_cache) { |
| // Find out if the path begins with a hdfs:// -style prefix, and remove it and the |
| // location (e.g. host:port) if so. |
| int scheme_end = path_str.find("://"); |
| string stripped_str; |
| if (scheme_end != string::npos) { |
| // Skip past the subsequent location:port/ prefix. |
| stripped_str = path_str.substr(path_str.find("/", scheme_end + 3)); |
| } else { |
| stripped_str = path_str; |
| } |
| |
| // Get the list of path components, used to build all path prefixes. |
| vector<string> components; |
| split(components, stripped_str, is_any_of("/")); |
| |
| // Build a set of all prefixes (including the complete string) of stripped_path. So |
| // /a/b/c/d leads to a vector of: /a, /a/b, /a/b/c, /a/b/c/d |
| vector<string> prefixes; |
| // Stores the current prefix |
| stringstream accumulator; |
| for (const string& component: components) { |
| if (component.empty()) continue; |
| accumulator << "/" << component; |
| prefixes.push_back(accumulator.str()); |
| } |
| |
| // Now for each prefix, stat() it to see if a) it exists and b) if so what its |
| // permissions are. When we meet a directory that doesn't exist, we record the fact that |
| // we need to create it, and the permissions of its parent dir to inherit. |
| // |
| // Every prefix is recorded in the PermissionCache so we don't do more than one stat() |
| // for each path. If we need to create the directory, we record it as the pair (true, |
| // perms) so that the caller can identify which directories need their permissions |
| // explicitly set. |
| |
| // Set to the permission of the immediate parent (i.e. the permissions to inherit if the |
| // current dir doesn't exist). |
| short permissions = 0; |
| for (const string& path: prefixes) { |
| PermissionCache::const_iterator it = permissions_cache->find(path); |
| if (it == permissions_cache->end()) { |
| hdfsFileInfo* info = hdfsGetPathInfo(fs, path.c_str()); |
| if (info != NULL) { |
| // File exists, so fill the cache with its current permissions. |
| permissions_cache->insert( |
| make_pair(path, make_pair(false, info->mPermissions))); |
| permissions = info->mPermissions; |
| hdfsFreeFileInfo(info, 1); |
| } else { |
| // File doesn't exist, so we need to set its permissions to its immediate parent |
| // once it's been created. |
| permissions_cache->insert(make_pair(path, make_pair(true, permissions))); |
| } |
| } else { |
| permissions = it->second.second; |
| } |
| } |
| } |
| |
| Status Coordinator::FinalizeSuccessfulInsert() { |
| PermissionCache permissions_cache; |
| HdfsFsCache::HdfsFsMap filesystem_connection_cache; |
| HdfsOperationSet partition_create_ops(&filesystem_connection_cache); |
| |
| // INSERT finalization happens in the five following steps |
| // 1. If OVERWRITE, remove all the files in the target directory |
| // 2. Create all the necessary partition directories. |
| DescriptorTbl* descriptor_table; |
| DescriptorTbl::Create(obj_pool(), desc_tbl_, &descriptor_table); |
| HdfsTableDescriptor* hdfs_table = static_cast<HdfsTableDescriptor*>( |
| descriptor_table->GetTableDescriptor(finalize_params_.table_id)); |
| DCHECK(hdfs_table != NULL) << "INSERT target table not known in descriptor table: " |
| << finalize_params_.table_id; |
| |
| // Loop over all partitions that were updated by this insert, and create the set of |
| // filesystem operations required to create the correct partition structure on disk. |
| for (const PartitionStatusMap::value_type& partition: per_partition_status_) { |
| SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, "Overwrite/PartitionCreationTimer", |
| "FinalizationTimer")); |
| // INSERT allows writes to tables that have partitions on multiple filesystems. |
| // So we need to open connections to different filesystems as necessary. We use a |
| // local connection cache and populate it with one connection per filesystem that the |
| // partitions are on. |
| hdfsFS partition_fs_connection; |
| RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection( |
| partition.second.partition_base_dir, &partition_fs_connection, |
| &filesystem_connection_cache)); |
| |
| // Look up the partition in the descriptor table. |
| stringstream part_path_ss; |
| if (partition.second.id == -1) { |
| // If this is a non-existant partition, use the default partition location of |
| // <base_dir>/part_key_1=val/part_key_2=val/... |
| part_path_ss << finalize_params_.hdfs_base_dir << "/" << partition.first; |
| } else { |
| HdfsPartitionDescriptor* part = hdfs_table->GetPartition(partition.second.id); |
| DCHECK(part != NULL) << "table_id=" << hdfs_table->id() |
| << " partition_id=" << partition.second.id |
| << "\n" << PrintThrift(runtime_state()->instance_ctx()); |
| part_path_ss << part->location(); |
| } |
| const string& part_path = part_path_ss.str(); |
| bool is_s3_path = IsS3APath(part_path.c_str()); |
| |
| // If this is an overwrite insert, we will need to delete any updated partitions |
| if (finalize_params_.is_overwrite) { |
| if (partition.first.empty()) { |
| // If the root directory is written to, then the table must not be partitioned |
| DCHECK(per_partition_status_.size() == 1); |
| // We need to be a little more careful, and only delete data files in the root |
| // because the tmp directories the sink(s) wrote are there also. |
| // So only delete files in the table directory - all files are treated as data |
| // files by Hive and Impala, but directories are ignored (and may legitimately |
| // be used to store permanent non-table data by other applications). |
| int num_files = 0; |
| // hfdsListDirectory() only sets errno if there is an error, but it doesn't set |
| // it to 0 if the call succeed. When there is no error, errno could be any |
| // value. So need to clear errno before calling it. |
| // Once HDFS-8407 is fixed, the errno reset won't be needed. |
| errno = 0; |
| hdfsFileInfo* existing_files = |
| hdfsListDirectory(partition_fs_connection, part_path.c_str(), &num_files); |
| if (existing_files == NULL && errno == EAGAIN) { |
| errno = 0; |
| existing_files = |
| hdfsListDirectory(partition_fs_connection, part_path.c_str(), &num_files); |
| } |
| // hdfsListDirectory() returns NULL not only when there is an error but also |
| // when the directory is empty(HDFS-8407). Need to check errno to make sure |
| // the call fails. |
| if (existing_files == NULL && errno != 0) { |
| return GetHdfsErrorMsg("Could not list directory: ", part_path); |
| } |
| for (int i = 0; i < num_files; ++i) { |
| const string filename = path(existing_files[i].mName).filename().string(); |
| if (existing_files[i].mKind == kObjectKindFile && !IsHiddenFile(filename)) { |
| partition_create_ops.Add(DELETE, existing_files[i].mName); |
| } |
| } |
| hdfsFreeFileInfo(existing_files, num_files); |
| } else { |
| // This is a partition directory, not the root directory; we can delete |
| // recursively with abandon, after checking that it ever existed. |
| // TODO: There's a potential race here between checking for the directory |
| // and a third-party deleting it. |
| if (FLAGS_insert_inherit_permissions && !is_s3_path) { |
| // There is no directory structure in S3, so "inheriting" permissions is not |
| // possible. |
| // TODO: Try to mimic inheriting permissions for S3. |
| PopulatePathPermissionCache( |
| partition_fs_connection, part_path, &permissions_cache); |
| } |
| // S3 doesn't have a directory structure, so we technically wouldn't need to |
| // CREATE_DIR on S3. However, libhdfs always checks if a path exists before |
| // carrying out an operation on that path. So we still need to call CREATE_DIR |
| // before we access that path due to this limitation. |
| if (hdfsExists(partition_fs_connection, part_path.c_str()) != -1) { |
| partition_create_ops.Add(DELETE_THEN_CREATE, part_path); |
| } else { |
| // Otherwise just create the directory. |
| partition_create_ops.Add(CREATE_DIR, part_path); |
| } |
| } |
| } else if (!is_s3_path |
| || !query_ctx_.client_request.query_options.s3_skip_insert_staging) { |
| // If the S3_SKIP_INSERT_STAGING query option is set, then the partition directories |
| // would have already been created by the table sinks. |
| if (FLAGS_insert_inherit_permissions && !is_s3_path) { |
| PopulatePathPermissionCache( |
| partition_fs_connection, part_path, &permissions_cache); |
| } |
| if (hdfsExists(partition_fs_connection, part_path.c_str()) == -1) { |
| partition_create_ops.Add(CREATE_DIR, part_path); |
| } |
| } |
| } |
| |
| { |
| SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, "Overwrite/PartitionCreationTimer", |
| "FinalizationTimer")); |
| if (!partition_create_ops.Execute(exec_env_->hdfs_op_thread_pool(), false)) { |
| for (const HdfsOperationSet::Error& err: partition_create_ops.errors()) { |
| // It's ok to ignore errors creating the directories, since they may already |
| // exist. If there are permission errors, we'll run into them later. |
| if (err.first->op() != CREATE_DIR) { |
| return Status(Substitute( |
| "Error(s) deleting partition directories. First error (of $0) was: $1", |
| partition_create_ops.errors().size(), err.second)); |
| } |
| } |
| } |
| } |
| |
| // 3. Move all tmp files |
| HdfsOperationSet move_ops(&filesystem_connection_cache); |
| HdfsOperationSet dir_deletion_ops(&filesystem_connection_cache); |
| |
| for (FileMoveMap::value_type& move: files_to_move_) { |
| // Empty destination means delete, so this is a directory. These get deleted in a |
| // separate pass to ensure that we have moved all the contents of the directory first. |
| if (move.second.empty()) { |
| VLOG_ROW << "Deleting file: " << move.first; |
| dir_deletion_ops.Add(DELETE, move.first); |
| } else { |
| VLOG_ROW << "Moving tmp file: " << move.first << " to " << move.second; |
| if (FilesystemsMatch(move.first.c_str(), move.second.c_str())) { |
| move_ops.Add(RENAME, move.first, move.second); |
| } else { |
| move_ops.Add(MOVE, move.first, move.second); |
| } |
| } |
| } |
| |
| { |
| SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, "FileMoveTimer", "FinalizationTimer")); |
| if (!move_ops.Execute(exec_env_->hdfs_op_thread_pool(), false)) { |
| stringstream ss; |
| ss << "Error(s) moving partition files. First error (of " |
| << move_ops.errors().size() << ") was: " << move_ops.errors()[0].second; |
| return Status(ss.str()); |
| } |
| } |
| |
| // 4. Delete temp directories |
| { |
| SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, "FileDeletionTimer", |
| "FinalizationTimer")); |
| if (!dir_deletion_ops.Execute(exec_env_->hdfs_op_thread_pool(), false)) { |
| stringstream ss; |
| ss << "Error(s) deleting staging directories. First error (of " |
| << dir_deletion_ops.errors().size() << ") was: " |
| << dir_deletion_ops.errors()[0].second; |
| return Status(ss.str()); |
| } |
| } |
| |
| // 5. Optionally update the permissions of the created partition directories |
| // Do this last so that we don't make a dir unwritable before we write to it. |
| if (FLAGS_insert_inherit_permissions) { |
| HdfsOperationSet chmod_ops(&filesystem_connection_cache); |
| for (const PermissionCache::value_type& perm: permissions_cache) { |
| bool new_dir = perm.second.first; |
| if (new_dir) { |
| short permissions = perm.second.second; |
| VLOG_QUERY << "INSERT created new directory: " << perm.first |
| << ", inherited permissions are: " << oct << permissions; |
| chmod_ops.Add(CHMOD, perm.first, permissions); |
| } |
| } |
| if (!chmod_ops.Execute(exec_env_->hdfs_op_thread_pool(), false)) { |
| stringstream ss; |
| ss << "Error(s) setting permissions on newly created partition directories. First" |
| << " error (of " << chmod_ops.errors().size() << ") was: " |
| << chmod_ops.errors()[0].second; |
| return Status(ss.str()); |
| } |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status Coordinator::FinalizeQuery() { |
| // All instances must have reported their final statuses before finalization, which is a |
| // post-condition of Wait. If the query was not successful, still try to clean up the |
| // staging directory. |
| DCHECK(has_called_wait_); |
| DCHECK(needs_finalization_); |
| |
| VLOG_QUERY << "Finalizing query: " << query_id_; |
| SCOPED_TIMER(finalization_timer_); |
| Status return_status = GetStatus(); |
| if (return_status.ok()) { |
| return_status = FinalizeSuccessfulInsert(); |
| } |
| |
| stringstream staging_dir; |
| DCHECK(finalize_params_.__isset.staging_dir); |
| staging_dir << finalize_params_.staging_dir << "/" << PrintId(query_id_,"_") << "/"; |
| |
| hdfsFS hdfs_conn; |
| RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(staging_dir.str(), &hdfs_conn)); |
| VLOG_QUERY << "Removing staging directory: " << staging_dir.str(); |
| hdfsDelete(hdfs_conn, staging_dir.str().c_str(), 1); |
| |
| return return_status; |
| } |
| |
| Status Coordinator::WaitForAllInstances() { |
| unique_lock<mutex> l(lock_); |
| while (num_remaining_fragment_instances_ > 0 && query_status_.ok()) { |
| VLOG_QUERY << "Coordinator waiting for fragment instances to finish, " |
| << num_remaining_fragment_instances_ << " remaining"; |
| instance_completion_cv_.wait(l); |
| } |
| if (query_status_.ok()) { |
| VLOG_QUERY << "All fragment instances finished successfully."; |
| } else { |
| VLOG_QUERY << "All fragment instances finished due to one or more errors. " |
| << query_status_.GetDetail(); |
| } |
| |
| return query_status_; |
| } |
| |
| Status Coordinator::Wait() { |
| lock_guard<mutex> l(wait_lock_); |
| SCOPED_TIMER(query_profile_->total_time_counter()); |
| if (has_called_wait_) return Status::OK(); |
| has_called_wait_ = true; |
| |
| if (stmt_type_ == TStmtType::QUERY) { |
| DCHECK(executor() != nullptr); |
| return UpdateStatus(executor()->WaitForOpen(), runtime_state()->fragment_instance_id(), |
| FLAGS_hostname); |
| } |
| |
| DCHECK_EQ(stmt_type_, TStmtType::DML); |
| // Query finalization can only happen when all backends have reported |
| // relevant state. They only have relevant state to report in the parallel |
| // INSERT case, otherwise all the relevant state is from the coordinator |
| // fragment which will be available after Open() returns. |
| // Ignore the returned status if finalization is required., since FinalizeQuery() will |
| // pick it up and needs to execute regardless. |
| Status status = WaitForAllInstances(); |
| if (!needs_finalization_ && !status.ok()) return status; |
| |
| // Query finalization is required only for HDFS table sinks |
| if (needs_finalization_) RETURN_IF_ERROR(FinalizeQuery()); |
| |
| query_profile_->AddInfoString( |
| "DML Stats", DataSink::OutputDmlStats(per_partition_status_, "\n")); |
| // For DML queries, when Wait is done, the query is complete. Report aggregate |
| // query profiles at this point. |
| // TODO: make sure ReportQuerySummary gets called on error |
| ReportQuerySummary(); |
| |
| return status; |
| } |
| |
| Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) { |
| VLOG_ROW << "GetNext() query_id=" << query_id_; |
| DCHECK(has_called_wait_); |
| SCOPED_TIMER(query_profile_->total_time_counter()); |
| |
| if (returned_all_results_) { |
| // May be called after the first time we set *eos. Re-set *eos and return here; |
| // already torn-down coord_sink_ so no more work to do. |
| *eos = true; |
| return Status::OK(); |
| } |
| |
| DCHECK(coord_sink_ != nullptr) |
| << "GetNext() called without result sink. Perhaps Prepare() failed and was not " |
| << "checked?"; |
| Status status = coord_sink_->GetNext(runtime_state(), results, max_rows, eos); |
| |
| // if there was an error, we need to return the query's error status rather than |
| // the status we just got back from the local executor (which may well be CANCELLED |
| // in that case). Coordinator fragment failed in this case so we log the query_id. |
| RETURN_IF_ERROR( |
| UpdateStatus(status, runtime_state()->fragment_instance_id(), FLAGS_hostname)); |
| |
| if (*eos) { |
| returned_all_results_ = true; |
| // Trigger tear-down of coordinator fragment by closing the consumer. Must do before |
| // WaitForAllInstances(). |
| coord_sink_->CloseConsumer(); |
| coord_sink_ = nullptr; |
| |
| // Don't return final NULL until all instances have completed. GetNext must wait for |
| // all instances to complete before ultimately signalling the end of execution via a |
| // NULL batch. After NULL is returned, the coordinator may tear down query state, and |
| // perform post-query finalization which might depend on the reports from all |
| // instances. |
| // |
| // TODO: Waiting should happen in TearDown() (and then we wouldn't need to call |
| // CloseConsumer() here). See IMPALA-4275 for details. |
| RETURN_IF_ERROR(WaitForAllInstances()); |
| if (query_status_.ok()) { |
| // If the query completed successfully, report aggregate query profiles. |
| ReportQuerySummary(); |
| } |
| } |
| |
| return Status::OK(); |
| } |
| |
| void Coordinator::PrintFragmentInstanceInfo() { |
| for (InstanceState* state: fragment_instance_states_) { |
| SummaryStats& acc = fragment_profiles_[state->fragment_idx()].bytes_assigned; |
| acc(state->total_split_size()); |
| } |
| |
| for (int id = (executor() == NULL ? 0 : 1); id < fragment_profiles_.size(); ++id) { |
| SummaryStats& acc = fragment_profiles_[id].bytes_assigned; |
| double min = accumulators::min(acc); |
| double max = accumulators::max(acc); |
| double mean = accumulators::mean(acc); |
| double stddev = sqrt(accumulators::variance(acc)); |
| stringstream ss; |
| ss << " min: " << PrettyPrinter::Print(min, TUnit::BYTES) |
| << ", max: " << PrettyPrinter::Print(max, TUnit::BYTES) |
| << ", avg: " << PrettyPrinter::Print(mean, TUnit::BYTES) |
| << ", stddev: " << PrettyPrinter::Print(stddev, TUnit::BYTES); |
| fragment_profiles_[id].averaged_profile->AddInfoString("split sizes", ss.str()); |
| |
| if (VLOG_FILE_IS_ON) { |
| VLOG_FILE << "Byte split for fragment " << id << " " << ss.str(); |
| for (InstanceState* exec_state: fragment_instance_states_) { |
| if (exec_state->fragment_idx() != id) continue; |
| VLOG_FILE << "data volume for ipaddress " << exec_state << ": " |
| << PrettyPrinter::Print(exec_state->total_split_size(), TUnit::BYTES); |
| } |
| } |
| } |
| } |
| |
| void Coordinator::InitExecSummary() { |
| const TQueryExecRequest& request = schedule_.request(); |
| // init exec_summary_.{nodes, exch_to_sender_map} |
| exec_summary_.__isset.nodes = true; |
| DCHECK(exec_summary_.nodes.empty()); |
| for (const TPlanExecInfo& plan_exec_info: request.plan_exec_info) { |
| for (const TPlanFragment& fragment: plan_exec_info.fragments) { |
| if (!fragment.__isset.plan) continue; |
| |
| // eventual index of fragment's root node in exec_summary_.nodes |
| int root_node_idx = exec_summary_.nodes.size(); |
| |
| const TPlan& plan = fragment.plan; |
| int num_instances = |
| schedule_.GetFragmentExecParams(fragment.idx).instance_exec_params.size(); |
| for (const TPlanNode& node: plan.nodes) { |
| plan_node_id_to_summary_map_[node.node_id] = exec_summary_.nodes.size(); |
| exec_summary_.nodes.emplace_back(); |
| TPlanNodeExecSummary& node_summary = exec_summary_.nodes.back(); |
| node_summary.__set_node_id(node.node_id); |
| node_summary.__set_fragment_idx(fragment.idx); |
| node_summary.__set_label(node.label); |
| node_summary.__set_label_detail(node.label_detail); |
| node_summary.__set_num_children(node.num_children); |
| if (node.__isset.estimated_stats) { |
| node_summary.__set_estimated_stats(node.estimated_stats); |
| } |
| node_summary.exec_stats.resize(num_instances); |
| } |
| |
| if (fragment.__isset.output_sink |
| && fragment.output_sink.type == TDataSinkType::DATA_STREAM_SINK) { |
| const TDataStreamSink& sink = fragment.output_sink.stream_sink; |
| int exch_idx = plan_node_id_to_summary_map_[sink.dest_node_id]; |
| if (sink.output_partition.type == TPartitionType::UNPARTITIONED) { |
| exec_summary_.nodes[exch_idx].__set_is_broadcast(true); |
| } |
| exec_summary_.__isset.exch_to_sender_map = true; |
| exec_summary_.exch_to_sender_map[exch_idx] = root_node_idx; |
| } |
| } |
| } |
| } |
| |
| void Coordinator::InitExecProfiles() { |
| vector<const TPlanFragment*> fragments; |
| schedule_.GetTPlanFragments(&fragments); |
| fragment_profiles_.resize(fragments.size()); |
| |
| const TPlanFragment* coord_fragment = schedule_.GetCoordFragment(); |
| |
| // Initialize the runtime profile structure. This adds the per fragment average |
| // profiles followed by the per fragment instance profiles. |
| for (const TPlanFragment* fragment: fragments) { |
| string profile_name = |
| (fragment == coord_fragment) ? "Coordinator Fragment $0" : "Fragment $0"; |
| PerFragmentProfileData* data = &fragment_profiles_[fragment->idx]; |
| data->num_instances = |
| schedule_.GetFragmentExecParams(fragment->idx).instance_exec_params.size(); |
| // TODO-MT: stop special-casing the coordinator fragment |
| if (fragment != coord_fragment) { |
| data->averaged_profile = obj_pool()->Add(new RuntimeProfile( |
| obj_pool(), Substitute("Averaged Fragment $0", fragment->display_name), true)); |
| query_profile_->AddChild(data->averaged_profile, true); |
| } |
| data->root_profile = obj_pool()->Add( |
| new RuntimeProfile(obj_pool(), Substitute(profile_name, fragment->display_name))); |
| // Note: we don't start the wall timer here for the fragment profile; |
| // it's uninteresting and misleading. |
| query_profile_->AddChild(data->root_profile); |
| } |
| } |
| |
| void Coordinator::CollectScanNodeCounters(RuntimeProfile* profile, |
| FragmentInstanceCounters* counters) { |
| vector<RuntimeProfile*> children; |
| profile->GetAllChildren(&children); |
| for (RuntimeProfile* p: children) { |
| PlanNodeId id = ExecNode::GetNodeIdFromProfile(p); |
| |
| // This profile is not for an exec node. |
| if (id == g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID) continue; |
| |
| RuntimeProfile::Counter* throughput_counter = |
| p->GetCounter(ScanNode::TOTAL_THROUGHPUT_COUNTER); |
| if (throughput_counter != NULL) { |
| counters->throughput_counters[id] = throughput_counter; |
| } |
| RuntimeProfile::Counter* scan_ranges_counter = |
| p->GetCounter(ScanNode::SCAN_RANGES_COMPLETE_COUNTER); |
| if (scan_ranges_counter != NULL) { |
| counters->scan_ranges_complete_counters[id] = scan_ranges_counter; |
| } |
| } |
| } |
| |
| void Coordinator::ExecRemoteFInstance( |
| const FInstanceExecParams& exec_params, const DebugOptions* debug_options) { |
| NotifyBarrierOnExit notifier(exec_complete_barrier_.get()); |
| TExecPlanFragmentParams rpc_params; |
| SetExecPlanFragmentParams(exec_params, &rpc_params); |
| if (debug_options != NULL) { |
| rpc_params.fragment_instance_ctx.__set_debug_node_id(debug_options->node_id); |
| rpc_params.fragment_instance_ctx.__set_debug_action(debug_options->action); |
| rpc_params.fragment_instance_ctx.__set_debug_phase(debug_options->phase); |
| } |
| int instance_state_idx = GetInstanceIdx(exec_params.instance_id); |
| InstanceState* exec_state = fragment_instance_states_[instance_state_idx]; |
| exec_state->ComputeTotalSplitSize( |
| rpc_params.fragment_instance_ctx.per_node_scan_ranges); |
| VLOG_FILE << "making rpc: ExecPlanFragment" |
| << " host=" << exec_state->impalad_address() |
| << " instance_id=" << PrintId(exec_state->fragment_instance_id()); |
| |
| // Guard against concurrent UpdateExecStatus() that may arrive after RPC returns. |
| lock_guard<mutex> l(*exec_state->lock()); |
| int64_t start = MonotonicMillis(); |
| |
| Status client_connect_status; |
| ImpalaBackendConnection backend_client(exec_env_->impalad_client_cache(), |
| exec_state->impalad_address(), &client_connect_status); |
| if (!client_connect_status.ok()) { |
| exec_state->SetInitialStatus(client_connect_status, false); |
| return; |
| } |
| |
| TExecPlanFragmentResult thrift_result; |
| Status rpc_status = backend_client.DoRpc(&ImpalaBackendClient::ExecPlanFragment, |
| rpc_params, &thrift_result); |
| exec_state->set_rpc_latency(MonotonicMillis() - start); |
| |
| const string ERR_TEMPLATE = "ExecPlanRequest rpc query_id=$0 instance_id=$1 failed: $2"; |
| |
| if (!rpc_status.ok()) { |
| const string& err_msg = Substitute(ERR_TEMPLATE, PrintId(query_id()), |
| PrintId(exec_state->fragment_instance_id()), rpc_status.msg().msg()); |
| VLOG_QUERY << err_msg; |
| exec_state->SetInitialStatus(Status(err_msg), true); |
| return; |
| } |
| |
| Status exec_status = Status(thrift_result.status); |
| if (!exec_status.ok()) { |
| const string& err_msg = Substitute(ERR_TEMPLATE, PrintId(query_id()), |
| PrintId(exec_state->fragment_instance_id()), |
| exec_status.msg().GetFullMessageDetails()); |
| VLOG_QUERY << err_msg; |
| exec_state->SetInitialStatus(Status(err_msg), true); |
| return; |
| } |
| |
| exec_state->SetInitialStatus(Status::OK(), true); |
| VLOG_FILE << "rpc succeeded: ExecPlanFragment" |
| << " instance_id=" << PrintId(exec_state->fragment_instance_id()); |
| } |
| |
| void Coordinator::Cancel(const Status* cause) { |
| lock_guard<mutex> l(lock_); |
| // if the query status indicates an error, cancellation has already been initiated |
| if (!query_status_.ok()) return; |
| // prevent others from cancelling a second time |
| |
| // TODO: This should default to OK(), not CANCELLED if there is no cause (or callers |
| // should explicitly pass Status::OK()). Fragment instances may be cancelled at the end |
| // of a successful query. Need to clean up relationship between query_status_ here and |
| // in QueryExecState. See IMPALA-4279. |
| query_status_ = (cause != NULL && !cause->ok()) ? *cause : Status::CANCELLED; |
| CancelInternal(); |
| } |
| |
| void Coordinator::CancelInternal() { |
| VLOG_QUERY << "Cancel() query_id=" << query_id_; |
| CancelFragmentInstances(); |
| |
| // Report the summary with whatever progress the query made before being cancelled. |
| ReportQuerySummary(); |
| } |
| |
| void Coordinator::CancelFragmentInstances() { |
| int num_cancelled = 0; |
| for (InstanceState* exec_state: fragment_instance_states_) { |
| DCHECK(exec_state != nullptr); |
| |
| // lock each exec_state individually to synchronize correctly with |
| // UpdateFragmentExecStatus() (which doesn't get the global lock_ |
| // to set its status) |
| lock_guard<mutex> l(*exec_state->lock()); |
| |
| // Nothing to cancel if the exec rpc was not sent |
| if (!exec_state->rpc_sent()) continue; |
| |
| // don't cancel if it already finished |
| if (exec_state->done()) continue; |
| |
| /// If the status is not OK, we still try to cancel - !OK status might mean |
| /// communication failure between fragment instance and coordinator, but fragment |
| /// instance might still be running. |
| |
| // set an error status to make sure we only cancel this once |
| exec_state->set_status(Status::CANCELLED); |
| |
| // if we get an error while trying to get a connection to the backend, |
| // keep going |
| Status status; |
| ImpalaBackendConnection backend_client( |
| exec_env_->impalad_client_cache(), exec_state->impalad_address(), &status); |
| if (!status.ok()) continue; |
| ++num_cancelled; |
| TCancelPlanFragmentParams params; |
| params.protocol_version = ImpalaInternalServiceVersion::V1; |
| params.__set_fragment_instance_id(exec_state->fragment_instance_id()); |
| TCancelPlanFragmentResult res; |
| VLOG_QUERY << "sending CancelPlanFragment rpc for instance_id=" |
| << exec_state->fragment_instance_id() << " backend=" |
| << exec_state->impalad_address(); |
| Status rpc_status; |
| // Try to send the RPC 3 times before failing. |
| bool retry_is_safe; |
| for (int i = 0; i < 3; ++i) { |
| rpc_status = backend_client.DoRpc(&ImpalaBackendClient::CancelPlanFragment, |
| params, &res, &retry_is_safe); |
| if (rpc_status.ok() || !retry_is_safe) break; |
| } |
| if (!rpc_status.ok()) { |
| exec_state->status()->MergeStatus(rpc_status); |
| stringstream msg; |
| msg << "CancelPlanFragment rpc query_id=" << query_id_ |
| << " instance_id=" << exec_state->fragment_instance_id() |
| << " failed: " << rpc_status.msg().msg(); |
| // make a note of the error status, but keep on cancelling the other fragments |
| exec_state->status()->AddDetail(msg.str()); |
| continue; |
| } |
| if (res.status.status_code != TErrorCode::OK) { |
| exec_state->status()->AddDetail(join(res.status.error_msgs, "; ")); |
| } |
| } |
| VLOG_QUERY << Substitute( |
| "CancelFragmentInstances() query_id=$0, tried to cancel $1 fragment instances", |
| PrintId(query_id_), num_cancelled); |
| |
| // Notify that we completed with an error. |
| instance_completion_cv_.notify_all(); |
| } |
| |
| Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& params) { |
| VLOG_FILE << "UpdateFragmentExecStatus() " |
| << " instance=" << PrintId(params.fragment_instance_id) |
| << " status=" << params.status.status_code |
| << " done=" << (params.done ? "true" : "false"); |
| int instance_state_idx = GetInstanceIdx(params.fragment_instance_id); |
| if (instance_state_idx >= fragment_instance_states_.size()) { |
| return Status(TErrorCode::INTERNAL_ERROR, |
| Substitute("Unknown fragment instance index $0 (max known: $1)", |
| instance_state_idx, fragment_instance_states_.size() - 1)); |
| } |
| InstanceState* exec_state = fragment_instance_states_[instance_state_idx]; |
| |
| const TRuntimeProfileTree& cumulative_profile = params.profile; |
| Status status(params.status); |
| { |
| lock_guard<mutex> l(*exec_state->lock()); |
| if (!status.ok()) { |
| // During query cancellation, exec_state is set to CANCELLED. However, we might |
| // process a non-error message from a fragment executor that is sent |
| // before query cancellation is invoked. Make sure we don't go from error status to |
| // OK. |
| exec_state->set_status(status); |
| } |
| exec_state->set_done(params.done); |
| if (exec_state->status()->ok()) { |
| // We can't update this backend's profile if ReportQuerySummary() is running, |
| // because it depends on all profiles not changing during its execution (when it |
| // calls SortChildren()). ReportQuerySummary() only gets called after |
| // WaitForAllInstances() returns or at the end of CancelFragmentInstances(). |
| // WaitForAllInstances() only returns after all backends have completed (in which |
| // case we wouldn't be in this function), or when there's an error, in which case |
| // CancelFragmentInstances() is called. CancelFragmentInstances sets all |
| // exec_state's statuses to cancelled. |
| // TODO: We're losing this profile information. Call ReportQuerySummary only after |
| // all backends have completed. |
| exec_state->profile()->Update(cumulative_profile); |
| |
| // Update the average profile for the fragment corresponding to this instance. |
| exec_state->profile()->ComputeTimeInProfile(); |
| UpdateAverageProfile(exec_state); |
| UpdateExecSummary(*exec_state); |
| } |
| if (!exec_state->SetProfileCreated()) { |
| CollectScanNodeCounters(exec_state->profile(), exec_state->aggregate_counters()); |
| } |
| |
| // Log messages aggregated by type |
| if (params.__isset.error_log && params.error_log.size() > 0) { |
| // Append the log messages from each update with the global state of the query |
| // execution |
| MergeErrorMaps(exec_state->error_log(), params.error_log); |
| VLOG_FILE << "instance_id=" << exec_state->fragment_instance_id() |
| << " error log: " << PrintErrorMapToString(*exec_state->error_log()); |
| } |
| progress_.Update(exec_state->UpdateNumScanRangesCompleted()); |
| } |
| |
| if (params.done && params.__isset.insert_exec_status) { |
| lock_guard<mutex> l(lock_); |
| // Merge in table update data (partitions written to, files to be moved as part of |
| // finalization) |
| for (const PartitionStatusMap::value_type& partition: |
| params.insert_exec_status.per_partition_status) { |
| TInsertPartitionStatus* status = &(per_partition_status_[partition.first]); |
| status->__set_num_modified_rows( |
| status->num_modified_rows + partition.second.num_modified_rows); |
| status->__set_kudu_latest_observed_ts(std::max( |
| partition.second.kudu_latest_observed_ts, status->kudu_latest_observed_ts)); |
| status->__set_id(partition.second.id); |
| status->__set_partition_base_dir(partition.second.partition_base_dir); |
| |
| if (partition.second.__isset.stats) { |
| if (!status->__isset.stats) status->__set_stats(TInsertStats()); |
| DataSink::MergeDmlStats(partition.second.stats, &status->stats); |
| } |
| } |
| files_to_move_.insert( |
| params.insert_exec_status.files_to_move.begin(), |
| params.insert_exec_status.files_to_move.end()); |
| } |
| |
| if (VLOG_FILE_IS_ON) { |
| stringstream s; |
| exec_state->profile()->PrettyPrint(&s); |
| VLOG_FILE << "profile for instance_id=" << exec_state->fragment_instance_id() |
| << "\n" << s.str(); |
| } |
| // also print the cumulative profile |
| // TODO: fix the coordinator/PlanFragmentExecutor, so this isn't needed |
| if (VLOG_FILE_IS_ON) { |
| stringstream s; |
| query_profile_->PrettyPrint(&s); |
| VLOG_FILE << "cumulative profile for query_id=" << query_id_ |
| << "\n" << s.str(); |
| } |
| |
| // for now, abort the query if we see any error except if the error is cancelled |
| // and returned_all_results_ is true. |
| // (UpdateStatus() initiates cancellation, if it hasn't already been) |
| if (!(returned_all_results_ && status.IsCancelled()) && !status.ok()) { |
| UpdateStatus(status, exec_state->fragment_instance_id(), |
| TNetworkAddressToString(exec_state->impalad_address())); |
| return Status::OK(); |
| } |
| |
| if (params.done) { |
| lock_guard<mutex> l(lock_); |
| exec_state->stopwatch()->Stop(); |
| DCHECK_GT(num_remaining_fragment_instances_, 0); |
| VLOG_QUERY << "Fragment instance completed:" |
| << " id=" << PrintId(exec_state->fragment_instance_id()) |
| << " host=" << exec_state->impalad_address() |
| << " remaining=" << num_remaining_fragment_instances_ - 1; |
| if (VLOG_QUERY_IS_ON && num_remaining_fragment_instances_ > 1) { |
| // print host/port info for the first backend that's still in progress as a |
| // debugging aid for backend deadlocks |
| for (InstanceState* exec_state: fragment_instance_states_) { |
| lock_guard<mutex> l2(*exec_state->lock()); |
| if (!exec_state->done()) { |
| VLOG_QUERY << "query_id=" << query_id_ << ": first in-progress backend: " |
| << exec_state->impalad_address(); |
| break; |
| } |
| } |
| } |
| if (--num_remaining_fragment_instances_ == 0) { |
| instance_completion_cv_.notify_all(); |
| } |
| } |
| |
| return Status::OK(); |
| } |
| |
| uint64_t Coordinator::GetLatestKuduInsertTimestamp() const { |
| uint64_t max_ts = 0; |
| for (const auto& entry : per_partition_status_) { |
| max_ts = std::max(max_ts, |
| static_cast<uint64_t>(entry.second.kudu_latest_observed_ts)); |
| } |
| return max_ts; |
| } |
| |
| RuntimeState* Coordinator::runtime_state() { |
| return executor() == NULL ? NULL : executor()->runtime_state(); |
| } |
| |
| MemTracker* Coordinator::query_mem_tracker() { |
| return query_mem_tracker_.get(); |
| } |
| |
| bool Coordinator::PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update) { |
| // Assume we are called only after all fragments have completed |
| DCHECK(has_called_wait_); |
| |
| for (const PartitionStatusMap::value_type& partition: per_partition_status_) { |
| catalog_update->created_partitions.insert(partition.first); |
| } |
| |
| return catalog_update->created_partitions.size() != 0; |
| } |
| |
| // Comparator to order RuntimeProfiles by descending total time |
| typedef struct { |
| typedef pair<RuntimeProfile*, bool> Profile; |
| bool operator()(const Profile& a, const Profile& b) const { |
| // Reverse ordering: we want the longest first |
| return |
| a.first->total_time_counter()->value() > b.first->total_time_counter()->value(); |
| } |
| } InstanceComparator; |
| |
| void Coordinator::UpdateAverageProfile(InstanceState* instance_state) { |
| FragmentIdx fragment_idx = instance_state->fragment_idx(); |
| DCHECK_GE(fragment_idx, 0); |
| DCHECK_LT(fragment_idx, fragment_profiles_.size()); |
| PerFragmentProfileData* data = &fragment_profiles_[fragment_idx]; |
| |
| // No locks are taken since UpdateAverage() and AddChild() take their own locks |
| if (data->averaged_profile != nullptr) { |
| data->averaged_profile->UpdateAverage(instance_state->profile()); |
| } |
| data->root_profile->AddChild(instance_state->profile()); |
| } |
| |
| void Coordinator::ComputeFragmentSummaryStats(InstanceState* instance_state) { |
| FragmentIdx fragment_idx = instance_state->fragment_idx(); |
| DCHECK_GE(fragment_idx, 0); |
| DCHECK_LT(fragment_idx, fragment_profiles_.size()); |
| PerFragmentProfileData* data = &fragment_profiles_[fragment_idx]; |
| |
| int64_t completion_time = instance_state->stopwatch()->ElapsedTime(); |
| data->completion_times(completion_time); |
| data->rates(instance_state->total_split_size() |
| / (completion_time / 1000.0 / 1000.0 / 1000.0)); |
| |
| // Add the child in case it has not been added previously |
| // via UpdateAverageProfile(). AddChild() will do nothing if the child |
| // already exists. |
| data->root_profile->AddChild(instance_state->profile()); |
| } |
| |
| void Coordinator::UpdateExecSummary(const InstanceState& instance_state) { |
| vector<RuntimeProfile*> children; |
| instance_state.profile()->GetAllChildren(&children); |
| |
| lock_guard<SpinLock> l(exec_summary_lock_); |
| for (int i = 0; i < children.size(); ++i) { |
| int node_id = ExecNode::GetNodeIdFromProfile(children[i]); |
| if (node_id == -1) continue; |
| |
| TPlanNodeExecSummary& exec_summary = |
| exec_summary_.nodes[plan_node_id_to_summary_map_[node_id]]; |
| DCHECK_LT(instance_state.per_fragment_instance_idx(), exec_summary.exec_stats.size()); |
| DCHECK_EQ(fragment_profiles_[instance_state.fragment_idx()].num_instances, |
| exec_summary.exec_stats.size()); |
| TExecStats& stats = |
| exec_summary.exec_stats[instance_state.per_fragment_instance_idx()]; |
| |
| RuntimeProfile::Counter* rows_counter = children[i]->GetCounter("RowsReturned"); |
| RuntimeProfile::Counter* mem_counter = children[i]->GetCounter("PeakMemoryUsage"); |
| if (rows_counter != NULL) stats.__set_cardinality(rows_counter->value()); |
| if (mem_counter != NULL) stats.__set_memory_used(mem_counter->value()); |
| stats.__set_latency_ns(children[i]->local_time()); |
| // TODO: we don't track cpu time per node now. Do that. |
| exec_summary.__isset.exec_stats = true; |
| } |
| VLOG(2) << PrintExecSummary(exec_summary_); |
| } |
| |
| // This function appends summary information to the query_profile_ before |
| // outputting it to VLOG. It adds: |
| // 1. Averaged fragment instance profiles (TODO: add outliers) |
| // 2. Summary of fragment instance durations (min, max, mean, stddev) |
| // 3. Summary of fragment instance rates (min, max, mean, stddev) |
| // TODO: add histogram/percentile |
| void Coordinator::ReportQuerySummary() { |
| // In this case, the query did not even get to start all fragment instances. |
| // Some of the state that is used below might be uninitialized. In this case, |
| // the query has made so little progress, reporting a summary is not very useful. |
| if (!has_called_wait_) return; |
| |
| if (!fragment_instance_states_.empty()) { |
| // Average all fragment instances for each fragment. |
| for (InstanceState* state: fragment_instance_states_) { |
| state->profile()->ComputeTimeInProfile(); |
| UpdateAverageProfile(state); |
| // Skip coordinator fragment, if one exists. |
| // TODO: Can we remove the special casing here? |
| if (coord_instance_ == nullptr || state->fragment_idx() != 0) { |
| ComputeFragmentSummaryStats(state); |
| } |
| UpdateExecSummary(*state); |
| } |
| |
| InstanceComparator comparator; |
| // Per fragment instances have been collected, output summaries |
| for (int i = (executor() != NULL ? 1 : 0); i < fragment_profiles_.size(); ++i) { |
| fragment_profiles_[i].root_profile->SortChildren(comparator); |
| SummaryStats& completion_times = fragment_profiles_[i].completion_times; |
| SummaryStats& rates = fragment_profiles_[i].rates; |
| |
| stringstream times_label; |
| times_label |
| << "min:" << PrettyPrinter::Print( |
| accumulators::min(completion_times), TUnit::TIME_NS) |
| << " max:" << PrettyPrinter::Print( |
| accumulators::max(completion_times), TUnit::TIME_NS) |
| << " mean: " << PrettyPrinter::Print( |
| accumulators::mean(completion_times), TUnit::TIME_NS) |
| << " stddev:" << PrettyPrinter::Print( |
| sqrt(accumulators::variance(completion_times)), TUnit::TIME_NS); |
| |
| stringstream rates_label; |
| rates_label |
| << "min:" << PrettyPrinter::Print( |
| accumulators::min(rates), TUnit::BYTES_PER_SECOND) |
| << " max:" << PrettyPrinter::Print( |
| accumulators::max(rates), TUnit::BYTES_PER_SECOND) |
| << " mean:" << PrettyPrinter::Print( |
| accumulators::mean(rates), TUnit::BYTES_PER_SECOND) |
| << " stddev:" << PrettyPrinter::Print( |
| sqrt(accumulators::variance(rates)), TUnit::BYTES_PER_SECOND); |
| |
| fragment_profiles_[i].averaged_profile->AddInfoString( |
| "completion times", times_label.str()); |
| fragment_profiles_[i].averaged_profile->AddInfoString( |
| "execution rates", rates_label.str()); |
| fragment_profiles_[i].averaged_profile->AddInfoString( |
| "num instances", lexical_cast<string>(fragment_profiles_[i].num_instances)); |
| } |
| |
| // Add per node peak memory usage as InfoString |
| // Map from Impalad address to peak memory usage of this query |
| typedef unordered_map<TNetworkAddress, int64_t> PerNodePeakMemoryUsage; |
| PerNodePeakMemoryUsage per_node_peak_mem_usage; |
| for (InstanceState* state: fragment_instance_states_) { |
| int64_t initial_usage = 0; |
| int64_t* mem_usage = FindOrInsert(&per_node_peak_mem_usage, |
| state->impalad_address(), initial_usage); |
| RuntimeProfile::Counter* mem_usage_counter = |
| state->profile()->GetCounter(PlanFragmentExecutor::PER_HOST_PEAK_MEM_COUNTER); |
| if (mem_usage_counter != NULL && mem_usage_counter->value() > *mem_usage) { |
| per_node_peak_mem_usage[state->impalad_address()] = mem_usage_counter->value(); |
| } |
| } |
| stringstream info; |
| for (PerNodePeakMemoryUsage::value_type entry: per_node_peak_mem_usage) { |
| info << entry.first << "(" |
| << PrettyPrinter::Print(entry.second, TUnit::BYTES) << ") "; |
| } |
| query_profile_->AddInfoString("Per Node Peak Memory Usage", info.str()); |
| } |
| } |
| |
| string Coordinator::GetErrorLog() { |
| ErrorLogMap merged; |
| for (InstanceState* state: fragment_instance_states_) { |
| lock_guard<mutex> l(*state->lock()); |
| if (state->error_log()->size() > 0) MergeErrorMaps(&merged, *state->error_log()); |
| } |
| return PrintErrorMapToString(merged); |
| } |
| |
| void Coordinator::SetExecPlanFragmentParams( |
| const FInstanceExecParams& params, TExecPlanFragmentParams* rpc_params) { |
| rpc_params->__set_protocol_version(ImpalaInternalServiceVersion::V1); |
| rpc_params->__set_query_ctx(query_ctx_); |
| |
| TPlanFragmentCtx fragment_ctx; |
| TPlanFragmentInstanceCtx fragment_instance_ctx; |
| |
| fragment_ctx.__set_fragment(params.fragment()); |
| SetExecPlanDescriptorTable(params.fragment(), rpc_params); |
| |
| // Remove filters that weren't selected during filter routing table construction. |
| if (filter_mode_ != TRuntimeFilterMode::OFF) { |
| DCHECK(schedule_.request().query_ctx.client_request.query_options.mt_dop == 0); |
| int instance_idx = GetInstanceIdx(params.instance_id); |
| for (TPlanNode& plan_node: rpc_params->fragment_ctx.fragment.plan.nodes) { |
| if (plan_node.__isset.runtime_filters) { |
| vector<TRuntimeFilterDesc> required_filters; |
| for (const TRuntimeFilterDesc& desc: plan_node.runtime_filters) { |
| FilterRoutingTable::iterator filter_it = |
| filter_routing_table_.find(desc.filter_id); |
| if (filter_it == filter_routing_table_.end()) continue; |
| const FilterState& f = filter_it->second; |
| if (plan_node.__isset.hash_join_node) { |
| if (f.src_fragment_instance_state_idxs().find(instance_idx) == |
| f.src_fragment_instance_state_idxs().end()) { |
| DCHECK(desc.is_broadcast_join); |
| continue; |
| } |
| } |
| // We don't need a target-side check here, because a filter is either sent to |
| // all its targets or none, and the none case is handled by checking if the |
| // filter is in the routing table. |
| required_filters.push_back(desc); |
| } |
| plan_node.__set_runtime_filters(required_filters); |
| } |
| } |
| } |
| |
| fragment_instance_ctx.__set_request_pool(schedule_.request_pool()); |
| fragment_instance_ctx.__set_per_node_scan_ranges(params.per_node_scan_ranges); |
| fragment_instance_ctx.__set_per_exch_num_senders( |
| params.fragment_exec_params.per_exch_num_senders); |
| fragment_instance_ctx.__set_destinations( |
| params.fragment_exec_params.destinations); |
| fragment_instance_ctx.__set_sender_id(params.sender_id); |
| fragment_instance_ctx.fragment_instance_id = params.instance_id; |
| fragment_instance_ctx.per_fragment_instance_idx = params.per_fragment_instance_idx; |
| rpc_params->__set_fragment_ctx(fragment_ctx); |
| rpc_params->__set_fragment_instance_ctx(fragment_instance_ctx); |
| } |
| |
| void Coordinator::SetExecPlanDescriptorTable(const TPlanFragment& fragment, |
| TExecPlanFragmentParams* rpc_params) { |
| DCHECK(rpc_params->__isset.query_ctx); |
| TDescriptorTable thrift_desc_tbl; |
| |
| // Always add the Tuple and Slot descriptors. |
| thrift_desc_tbl.__set_tupleDescriptors(desc_tbl_.tupleDescriptors); |
| thrift_desc_tbl.__set_slotDescriptors(desc_tbl_.slotDescriptors); |
| |
| // Collect the TTupleId(s) for ScanNode(s). |
| unordered_set<TTupleId> tuple_ids; |
| for (const TPlanNode& plan_node: fragment.plan.nodes) { |
| switch (plan_node.node_type) { |
| case TPlanNodeType::HDFS_SCAN_NODE: |
| tuple_ids.insert(plan_node.hdfs_scan_node.tuple_id); |
| break; |
| case TPlanNodeType::KUDU_SCAN_NODE: |
| tuple_ids.insert(plan_node.kudu_scan_node.tuple_id); |
| break; |
| case TPlanNodeType::HBASE_SCAN_NODE: |
| tuple_ids.insert(plan_node.hbase_scan_node.tuple_id); |
| break; |
| case TPlanNodeType::DATA_SOURCE_NODE: |
| tuple_ids.insert(plan_node.data_source_node.tuple_id); |
| break; |
| case TPlanNodeType::HASH_JOIN_NODE: |
| case TPlanNodeType::AGGREGATION_NODE: |
| case TPlanNodeType::SORT_NODE: |
| case TPlanNodeType::EMPTY_SET_NODE: |
| case TPlanNodeType::EXCHANGE_NODE: |
| case TPlanNodeType::UNION_NODE: |
| case TPlanNodeType::SELECT_NODE: |
| case TPlanNodeType::NESTED_LOOP_JOIN_NODE: |
| case TPlanNodeType::ANALYTIC_EVAL_NODE: |
| case TPlanNodeType::SINGULAR_ROW_SRC_NODE: |
| case TPlanNodeType::UNNEST_NODE: |
| case TPlanNodeType::SUBPLAN_NODE: |
| // Do nothing |
| break; |
| default: |
| DCHECK(false) << "Invalid node type: " << plan_node.node_type; |
| } |
| } |
| |
| // Collect TTableId(s) matching the TTupleId(s). |
| unordered_set<TTableId> table_ids; |
| for (const TTupleId& tuple_id: tuple_ids) { |
| for (const TTupleDescriptor& tuple_desc: desc_tbl_.tupleDescriptors) { |
| if (tuple_desc.__isset.tableId && tuple_id == tuple_desc.id) { |
| table_ids.insert(tuple_desc.tableId); |
| } |
| } |
| } |
| |
| // Collect the tableId for the table sink. |
| if (fragment.__isset.output_sink && fragment.output_sink.__isset.table_sink |
| && fragment.output_sink.type == TDataSinkType::TABLE_SINK) { |
| table_ids.insert(fragment.output_sink.table_sink.target_table_id); |
| } |
| |
| // Iterate over all TTableDescriptor(s) and add the ones that are needed. |
| for (const TTableDescriptor& table_desc: desc_tbl_.tableDescriptors) { |
| if (table_ids.find(table_desc.id) == table_ids.end()) continue; |
| thrift_desc_tbl.tableDescriptors.push_back(table_desc); |
| thrift_desc_tbl.__isset.tableDescriptors = true; |
| } |
| |
| rpc_params->query_ctx.__set_desc_tbl(thrift_desc_tbl); |
| } |
| |
| namespace { |
| |
| // Make a PublishFilter rpc to 'impalad' for given fragment_instance_id |
| // and params. |
| // This takes by-value parameters because we cannot guarantee that the originating |
| // coordinator won't be destroyed while this executes. |
| // TODO: switch to references when we fix the lifecycle problems of coordinators. |
| void DistributeFilters(shared_ptr<TPublishFilterParams> params, |
| TNetworkAddress impalad, TUniqueId fragment_instance_id) { |
| Status status; |
| ImpalaBackendConnection backend_client( |
| ExecEnv::GetInstance()->impalad_client_cache(), impalad, &status); |
| if (!status.ok()) return; |
| // Make a local copy of the shared 'master' set of parameters |
| TPublishFilterParams local_params(*params); |
| local_params.dst_instance_id = fragment_instance_id; |
| local_params.__set_bloom_filter(params->bloom_filter); |
| TPublishFilterResult res; |
| backend_client.DoRpc(&ImpalaBackendClient::PublishFilter, local_params, &res); |
| }; |
| |
| } |
| |
| // TODO: call this as soon as it's clear that we won't reference the state |
| // anymore, ie, in CancelInternal() and when GetNext() hits eos |
| void Coordinator::TearDown() { |
| DCHECK(!torn_down_) << "Coordinator::TearDown() may not be called twice"; |
| torn_down_ = true; |
| if (filter_routing_table_.size() > 0) { |
| query_profile_->AddInfoString("Final filter table", FilterDebugString()); |
| } |
| |
| { |
| lock_guard<SpinLock> l(filter_lock_); |
| for (auto& filter : filter_routing_table_) { |
| FilterState* state = &filter.second; |
| state->Disable(filter_mem_tracker_.get()); |
| } |
| } |
| |
| // Need to protect against failed Prepare(), where root_sink() would not be set. |
| if (coord_sink_ != nullptr) { |
| coord_sink_->CloseConsumer(); |
| coord_sink_ = nullptr; |
| } |
| if (coord_instance_ != nullptr) { |
| ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState( |
| coord_instance_->query_state()); |
| coord_instance_ = nullptr; |
| } |
| } |
| |
| void Coordinator::UpdateFilter(const TUpdateFilterParams& params) { |
| DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF) |
| << "UpdateFilter() called although runtime filters are disabled"; |
| DCHECK(exec_complete_barrier_.get() != NULL) |
| << "Filters received before fragments started!"; |
| exec_complete_barrier_->Wait(); |
| DCHECK(filter_routing_table_complete_) |
| << "Filter received before routing table complete"; |
| |
| // Make a 'master' copy that will be shared by all concurrent delivery RPC attempts. |
| shared_ptr<TPublishFilterParams> rpc_params(new TPublishFilterParams()); |
| unordered_set<int> target_fragment_instance_state_idxs; |
| { |
| lock_guard<SpinLock> l(filter_lock_); |
| FilterRoutingTable::iterator it = filter_routing_table_.find(params.filter_id); |
| if (it == filter_routing_table_.end()) { |
| LOG(INFO) << "Could not find filter with id: " << params.filter_id; |
| return; |
| } |
| FilterState* state = &it->second; |
| |
| DCHECK(state->desc().has_remote_targets) |
| << "Coordinator received filter that has only local targets"; |
| |
| // Check if the filter has already been sent, which could happen in three cases: |
| // * if one local filter had always_true set - no point waiting for other local |
| // filters that can't affect the aggregated global filter |
| // * if this is a broadcast join, and another local filter was already received |
| // * if the filter could not be allocated and so an always_true filter was sent |
| // immediately. |
| if (state->disabled()) return; |
| |
| if (filter_updates_received_->value() == 0) { |
| query_events_->MarkEvent("First dynamic filter received"); |
| } |
| filter_updates_received_->Add(1); |
| |
| state->ApplyUpdate(params, this); |
| |
| if (state->pending_count() > 0 && !state->disabled()) return; |
| // At this point, we either disabled this filter or aggregation is complete. |
| DCHECK(state->disabled() || state->pending_count() == 0); |
| |
| // No more updates are pending on this filter ID. Create a distribution payload and |
| // offer it to the queue. |
| for (const FilterTarget& target: *state->targets()) { |
| // Don't publish the filter to targets that are in the same fragment as the join |
| // that produced it. |
| if (target.is_local) continue; |
| target_fragment_instance_state_idxs.insert( |
| target.fragment_instance_state_idxs.begin(), |
| target.fragment_instance_state_idxs.end()); |
| } |
| |
| // Assign outgoing bloom filter. |
| if (state->bloom_filter() != NULL) { |
| // Complete filter case. |
| // TODO: Replace with move() in Thrift 0.9.3. |
| TBloomFilter* aggregated_filter = state->bloom_filter(); |
| filter_mem_tracker_->Release(aggregated_filter->directory.size()); |
| swap(rpc_params->bloom_filter, *aggregated_filter); |
| DCHECK_EQ(aggregated_filter->directory.size(), 0); |
| } else { |
| // Disabled filter case (due to OOM or due to receiving an always_true filter). |
| rpc_params->bloom_filter.always_true = true; |
| } |
| |
| // Filter is complete, and can be released. |
| state->Disable(filter_mem_tracker_.get()); |
| DCHECK_EQ(state->bloom_filter(), reinterpret_cast<TBloomFilter*>(NULL)); |
| } |
| |
| rpc_params->filter_id = params.filter_id; |
| |
| for (int target_idx: target_fragment_instance_state_idxs) { |
| InstanceState* fragment_inst = fragment_instance_states_[target_idx]; |
| DCHECK(fragment_inst != NULL) << "Missing fragment instance: " << target_idx; |
| exec_env_->rpc_pool()->Offer(bind<void>(DistributeFilters, rpc_params, |
| fragment_inst->impalad_address(), fragment_inst->fragment_instance_id())); |
| // TODO: switch back to the following once we fixed the lifecycle |
| // problems of Coordinator |
| //std::cref(fragment_inst->impalad_address()), |
| //std::cref(fragment_inst->fragment_instance_id()))); |
| } |
| } |
| |
| |
| void Coordinator::FilterState::ApplyUpdate(const TUpdateFilterParams& params, |
| Coordinator* coord) { |
| DCHECK_GT(pending_count_, 0); |
| DCHECK_EQ(completion_time_, 0L); |
| if (first_arrival_time_ == 0L) { |
| first_arrival_time_ = coord->query_events_->ElapsedTime(); |
| } |
| |
| --pending_count_; |
| if (params.bloom_filter.always_true) { |
| Disable(coord->filter_mem_tracker_.get()); |
| } else if (bloom_filter_.get() == NULL) { |
| int64_t heap_space = params.bloom_filter.directory.size(); |
| if (!coord->filter_mem_tracker_.get()->TryConsume(heap_space)) { |
| VLOG_QUERY << "Not enough memory to allocate filter: " |
| << PrettyPrinter::Print(heap_space, TUnit::BYTES) |
| << " (query: " << PrintId(coord->query_id()) << ")"; |
| // Disable, as one missing update means a correct filter cannot be produced. |
| Disable(coord->filter_mem_tracker_.get()); |
| } else { |
| bloom_filter_.reset(new TBloomFilter()); |
| // Workaround for fact that parameters are const& for Thrift RPCs - yet we want to |
| // move the payload from the request rather than copy it and take double the memory |
| // cost. After this point, params.bloom_filter is an empty filter and should not be |
| // read. |
| TBloomFilter* non_const_filter = |
| &const_cast<TBloomFilter&>(params.bloom_filter); |
| swap(*bloom_filter_.get(), *non_const_filter); |
| DCHECK_EQ(non_const_filter->directory.size(), 0); |
| } |
| } else { |
| BloomFilter::Or(params.bloom_filter, bloom_filter_.get()); |
| } |
| |
| if (pending_count_ == 0 || disabled_) { |
| completion_time_ = coord->query_events_->ElapsedTime(); |
| } |
| } |
| |
| void Coordinator::FilterState::Disable(MemTracker* tracker) { |
| disabled_ = true; |
| if (bloom_filter_.get() == NULL) return; |
| int64_t heap_space = bloom_filter_.get()->directory.size(); |
| tracker->Release(heap_space); |
| bloom_filter_.reset(); |
| } |
| |
| } |