// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "scheduling/scheduler.h"
#include <stdlib.h>
#include <algorithm>
#include <limits>
#include <random>
#include <unordered_map>
#include <vector>
#include <boost/unordered_set.hpp>
#include <gutil/strings/substitute.h>
#include "common/logging.h"
#include "gen-cpp/CatalogObjects_generated.h"
#include "gen-cpp/DataSinks_types.h"
#include "gen-cpp/ErrorCodes_types.h"
#include "gen-cpp/ImpalaInternalService_types.h"
#include "gen-cpp/Types_types.h"
#include "gen-cpp/common.pb.h"
#include "gen-cpp/statestore_service.pb.h"
#include "scheduling/executor-group.h"
#include "scheduling/hash-ring.h"
#include "thirdparty/pcg-cpp-0.98/include/pcg_random.hpp"
#include "util/compression-util.h"
#include "util/debug-util.h"
#include "util/flat_buffer.h"
#include "util/hash-util.h"
#include "util/metrics.h"
#include "util/network-util.h"
#include "util/pretty-printer.h"
#include "util/runtime-profile-counters.h"
#include "util/uid-util.h"
#include "common/names.h"
using std::pop_heap;
using std::push_heap;
using namespace apache::thrift;
using namespace org::apache::impala::fb;
using namespace strings;
namespace impala {
static const string LOCAL_ASSIGNMENTS_KEY("simple-scheduler.local-assignments.total");
static const string ASSIGNMENTS_KEY("simple-scheduler.assignments.total");
static const string SCHEDULER_INIT_KEY("simple-scheduler.initialized");
static const vector<TPlanNodeType::type> SCAN_NODE_TYPES{TPlanNodeType::HDFS_SCAN_NODE,
TPlanNodeType::HBASE_SCAN_NODE, TPlanNodeType::DATA_SOURCE_NODE,
TPlanNodeType::KUDU_SCAN_NODE};
// Consistent scheduling requires picking up to k distinct candidates out of n nodes.
// Since each iteration can pick a node that it already picked (i.e. it is sampling with
// replacement), it may need more than k iterations to pick k distinct candidates.
// There is also no guaranteed bound on the number of iterations. To protect against
// bugs and large numbers of iterations, we limit the number of iterations. This constant
// determines the number of iterations per distinct candidate allowed. Eight iterations
// per distinct candidate provides a very high probability of actually getting k distinct
// candidates. See GetRemoteExecutorCandidates() for a deeper description.
static const int MAX_ITERATIONS_PER_EXECUTOR_CANDIDATE = 8;
Scheduler::Scheduler(MetricGroup* metrics, RequestPoolService* request_pool_service)
: metrics_(metrics->GetOrCreateChildGroup("scheduler")),
request_pool_service_(request_pool_service) {
LOG(INFO) << "Starting scheduler";
if (metrics_ != nullptr) {
total_assignments_ = metrics_->AddCounter(ASSIGNMENTS_KEY, 0);
total_local_assignments_ = metrics_->AddCounter(LOCAL_ASSIGNMENTS_KEY, 0);
initialized_ = metrics_->AddProperty(SCHEDULER_INIT_KEY, true);
}
}
const BackendDescriptorPB& Scheduler::LookUpBackendDesc(
const ExecutorConfig& executor_config, const NetworkAddressPB& host) {
const BackendDescriptorPB* desc = executor_config.group.LookUpBackendDesc(host);
if (desc == nullptr) {
// Local host may not be in executor_config's executor group if it's a dedicated
// coordinator, or if it is configured to be in a different executor group.
const BackendDescriptorPB& local_be_desc = executor_config.local_be_desc;
DCHECK(host == local_be_desc.address());
desc = &local_be_desc;
}
return *desc;
}
NetworkAddressPB Scheduler::LookUpKrpcHost(
const ExecutorConfig& executor_config, const NetworkAddressPB& backend_host) {
const BackendDescriptorPB& backend_descriptor =
LookUpBackendDesc(executor_config, backend_host);
DCHECK(backend_descriptor.has_krpc_address());
const NetworkAddressPB& krpc_host = backend_descriptor.krpc_address();
DCHECK(IsResolvedAddress(krpc_host));
return krpc_host;
}
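// Expands each TFileSplitGeneratorSpec in 'specs' into one or more scan ranges of at
// most 'max_block_size' bytes (or a single range covering the whole file if the file
// is not splittable) and appends them to 'generated_scan_ranges'.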
Status Scheduler::GenerateScanRanges(const vector<TFileSplitGeneratorSpec>& specs,
vector<TScanRangeLocationList>* generated_scan_ranges) {
for (const auto& spec : specs) {
// Converts the spec to one or more scan ranges.
const FbFileDesc* fb_desc =
flatbuffers::GetRoot<FbFileDesc>(spec.file_desc.file_desc_data.c_str());
DCHECK(fb_desc->file_blocks() == nullptr || fb_desc->file_blocks()->size() == 0);
long scan_range_offset = 0;
long remaining = fb_desc->length();
long scan_range_length = std::min(spec.max_block_size, fb_desc->length());
if (!spec.is_splittable) scan_range_length = fb_desc->length();
while (remaining > 0) {
THdfsFileSplit hdfs_scan_range;
THdfsCompression::type compression;
RETURN_IF_ERROR(FromFbCompression(fb_desc->compression(), &compression));
hdfs_scan_range.__set_file_compression(compression);
hdfs_scan_range.__set_file_length(fb_desc->length());
hdfs_scan_range.__set_relative_path(fb_desc->relative_path()->str());
hdfs_scan_range.__set_length(scan_range_length);
hdfs_scan_range.__set_mtime(fb_desc->last_modification_time());
hdfs_scan_range.__set_offset(scan_range_offset);
hdfs_scan_range.__set_partition_id(spec.partition_id);
hdfs_scan_range.__set_is_erasure_coded(fb_desc->is_ec());
hdfs_scan_range.__set_partition_path_hash(spec.partition_path_hash);
TScanRange scan_range;
scan_range.__set_hdfs_file_split(hdfs_scan_range);
TScanRangeLocationList scan_range_list;
scan_range_list.__set_scan_range(scan_range);
generated_scan_ranges->push_back(scan_range_list);
scan_range_offset += scan_range_length;
remaining -= scan_range_length;
scan_range_length = (scan_range_length > remaining ? remaining : scan_range_length);
}
}
return Status::OK();
}
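// Walks all scan nodes in the request, materializes any split specs into concrete scan
// ranges, and delegates to the per-node ComputeScanRangeAssignment() overload to assign
// the ranges to executors. Results are stored in each fragment's scan_range_assignment.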
Status Scheduler::ComputeScanRangeAssignment(
const ExecutorConfig& executor_config, ScheduleState* state) {
RuntimeProfile::Counter* total_assignment_timer =
ADD_TIMER(state->summary_profile(), "ComputeScanRangeAssignmentTimer");
const TQueryExecRequest& exec_request = state->request();
for (const TPlanExecInfo& plan_exec_info : exec_request.plan_exec_info) {
for (const auto& entry : plan_exec_info.per_node_scan_ranges) {
const TPlanNodeId node_id = entry.first;
const TPlanFragment& fragment = state->GetContainingFragment(node_id);
bool exec_at_coord = (fragment.partition.type == TPartitionType::UNPARTITIONED);
DCHECK(executor_config.group.NumExecutors() > 0 || exec_at_coord);
const TPlanNode& node = state->GetNode(node_id);
DCHECK_EQ(node.node_id, node_id);
bool has_preference =
node.__isset.hdfs_scan_node && node.hdfs_scan_node.__isset.replica_preference;
const TReplicaPreference::type* node_replica_preference = has_preference ?
&node.hdfs_scan_node.replica_preference :
nullptr;
bool node_random_replica = node.__isset.hdfs_scan_node
&& node.hdfs_scan_node.__isset.random_replica
&& node.hdfs_scan_node.random_replica;
FragmentScanRangeAssignment* assignment =
&state->GetFragmentScheduleState(fragment.idx)->scan_range_assignment;
const vector<TScanRangeLocationList>* locations = nullptr;
vector<TScanRangeLocationList> expanded_locations;
if (entry.second.split_specs.empty()) {
// directly use the concrete ranges.
locations = &entry.second.concrete_ranges;
} else {
// union concrete ranges and expanded specs.
expanded_locations.insert(expanded_locations.end(),
entry.second.concrete_ranges.begin(), entry.second.concrete_ranges.end());
RETURN_IF_ERROR(
GenerateScanRanges(entry.second.split_specs, &expanded_locations));
locations = &expanded_locations;
}
DCHECK(locations != nullptr);
RETURN_IF_ERROR(
ComputeScanRangeAssignment(executor_config, node_id, node_replica_preference,
node_random_replica, *locations, exec_request.host_list, exec_at_coord,
state->query_options(), total_assignment_timer, state->rng(), assignment));
state->IncNumScanRanges(locations->size());
}
}
return Status::OK();
}
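// For each plan in the request, recursively computes the fragment instance schedule
// states (instance ids, hosts and scan ranges), then wires up exchange destinations,
// per_exch_num_senders and sender ids between producer and consumer fragments.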
void Scheduler::ComputeFragmentExecParams(
const ExecutorConfig& executor_config, ScheduleState* state) {
const TQueryExecRequest& exec_request = state->request();
// for each plan, compute the FInstanceScheduleStates for the tree of fragments.
// The plans are in dependency order, so we compute parameters for each plan
// *before* its input join build plans. This allows the join build plans to
// be easily co-located with the plans consuming their output.
for (const TPlanExecInfo& plan_exec_info : exec_request.plan_exec_info) {
// set instance_id, host, per_node_scan_ranges
ComputeFragmentExecParams(executor_config, plan_exec_info,
state->GetFragmentScheduleState(plan_exec_info.fragments[0].idx), state);
// Set destinations, per_exch_num_senders, sender_id.
for (const TPlanFragment& src_fragment : plan_exec_info.fragments) {
VLOG(3) << "Computing exec params for fragment " << src_fragment.display_name;
if (!src_fragment.output_sink.__isset.stream_sink) continue;
FragmentIdx dest_idx =
state->GetFragmentIdx(src_fragment.output_sink.stream_sink.dest_node_id);
FragmentScheduleState* dest_state = state->GetFragmentScheduleState(dest_idx);
FragmentScheduleState* src_state =
state->GetFragmentScheduleState(src_fragment.idx);
// populate src_state->destinations
for (int i = 0; i < dest_state->instance_states.size(); ++i) {
PlanFragmentDestinationPB* dest = src_state->exec_params->add_destinations();
*dest->mutable_fragment_instance_id() =
dest_state->instance_states[i].exec_params.instance_id();
const NetworkAddressPB& host = dest_state->instance_states[i].host;
*dest->mutable_thrift_backend() = host;
const BackendDescriptorPB& desc = LookUpBackendDesc(executor_config, host);
DCHECK(desc.has_krpc_address());
DCHECK(IsResolvedAddress(desc.krpc_address()));
*dest->mutable_krpc_backend() = desc.krpc_address();
}
// enumerate senders consecutively;
// for distributed merge we need to enumerate senders across fragment instances
const TDataStreamSink& sink = src_fragment.output_sink.stream_sink;
DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED
|| sink.output_partition.type == TPartitionType::HASH_PARTITIONED
|| sink.output_partition.type == TPartitionType::RANDOM
|| sink.output_partition.type == TPartitionType::KUDU);
PlanNodeId exch_id = sink.dest_node_id;
google::protobuf::Map<int32_t, int32_t>* per_exch_num_senders =
dest_state->exec_params->mutable_per_exch_num_senders();
int sender_id_base = (*per_exch_num_senders)[exch_id];
(*per_exch_num_senders)[exch_id] += src_state->instance_states.size();
for (int i = 0; i < src_state->instance_states.size(); ++i) {
FInstanceScheduleState& src_instance_state = src_state->instance_states[i];
src_instance_state.exec_params.set_sender_id(sender_id_base + i);
}
}
}
}
void Scheduler::ComputeFragmentExecParams(const ExecutorConfig& executor_config,
const TPlanExecInfo& plan_exec_info, FragmentScheduleState* fragment_state,
ScheduleState* state) {
// Create exec params for child fragments connected by an exchange. Instance creation
// for this fragment depends on where the input fragment instances are scheduled.
for (FragmentIdx input_fragment_idx : fragment_state->exchange_input_fragments) {
ComputeFragmentExecParams(executor_config, plan_exec_info,
state->GetFragmentScheduleState(input_fragment_idx), state);
}
const TPlanFragment& fragment = fragment_state->fragment;
if (fragment.output_sink.__isset.join_build_sink) {
// case 0: join build fragment, co-located with its parent fragment. Join build
// fragments may be unpartitioned if they are co-located with the root fragment.
VLOG(3) << "Computing exec params for collocated join build fragment "
<< fragment_state->fragment.display_name;
CreateCollocatedJoinBuildInstances(fragment_state, state);
} else if (fragment.partition.type == TPartitionType::UNPARTITIONED) {
// case 1: unpartitioned fragment - either the coordinator fragment at the root of
// the plan that must be executed at the coordinator or an unpartitioned fragment
// that can be executed anywhere.
VLOG(3) << "Computing exec params for "
<< (fragment_state->is_coord_fragment ? "coordinator" : "unpartitioned")
<< " fragment " << fragment_state->fragment.display_name;
NetworkAddressPB host;
NetworkAddressPB krpc_host;
if (fragment_state->is_coord_fragment || executor_config.group.NumExecutors() == 0) {
// The coordinator fragment must be scheduled on the coordinator. Otherwise, if no
// executors are available, we fall back to scheduling on the coordinator.
const BackendDescriptorPB& local_be_desc = executor_config.local_be_desc;
host = local_be_desc.address();
DCHECK(local_be_desc.has_krpc_address());
krpc_host = local_be_desc.krpc_address();
} else if (fragment_state->exchange_input_fragments.size() > 0) {
// Interior unpartitioned fragments can be scheduled on an arbitrary executor.
// Pick a random instance from the first input fragment.
const FragmentScheduleState& input_fragment_state =
*state->GetFragmentScheduleState(fragment_state->exchange_input_fragments[0]);
int num_input_instances = input_fragment_state.instance_states.size();
int instance_idx =
std::uniform_int_distribution<int>(0, num_input_instances - 1)(*state->rng());
host = input_fragment_state.instance_states[instance_idx].host;
krpc_host = input_fragment_state.instance_states[instance_idx].krpc_host;
} else {
// Other fragments, e.g. ones with only a constant union or empty set, are scheduled
// on a random executor.
vector<BackendDescriptorPB> all_executors =
executor_config.group.GetAllExecutorDescriptors();
int idx = std::uniform_int_distribution<int>(0, all_executors.size() - 1)(
*state->rng());
const BackendDescriptorPB& be_desc = all_executors[idx];
host = be_desc.address();
DCHECK(be_desc.has_krpc_address());
krpc_host = be_desc.krpc_address();
}
VLOG(3) << "Scheduled unpartitioned fragment on " << krpc_host;
DCHECK(IsResolvedAddress(krpc_host));
// make sure that the coordinator instance ends up with instance idx 0
UniqueIdPB instance_id = fragment_state->is_coord_fragment ?
state->query_id() :
state->GetNextInstanceId();
fragment_state->instance_states.emplace_back(
instance_id, host, krpc_host, 0, *fragment_state);
FInstanceScheduleState& instance_state = fragment_state->instance_states.back();
*fragment_state->exec_params->add_instances() = instance_id;
// That instance gets all of the scan ranges, if there are any.
if (!fragment_state->scan_range_assignment.empty()) {
DCHECK_EQ(fragment_state->scan_range_assignment.size(), 1);
auto first_entry = fragment_state->scan_range_assignment.begin();
for (const PerNodeScanRanges::value_type& entry : first_entry->second) {
instance_state.AddScanRanges(entry.first, entry.second);
}
}
} else if (ContainsNode(fragment.plan, TPlanNodeType::UNION_NODE)
|| ContainsScanNode(fragment.plan)) {
VLOG(3) << "Computing exec params for scan and/or union fragment.";
// case 2: leaf fragment (i.e. no input fragments) with a single scan node.
// case 3: union fragment, which may have scan nodes and may have input fragments.
CreateCollocatedAndScanInstances(executor_config, fragment_state, state);
} else {
VLOG(3) << "Computing exec params for interior fragment.";
// case 4: interior (non-leaf) fragment without a scan or union.
// We assign the same hosts as those of our leftmost input fragment (so that a
// merge aggregation fragment runs on the hosts that provide the input data).
CreateInputCollocatedInstances(fragment_state, state);
}
}
// Maybe the easiest way to understand the objective of this algorithm is as a
// generalization of two simpler instance creation algorithms that decide how many
// instances of a fragment to create on each node, given a set of nodes that were
// already chosen by previous scheduling steps:
// 1. Instance creation for an interior fragment (i.e. a fragment without scans)
// without a UNION, where we create one finstance for each instance of the leftmost
// input fragment.
// 2. Instance creation for a fragment with a single scan and no UNION, where we create
// finstances on each host with a scan range, with one finstance per scan range,
// up to mt_dop finstances.
//
// This algorithm more-or-less creates the union of all finstances that would be created
// by applying the above algorithms to each input fragment and scan node. I.e. the
// parallelism on each host is the max of the parallelism that would result from each
// of the above algorithms. Note that step 1 is modified to run on fragments with union
// nodes, by considering all input fragments and not just the leftmost because we expect
// unions to be symmetrical for purposes of planning, unlike joins.
//
// The high-level steps are:
// 1. Compute the set of hosts that this fragment should run on and the parallelism on
// each host.
// 2. Instantiate finstances on each host.
// a) Map scan ranges to finstances for each scan node with AssignRangesToInstances().
// b) Create the finstances, based on the computed parallelism and assign the scan
// ranges to it.
void Scheduler::CreateCollocatedAndScanInstances(const ExecutorConfig& executor_config,
FragmentScheduleState* fragment_state, ScheduleState* state) {
const TPlanFragment& fragment = fragment_state->fragment;
bool has_union = ContainsNode(fragment.plan, TPlanNodeType::UNION_NODE);
DCHECK(has_union || ContainsScanNode(fragment.plan));
// Build a map of hosts to the num instances this fragment should have, before we take
// into account scan ranges. If this fragment has input fragments, we always run with
// at least the same num instances as the input fragment.
std::unordered_map<NetworkAddressPB, int> instances_per_host;
// Add hosts of input fragments, counting the number of instances of the fragment.
// Only do this if there's a union - otherwise, for consistency with the previous
// behaviour, we consider only the parallelism of the input scan.
if (has_union) {
for (FragmentIdx idx : fragment_state->exchange_input_fragments) {
std::unordered_map<NetworkAddressPB, int> input_fragment_instances_per_host;
const FragmentScheduleState& input_state = *state->GetFragmentScheduleState(idx);
for (const FInstanceScheduleState& instance_state : input_state.instance_states) {
++input_fragment_instances_per_host[instance_state.host];
}
// Merge with the existing hosts by taking the max num instances.
if (instances_per_host.empty()) {
// Optimization for the common case of one input fragment.
instances_per_host = move(input_fragment_instances_per_host);
} else {
for (auto& entry : input_fragment_instances_per_host) {
int& num_instances = instances_per_host[entry.first];
num_instances = max(num_instances, entry.second);
}
}
}
}
// Add hosts of scan nodes.
vector<TPlanNodeId> scan_node_ids = FindScanNodes(fragment.plan);
DCHECK(has_union || scan_node_ids.size() == 1) << "This method may need revisiting "
<< "for plans with no union and multiple scans per fragment";
vector<NetworkAddressPB> scan_hosts;
GetScanHosts(
executor_config.local_be_desc, scan_node_ids, *fragment_state, &scan_hosts);
for (const NetworkAddressPB& host_addr : scan_hosts) {
// Ensure that each scan host gets at least one instance. We use max() instead of
// incrementing so that hosts that already have instances from the input fragments
// don't end up with too high a num_instances.
int& host_num_instances = instances_per_host[host_addr];
host_num_instances = max(1, host_num_instances);
}
DCHECK(!instances_per_host.empty()) << "no hosts for fragment " << fragment.idx;
// Number of instances should be bounded by mt_dop.
int max_num_instances =
max(1, state->request().query_ctx.client_request.query_options.mt_dop);
// Track the index of the next instance to be created for this fragment.
int per_fragment_instance_idx = 0;
for (const auto& entry : instances_per_host) {
const NetworkAddressPB& host = entry.first;
NetworkAddressPB krpc_host = LookUpKrpcHost(executor_config, host);
FragmentScanRangeAssignment& sra = fragment_state->scan_range_assignment;
auto assignment_it = sra.find(host);
// One entry in outer vector per scan node in 'scan_node_ids'.
// The inner vectors are the output of AssignRangesToInstances().
// The vector may be ragged - i.e. different nodes have different numbers
// of instances.
vector<vector<vector<ScanRangeParamsPB>>> per_scan_per_instance_ranges;
for (TPlanNodeId scan_node_id : scan_node_ids) {
// Ensure empty list is created if no scan ranges are scheduled on this host.
per_scan_per_instance_ranges.emplace_back();
if (assignment_it == sra.end()) continue;
auto scan_ranges_it = assignment_it->second.find(scan_node_id);
if (scan_ranges_it == assignment_it->second.end()) continue;
per_scan_per_instance_ranges.back() =
AssignRangesToInstances(max_num_instances, scan_ranges_it->second);
DCHECK_LE(per_scan_per_instance_ranges.back().size(), max_num_instances);
}
// The number of instances to create, based on the scan with the most ranges and
// the input parallelism that we computed for the host above.
int num_instances = entry.second;
DCHECK_GE(num_instances, 1);
DCHECK_LE(num_instances, max_num_instances)
<< "Input parallelism too high for mt_dop";
for (const auto& per_scan_ranges : per_scan_per_instance_ranges) {
num_instances = max(num_instances, static_cast<int>(per_scan_ranges.size()));
}
// Create the finstances. Must do this even if there are no scan range assignments
// because we may have other input fragments.
int host_finstance_start_idx = fragment_state->instance_states.size();
for (int i = 0; i < num_instances; ++i) {
UniqueIdPB instance_id = state->GetNextInstanceId();
fragment_state->instance_states.emplace_back(
instance_id, host, krpc_host, per_fragment_instance_idx++, *fragment_state);
*fragment_state->exec_params->add_instances() = instance_id;
}
// Allocate scan ranges to the finstances if needed. Currently we simply allocate
// them to fragment instances in order. This may be suboptimal in cases where the
// amount of work is somewhat uneven in each scan and we could even out the overall
// amount of work by shuffling the ranges between finstances.
for (int scan_idx = 0; scan_idx < per_scan_per_instance_ranges.size(); ++scan_idx) {
const auto& per_instance_ranges = per_scan_per_instance_ranges[scan_idx];
for (int inst_idx = 0; inst_idx < per_instance_ranges.size(); ++inst_idx) {
FInstanceScheduleState& instance_state =
fragment_state->instance_states[host_finstance_start_idx + inst_idx];
instance_state.AddScanRanges(
scan_node_ids[scan_idx], per_instance_ranges[inst_idx]);
}
}
}
if (fragment.output_sink.__isset.table_sink
&& fragment.output_sink.table_sink.__isset.hdfs_table_sink
&& state->query_options().max_fs_writers > 0
&& fragment_state->instance_states.size() > state->query_options().max_fs_writers) {
LOG(WARNING) << "Extra table sink instances scheduled, probably due to mismatch of "
"cluster state during planning vs scheduling. Expected: "
<< state->query_options().max_fs_writers
<< " Found: " << fragment_state->instance_states.size();
}
}
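// Distributes 'ranges' across at most 'max_num_instances' instances in round-robin
// order and returns one vector of ranges per instance. If only one instance is needed,
// it receives all ranges. Assumes 'ranges' is non-empty.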
vector<vector<ScanRangeParamsPB>> Scheduler::AssignRangesToInstances(
int max_num_instances, vector<ScanRangeParamsPB>& ranges) {
DCHECK_GT(max_num_instances, 0);
int num_instances = min(max_num_instances, static_cast<int>(ranges.size()));
vector<vector<ScanRangeParamsPB>> per_instance_ranges(num_instances);
if (num_instances < 2) {
// Short-circuit the assignment algorithm for the single instance case.
per_instance_ranges[0] = ranges;
} else {
int idx = 0;
for (auto& range : ranges) {
per_instance_ranges[idx].push_back(range);
idx = (idx + 1 == num_instances) ? 0 : idx + 1;
}
}
return per_instance_ranges;
}
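// Creates one instance per instance of the first (leftmost) exchange input fragment, on
// the same host. For HDFS table sinks with max_fs_writers set, the number of instances
// is instead capped at max_fs_writers and spread evenly across the input hosts.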
void Scheduler::CreateInputCollocatedInstances(
FragmentScheduleState* fragment_state, ScheduleState* state) {
DCHECK_GE(fragment_state->exchange_input_fragments.size(), 1);
const TPlanFragment& fragment = fragment_state->fragment;
const FragmentScheduleState& input_fragment_state =
*state->GetFragmentScheduleState(fragment_state->exchange_input_fragments[0]);
int per_fragment_instance_idx = 0;
if (fragment.output_sink.__isset.table_sink
&& fragment.output_sink.table_sink.__isset.hdfs_table_sink
&& state->query_options().max_fs_writers > 0
&& input_fragment_state.instance_states.size()
> state->query_options().max_fs_writers) {
std::unordered_set<std::pair<NetworkAddressPB, NetworkAddressPB>> all_hosts;
for (const FInstanceScheduleState& input_instance_state :
input_fragment_state.instance_states) {
all_hosts.insert({input_instance_state.host, input_instance_state.krpc_host});
}
// This implementation creates the desired number of instances while balancing them
// across hosts and ensuring that instances on the same host get consecutive instance
// indexes.
int num_hosts = all_hosts.size();
int max_instances = state->query_options().max_fs_writers;
int instances_per_host = max_instances / num_hosts;
int remainder = max_instances % num_hosts;
auto host_itr = all_hosts.begin();
for (int i = 0; i < num_hosts; i++) {
for (int j = 0; j < instances_per_host + (i < remainder); ++j) {
UniqueIdPB instance_id = state->GetNextInstanceId();
fragment_state->instance_states.emplace_back(instance_id, host_itr->first,
host_itr->second, per_fragment_instance_idx++, *fragment_state);
*fragment_state->exec_params->add_instances() = instance_id;
}
if (host_itr != all_hosts.end()) host_itr++;
}
} else {
for (const FInstanceScheduleState& input_instance_state :
input_fragment_state.instance_states) {
UniqueIdPB instance_id = state->GetNextInstanceId();
fragment_state->instance_states.emplace_back(instance_id, input_instance_state.host,
input_instance_state.krpc_host, per_fragment_instance_idx++, *fragment_state);
*fragment_state->exec_params->add_instances() = instance_id;
}
}
}
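// Creates join build instances co-located with the instances of the fragment that
// consumes the build. With shared builds, one build instance is created per backend;
// otherwise one per consuming instance. Also links each consuming instance to its
// build input.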
void Scheduler::CreateCollocatedJoinBuildInstances(
FragmentScheduleState* fragment_state, ScheduleState* state) {
const TPlanFragment& fragment = fragment_state->fragment;
DCHECK(fragment.output_sink.__isset.join_build_sink);
const TJoinBuildSink& sink = fragment.output_sink.join_build_sink;
int join_fragment_idx = state->GetFragmentIdx(sink.dest_node_id);
FragmentScheduleState* join_fragment_state =
state->GetFragmentScheduleState(join_fragment_idx);
DCHECK(!join_fragment_state->instance_states.empty())
<< "Parent fragment instances must already be created.";
vector<FInstanceScheduleState>* instance_states = &fragment_state->instance_states;
bool share_build = fragment.output_sink.join_build_sink.share_build;
int per_fragment_instance_idx = 0;
for (FInstanceScheduleState& parent_state : join_fragment_state->instance_states) {
// Share the build if join build sharing is enabled for this fragment and the previous
// instance was on the same host (instances for a backend are clustered together).
if (!share_build || instance_states->empty()
|| instance_states->back().krpc_host != parent_state.krpc_host) {
UniqueIdPB instance_id = state->GetNextInstanceId();
instance_states->emplace_back(instance_id, parent_state.host,
parent_state.krpc_host, per_fragment_instance_idx++, *fragment_state);
instance_states->back().exec_params.set_num_join_build_outputs(0);
*fragment_state->exec_params->add_instances() = instance_id;
}
JoinBuildInputPB build_input;
build_input.set_join_node_id(sink.dest_node_id);
FInstanceExecParamsPB& pb = instance_states->back().exec_params;
*build_input.mutable_input_finstance_id() = pb.instance_id();
*parent_state.exec_params.add_join_build_inputs() = build_input;
VLOG(3) << "Linked join build for node id=" << sink.dest_node_id
<< " build finstance=" << PrintId(pb.instance_id())
<< " dst finstance=" << PrintId(parent_state.exec_params.instance_id());
pb.set_num_join_build_outputs(pb.num_join_build_outputs() + 1);
}
}
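// Assigns each scan range in 'locations' to an executor. Ranges with a replica on an
// executor host are assigned locally (preferring the smallest memory distance and the
// least-loaded executor); the remaining ranges are assigned remotely, either from a
// consistent set of candidates derived from the file split or via
// SelectRemoteExecutor().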
Status Scheduler::ComputeScanRangeAssignment(const ExecutorConfig& executor_config,
PlanNodeId node_id, const TReplicaPreference::type* node_replica_preference,
bool node_random_replica, const vector<TScanRangeLocationList>& locations,
const vector<TNetworkAddress>& host_list, bool exec_at_coord,
const TQueryOptions& query_options, RuntimeProfile::Counter* timer, std::mt19937* rng,
FragmentScanRangeAssignment* assignment) {
const ExecutorGroup& executor_group = executor_config.group;
if (executor_group.NumExecutors() == 0 && !exec_at_coord) {
return Status(TErrorCode::NO_REGISTERED_BACKENDS);
}
SCOPED_TIMER(timer);
// We adjust all replicas with memory distance less than base_distance to base_distance
// and collect all replicas with equal or better distance as candidates. For a full list
// of memory distance classes see TReplicaPreference in PlanNodes.thrift.
TReplicaPreference::type base_distance = query_options.replica_preference;
// A preference attached to the plan node takes precedence.
if (node_replica_preference) base_distance = *node_replica_preference;
// Between otherwise equivalent executors we optionally break ties by comparing their
// random rank.
bool random_replica = query_options.schedule_random_replica || node_random_replica;
// This temp group is necessary because of the AssignmentCtx interface. This group is
// used to schedule scan ranges for the plan node passed where the caller of this method
// has determined that it needs to be scheduled on the coordinator only. Note that this
// also includes queries where the whole query should run on the coordinator, as is
// determined by Scheduler::IsCoordinatorOnlyQuery(). For those queries, the
// AdmissionController will pass an empty executor group and rely on this method being
// called with exec_at_coord = true.
// TODO: Either get this from the ExecutorConfig or modify the AssignmentCtx interface
// to handle this case.
ExecutorGroup coord_only_executor_group("coordinator-only-group");
const BackendDescriptorPB& local_be_desc = executor_config.local_be_desc;
coord_only_executor_group.AddExecutor(local_be_desc);
VLOG_ROW << "Exec at coord is " << (exec_at_coord ? "true" : "false");
AssignmentCtx assignment_ctx(exec_at_coord ? coord_only_executor_group : executor_group,
total_assignments_, total_local_assignments_, rng);
// Holds scan ranges that must be assigned for remote reads.
vector<const TScanRangeLocationList*> remote_scan_range_locations;
// Loop over all scan ranges, select an executor for those with local impalads and
// collect all others for later processing.
for (const TScanRangeLocationList& scan_range_locations : locations) {
TReplicaPreference::type min_distance = TReplicaPreference::REMOTE;
// Select executor for the current scan range.
if (exec_at_coord) {
DCHECK(assignment_ctx.executor_group().LookUpExecutorIp(
local_be_desc.address().hostname(), nullptr));
assignment_ctx.RecordScanRangeAssignment(local_be_desc, node_id, host_list,
scan_range_locations, assignment);
} else {
// Collect executor candidates with smallest memory distance.
vector<IpAddr> executor_candidates;
if (base_distance < TReplicaPreference::REMOTE) {
for (const TScanRangeLocation& location : scan_range_locations.locations) {
const TNetworkAddress& replica_host = host_list[location.host_idx];
// Determine the adjusted memory distance to the closest executor for the
// replica host.
TReplicaPreference::type memory_distance = TReplicaPreference::REMOTE;
IpAddr executor_ip;
bool has_local_executor = assignment_ctx.executor_group().LookUpExecutorIp(
replica_host.hostname, &executor_ip);
if (has_local_executor) {
if (location.is_cached) {
memory_distance = TReplicaPreference::CACHE_LOCAL;
} else {
memory_distance = TReplicaPreference::DISK_LOCAL;
}
} else {
memory_distance = TReplicaPreference::REMOTE;
}
memory_distance = max(memory_distance, base_distance);
// We only need to collect executor candidates for non-remote reads; by definition,
// a remote read has no local executor available.
if (memory_distance < TReplicaPreference::REMOTE) {
DCHECK(has_local_executor);
// Check if we found a closer replica than the previous ones.
if (memory_distance < min_distance) {
min_distance = memory_distance;
executor_candidates.clear();
executor_candidates.push_back(executor_ip);
} else if (memory_distance == min_distance) {
executor_candidates.push_back(executor_ip);
}
}
}
} // End of candidate selection.
DCHECK(!executor_candidates.empty() || min_distance == TReplicaPreference::REMOTE);
// Check the effective memory distance of the candidates to decide whether to treat
// the scan range as cached.
bool cached_replica = min_distance == TReplicaPreference::CACHE_LOCAL;
// Pick executor based on data location.
bool local_executor = min_distance != TReplicaPreference::REMOTE;
if (!local_executor) {
remote_scan_range_locations.push_back(&scan_range_locations);
continue;
}
// For local reads we want to break ties by executor rank in these cases:
// - if it is enforced via a query option.
// - when selecting between cached replicas. In this case there is no OS buffer
// cache to worry about.
// Remote reads will always break ties by executor rank.
bool decide_local_assignment_by_rank = random_replica || cached_replica;
const IpAddr* executor_ip = nullptr;
executor_ip = assignment_ctx.SelectExecutorFromCandidates(
executor_candidates, decide_local_assignment_by_rank);
BackendDescriptorPB executor;
assignment_ctx.SelectExecutorOnHost(*executor_ip, &executor);
assignment_ctx.RecordScanRangeAssignment(
executor, node_id, host_list, scan_range_locations, assignment);
} // End of executor selection.
} // End of for loop over scan ranges.
// Assign remote scans to executors.
int num_remote_executor_candidates =
min(query_options.num_remote_executor_candidates, executor_group.NumExecutors());
for (const TScanRangeLocationList* scan_range_locations : remote_scan_range_locations) {
DCHECK(!exec_at_coord);
const IpAddr* executor_ip;
vector<IpAddr> remote_executor_candidates;
// Limit the number of remote executor candidates:
// 1. When enabled by setting 'num_remote_executor_candidates' > 0
// AND
// 2. This is an HDFS file split
// Otherwise, fall back to the normal method of selecting executors for remote
// ranges, which allows for execution on any backend.
if (scan_range_locations->scan_range.__isset.hdfs_file_split &&
num_remote_executor_candidates > 0) {
assignment_ctx.GetRemoteExecutorCandidates(
&scan_range_locations->scan_range.hdfs_file_split,
num_remote_executor_candidates, &remote_executor_candidates);
// Like the local case, schedule_random_replica determines how to break ties.
executor_ip = assignment_ctx.SelectExecutorFromCandidates(
remote_executor_candidates, random_replica);
} else {
executor_ip = assignment_ctx.SelectRemoteExecutor();
}
BackendDescriptorPB executor;
assignment_ctx.SelectExecutorOnHost(*executor_ip, &executor);
assignment_ctx.RecordScanRangeAssignment(
executor, node_id, host_list, *scan_range_locations, assignment);
}
if (VLOG_FILE_IS_ON) assignment_ctx.PrintAssignment(*assignment);
return Status::OK();
}
bool Scheduler::ContainsNode(const TPlan& plan, TPlanNodeType::type type) {
for (int i = 0; i < plan.nodes.size(); ++i) {
if (plan.nodes[i].node_type == type) return true;
}
return false;
}
bool Scheduler::ContainsNode(
const TPlan& plan, const std::vector<TPlanNodeType::type>& types) {
for (int i = 0; i < plan.nodes.size(); ++i) {
for (int j = 0; j < types.size(); ++j) {
if (plan.nodes[i].node_type == types[j]) return true;
}
}
return false;
}
bool Scheduler::ContainsScanNode(const TPlan& plan) {
return ContainsNode(plan, SCAN_NODE_TYPES);
}
std::vector<TPlanNodeId> Scheduler::FindNodes(
const TPlan& plan, const vector<TPlanNodeType::type>& types) {
vector<TPlanNodeId> results;
for (int i = 0; i < plan.nodes.size(); ++i) {
for (int j = 0; j < types.size(); ++j) {
if (plan.nodes[i].node_type == types[j]) {
results.push_back(plan.nodes[i].node_id);
break;
}
}
}
return results;
}
std::vector<TPlanNodeId> Scheduler::FindScanNodes(const TPlan& plan) {
return FindNodes(plan, SCAN_NODE_TYPES);
}
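// Collects the hosts that were assigned scan ranges for the scan nodes in 'scan_ids'.
// If no scan ranges were assigned, the coordinator's address is added so the fragment
// still runs somewhere.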
void Scheduler::GetScanHosts(const BackendDescriptorPB& local_be_desc,
const vector<TPlanNodeId>& scan_ids, const FragmentScheduleState& fragment_state,
vector<NetworkAddressPB>* scan_hosts) {
for (const TPlanNodeId& scan_id : scan_ids) {
// Get the list of impalad host from scan_range_assignment_
for (const FragmentScanRangeAssignment::value_type& scan_range_assignment :
fragment_state.scan_range_assignment) {
const PerNodeScanRanges& per_node_scan_ranges = scan_range_assignment.second;
if (per_node_scan_ranges.find(scan_id) != per_node_scan_ranges.end()) {
scan_hosts->push_back(scan_range_assignment.first);
}
}
if (scan_hosts->empty()) {
// this scan node doesn't have any scan ranges; run it on the coordinator
// TODO: we'll need to revisit this strategy once we can partition joins
// (in which case this fragment might be executing a right outer join
// with a large build table)
scan_hosts->push_back(local_be_desc.address());
}
}
}
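// Entry point for scheduling a query: computes the scan range assignment, the
// per-fragment instance exec params and the per-backend exec params, then records the
// executor group that was used.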
Status Scheduler::Schedule(const ExecutorConfig& executor_config, ScheduleState* state) {
RETURN_IF_ERROR(DebugAction(state->query_options(), "SCHEDULER_SCHEDULE"));
RETURN_IF_ERROR(ComputeScanRangeAssignment(executor_config, state));
ComputeFragmentExecParams(executor_config, state);
ComputeBackendExecParams(executor_config, state);
#ifndef NDEBUG
state->Validate();
#endif
state->set_executor_group(executor_config.group.name());
return Status::OK();
}
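// A query is coordinator-only if its first plan consists of a single unpartitioned
// fragment.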
bool Scheduler::IsCoordinatorOnlyQuery(const TQueryExecRequest& exec_request) {
DCHECK_GT(exec_request.plan_exec_info.size(), 0);
const TPlanExecInfo& plan_exec_info = exec_request.plan_exec_info[0];
int64_t num_fragments = plan_exec_info.fragments.size();
DCHECK_GT(num_fragments, 0);
auto type = plan_exec_info.fragments[0].partition.type;
return num_fragments == 1 && type == TPartitionType::UNPARTITIONED;
}
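// Aggregates the per-instance exec params into per-backend exec params: memory and
// thread reservations, admission slots ('slots_to_use') and backend addresses. Also
// computes the coordinator's and the largest executor's minimum reservation and adds
// summary info strings to the profile.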
void Scheduler::ComputeBackendExecParams(
const ExecutorConfig& executor_config, ScheduleState* state) {
for (FragmentScheduleState& f : state->fragment_schedule_states()) {
const NetworkAddressPB* prev_host = nullptr;
int num_hosts = 0;
for (FInstanceScheduleState& i : f.instance_states) {
BackendScheduleState& be_state = state->GetOrCreateBackendScheduleState(i.host);
be_state.exec_params->add_instance_params()->Swap(&i.exec_params);
// Different fragments do not synchronize their Open() and Close(), so the backend
// does not provide strong guarantees about whether one fragment instance releases
// resources before another acquires them. Conservatively assume that all fragment
// instances on this backend can consume their peak resources at the same time,
// i.e. that this backend's peak resources is the sum of the per-fragment-instance
// peak resources for the instances executing on this backend.
be_state.exec_params->set_min_mem_reservation_bytes(
be_state.exec_params->min_mem_reservation_bytes()
+ f.fragment.instance_min_mem_reservation_bytes);
be_state.exec_params->set_initial_mem_reservation_total_claims(
be_state.exec_params->initial_mem_reservation_total_claims()
+ f.fragment.instance_initial_mem_reservation_total_claims);
be_state.exec_params->set_thread_reservation(
be_state.exec_params->thread_reservation() + f.fragment.thread_reservation);
// Some memory is shared between fragments on a host. Only add it for the first
// instance of this fragment on the host.
if (prev_host == nullptr || *prev_host != i.host) {
be_state.exec_params->set_min_mem_reservation_bytes(
be_state.exec_params->min_mem_reservation_bytes()
+ f.fragment.backend_min_mem_reservation_bytes);
be_state.exec_params->set_initial_mem_reservation_total_claims(
be_state.exec_params->initial_mem_reservation_total_claims()
+ f.fragment.backend_initial_mem_reservation_total_claims);
prev_host = &i.host;
++num_hosts;
}
}
f.exec_params->set_num_hosts(num_hosts);
}
// Compute 'slots_to_use' for each backend based on the max # of instances of
// any fragment on that backend.
for (auto& backend : state->per_backend_schedule_states()) {
int be_max_instances = 0;
// Instances for a fragment are clustered together because of how the vector is
// constructed above. So we can compute the max # of instances of any fragment
// with a single pass over the vector.
int curr_fragment_idx = -1;
int curr_instance_count = 0; // Number of instances of the current fragment seen.
for (auto& finstance : backend.second.exec_params->instance_params()) {
if (curr_fragment_idx == -1 || curr_fragment_idx != finstance.fragment_idx()) {
curr_fragment_idx = finstance.fragment_idx();
curr_instance_count = 0;
}
++curr_instance_count;
be_max_instances = max(be_max_instances, curr_instance_count);
}
backend.second.exec_params->set_slots_to_use(be_max_instances);
}
// This also ensures an entry always exists for the coordinator backend.
int64_t coord_min_reservation = 0;
const NetworkAddressPB& coord_addr = executor_config.local_be_desc.address();
BackendScheduleState& coord_be_state =
state->GetOrCreateBackendScheduleState(coord_addr);
coord_be_state.exec_params->set_is_coord_backend(true);
coord_min_reservation = coord_be_state.exec_params->min_mem_reservation_bytes();
int64_t largest_min_reservation = 0;
for (auto& backend : state->per_backend_schedule_states()) {
const NetworkAddressPB& host = backend.first;
const BackendDescriptorPB be_desc = LookUpBackendDesc(executor_config, host);
backend.second.be_desc = be_desc;
*backend.second.exec_params->mutable_backend_id() = be_desc.backend_id();
*backend.second.exec_params->mutable_address() = be_desc.address();
*backend.second.exec_params->mutable_krpc_address() = be_desc.krpc_address();
if (!backend.second.exec_params->is_coord_backend()) {
largest_min_reservation = max(largest_min_reservation,
backend.second.exec_params->min_mem_reservation_bytes());
}
}
state->set_largest_min_reservation(largest_min_reservation);
state->set_coord_min_reservation(coord_min_reservation);
stringstream min_mem_reservation_ss;
stringstream num_fragment_instances_ss;
for (const auto& e : state->per_backend_schedule_states()) {
min_mem_reservation_ss << NetworkAddressPBToString(e.first) << "("
<< PrettyPrinter::Print(
e.second.exec_params->min_mem_reservation_bytes(),
TUnit::BYTES)
<< ") ";
num_fragment_instances_ss << NetworkAddressPBToString(e.first) << "("
<< PrettyPrinter::Print(
e.second.exec_params->instance_params().size(),
TUnit::UNIT)
<< ") ";
}
state->summary_profile()->AddInfoString(
"Per Host Min Memory Reservation", min_mem_reservation_ss.str());
state->summary_profile()->AddInfoString(
"Per Host Number of Fragment Instances", num_fragment_instances_ss.str());
}
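// Captures a random permutation of the group's executor IPs; the position of an IP in
// this permutation is its 'rank', used to break ties consistently within a single
// scheduling run.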
Scheduler::AssignmentCtx::AssignmentCtx(const ExecutorGroup& executor_group,
IntCounter* total_assignments, IntCounter* total_local_assignments, std::mt19937* rng)
: executor_group_(executor_group),
first_unused_executor_idx_(0),
total_assignments_(total_assignments),
total_local_assignments_(total_local_assignments) {
DCHECK_GT(executor_group.NumExecutors(), 0);
random_executor_order_ = executor_group.GetAllExecutorIps();
std::shuffle(random_executor_order_.begin(), random_executor_order_.end(), *rng);
// Initialize inverted map for executor rank lookups
int i = 0;
for (const IpAddr& ip : random_executor_order_) random_executor_rank_[ip] = i++;
}
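// Among 'data_locations', returns the executor with the minimum number of assigned
// bytes so far. Ties are broken by random executor rank if 'break_ties_by_rank' is
// true, otherwise the first minimum encountered wins.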
const IpAddr* Scheduler::AssignmentCtx::SelectExecutorFromCandidates(
const std::vector<IpAddr>& data_locations, bool break_ties_by_rank) {
DCHECK(!data_locations.empty());
// List of candidate indexes into 'data_locations'.
vector<int> candidates_idxs;
// Find locations with minimum number of assigned bytes.
int64_t min_assigned_bytes = numeric_limits<int64_t>::max();
for (int i = 0; i < data_locations.size(); ++i) {
const IpAddr& executor_ip = data_locations[i];
int64_t assigned_bytes = 0;
auto handle_it = assignment_heap_.find(executor_ip);
if (handle_it != assignment_heap_.end()) {
assigned_bytes = (*handle_it->second).assigned_bytes;
}
if (assigned_bytes < min_assigned_bytes) {
candidates_idxs.clear();
min_assigned_bytes = assigned_bytes;
}
if (assigned_bytes == min_assigned_bytes) candidates_idxs.push_back(i);
}
DCHECK(!candidates_idxs.empty());
auto min_rank_idx = candidates_idxs.begin();
if (break_ties_by_rank) {
min_rank_idx = min_element(candidates_idxs.begin(), candidates_idxs.end(),
[&data_locations, this](const int& a, const int& b) {
return GetExecutorRank(data_locations[a]) < GetExecutorRank(data_locations[b]);
});
}
return &data_locations[*min_rank_idx];
}
void Scheduler::AssignmentCtx::GetRemoteExecutorCandidates(
const THdfsFileSplit* hdfs_file_split, int num_candidates,
vector<IpAddr>* remote_executor_candidates) {
// This should be given an empty vector
DCHECK_EQ(remote_executor_candidates->size(), 0);
// This function should not be called with 'num_candidates' exceeding the number of
// executors.
DCHECK_LE(num_candidates, executor_group_.NumExecutors());
// Two different hashes of the filename can result in the same executor.
// The function should return distinct executors, so it may need to do more hashes
// than 'num_candidates'.
unordered_set<IpAddr> distinct_backends;
distinct_backends.reserve(num_candidates);
// Generate multiple hashes of the file split by using the hash as a seed to a PRNG.
// Note: The hash includes the partition path hash, the filename (relative to the
// partition directory), and the offset. The offset is used to allow very large files
// that have multiple splits to be spread across more executors.
uint32_t hash = static_cast<uint32_t>(hdfs_file_split->partition_path_hash);
hash = HashUtil::Hash(hdfs_file_split->relative_path.data(),
hdfs_file_split->relative_path.length(), hash);
hash = HashUtil::Hash(&hdfs_file_split->offset, sizeof(hdfs_file_split->offset), hash);
pcg32 prng(hash);
// Since sampling is with replacement, more iterations than 'num_candidates' may be
// needed to find that many distinct executors. To avoid any problem scenarios, limit
// the total number of iterations. The limit is set reasonably high, because on average
// the loop short-circuits considerably earlier. A higher limit is useful for smaller
// clusters, where this function is used to get all the backends in a consistent order
// rather than to pick a consistent subset.
// Suppose there are three nodes and the number of remote executor candidates is three.
// One can calculate the probability of picking three distinct executors in at most
// n iterations. For n=3, the second pick must not overlap the first (probability 2/3),
// and the third pick must not be either the first or second (probability 1/3). So:
// P(3) = 1*(2/3)*(1/3)=2/9
// The probability that it is done in at most n+1 steps is the probability that
// it completed in n steps combined with the probability that it completes in the n+1st
// step. In order to complete in the n+1st step, the previous n steps must not have
// all landed on a single backend (probability (1/3)^(n-1)) and this step must not land
// on the two backends already chosen (probability 1/3). So, the recursive step is:
// P(n+1) = P(n) + (1 - P(n))*(1-(1/3)^(n-1))*(1/3)
// Here are some example probabilities:
// Probability of completing in at most 5 iterations: 0.6284
// Probability of completing in at most 10 iterations: 0.9506
// Probability of completing in at most 15 iterations: 0.9935
// Probability of completing in at most 20 iterations: 0.9991
int max_iterations = num_candidates * MAX_ITERATIONS_PER_EXECUTOR_CANDIDATE;
for (int i = 0; i < max_iterations; ++i) {
// Look up nearest IpAddr
const IpAddr* executor_addr = executor_group_.GetHashRing()->GetNode(prng());
DCHECK(executor_addr != nullptr);
auto insert_ret = distinct_backends.insert(*executor_addr);
// The return type of unordered_set.insert() is a pair<iterator, bool> where the
// second element is whether this was a new element. If this is a new element,
// add this element to the return vector.
if (insert_ret.second) {
remote_executor_candidates->push_back(*executor_addr);
}
// Short-circuit if we reach the appropriate number of replicas
if (remote_executor_candidates->size() == num_candidates) break;
}
}
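// Returns the next executor for a remote range: executors that have not been assigned
// anything yet are used first (in random order); after that, the executor at the top
// of the assignment heap is chosen.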
const IpAddr* Scheduler::AssignmentCtx::SelectRemoteExecutor() {
const IpAddr* candidate_ip;
if (HasUnusedExecutors()) {
// Pick next unused executor.
candidate_ip = GetNextUnusedExecutorAndIncrement();
} else {
// Pick next executor from assignment_heap. All executors must have been inserted into
// the heap at this point.
DCHECK_GT(executor_group_.NumHosts(), 0);
DCHECK_EQ(executor_group_.NumHosts(), assignment_heap_.size());
candidate_ip = &(assignment_heap_.top().ip);
}
DCHECK(candidate_ip != nullptr);
return candidate_ip;
}
bool Scheduler::AssignmentCtx::HasUnusedExecutors() const {
return first_unused_executor_idx_ < random_executor_order_.size();
}
const IpAddr* Scheduler::AssignmentCtx::GetNextUnusedExecutorAndIncrement() {
DCHECK(HasUnusedExecutors());
const IpAddr* ip = &random_executor_order_[first_unused_executor_idx_++];
return ip;
}
int Scheduler::AssignmentCtx::GetExecutorRank(const IpAddr& ip) const {
auto it = random_executor_rank_.find(ip);
DCHECK(it != random_executor_rank_.end());
return it->second;
}
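// Picks a backend on 'executor_ip'. If multiple backends run on the host, they are
// rotated through in round-robin fashion via 'next_executor_per_host_'.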
void Scheduler::AssignmentCtx::SelectExecutorOnHost(
const IpAddr& executor_ip, BackendDescriptorPB* executor) {
DCHECK(executor_group_.LookUpExecutorIp(executor_ip, nullptr));
const ExecutorGroup::Executors& executors_on_host =
executor_group_.GetExecutorsForHost(executor_ip);
DCHECK(executors_on_host.size() > 0);
if (executors_on_host.size() == 1) {
*executor = *executors_on_host.begin();
} else {
ExecutorGroup::Executors::const_iterator* next_executor_on_host;
next_executor_on_host =
FindOrInsert(&next_executor_per_host_, executor_ip, executors_on_host.begin());
auto eq = [next_executor_on_host](auto& elem) {
const BackendDescriptorPB& next_executor = **next_executor_on_host;
// The IP addresses must already match, so it is sufficient to check the port.
DCHECK_EQ(next_executor.ip_address(), elem.ip_address());
return next_executor.address().port() == elem.address().port();
};
DCHECK(find_if(executors_on_host.begin(), executors_on_host.end(), eq)
!= executors_on_host.end());
*executor = **next_executor_on_host;
// Rotate
++(*next_executor_on_host);
if (*next_executor_on_host == executors_on_host.end()) {
*next_executor_on_host = executors_on_host.begin();
}
}
}
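// Converts a Thrift TScanRange into its protobuf equivalent, copying whichever of the
// HDFS file split, HBase key range or Kudu scan token fields is set.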
void TScanRangeToScanRangePB(const TScanRange& tscan_range, ScanRangePB* scan_range_pb) {
if (tscan_range.__isset.hdfs_file_split) {
HdfsFileSplitPB* hdfs_file_split = scan_range_pb->mutable_hdfs_file_split();
hdfs_file_split->set_relative_path(tscan_range.hdfs_file_split.relative_path);
hdfs_file_split->set_offset(tscan_range.hdfs_file_split.offset);
hdfs_file_split->set_length(tscan_range.hdfs_file_split.length);
hdfs_file_split->set_partition_id(tscan_range.hdfs_file_split.partition_id);
hdfs_file_split->set_file_length(tscan_range.hdfs_file_split.file_length);
hdfs_file_split->set_file_compression(
THdfsCompressionToProto(tscan_range.hdfs_file_split.file_compression));
hdfs_file_split->set_mtime(tscan_range.hdfs_file_split.mtime);
hdfs_file_split->set_is_erasure_coded(tscan_range.hdfs_file_split.is_erasure_coded);
hdfs_file_split->set_partition_path_hash(
tscan_range.hdfs_file_split.partition_path_hash);
}
if (tscan_range.__isset.hbase_key_range) {
HBaseKeyRangePB* hbase_key_range = scan_range_pb->mutable_hbase_key_range();
hbase_key_range->set_startkey(tscan_range.hbase_key_range.startKey);
hbase_key_range->set_stopkey(tscan_range.hbase_key_range.stopKey);
}
if (tscan_range.__isset.kudu_scan_token) {
scan_range_pb->set_kudu_scan_token(tscan_range.kudu_scan_token);
}
}
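// Records that 'scan_range_locations' was assigned to 'executor': updates the
// assignment heap and the byte/assignment counters, determines whether the read is
// local or remote (and the volume id / HDFS cache hint for local reads), and appends
// the resulting ScanRangeParamsPB to 'assignment'.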
void Scheduler::AssignmentCtx::RecordScanRangeAssignment(
const BackendDescriptorPB& executor, PlanNodeId node_id,
const vector<TNetworkAddress>& host_list,
const TScanRangeLocationList& scan_range_locations,
FragmentScanRangeAssignment* assignment) {
int64_t scan_range_length = 0;
if (scan_range_locations.scan_range.__isset.hdfs_file_split) {
scan_range_length = scan_range_locations.scan_range.hdfs_file_split.length;
} else if (scan_range_locations.scan_range.__isset.kudu_scan_token) {
// Hack so that kudu ranges are well distributed.
// TODO: KUDU-1133 Use the tablet size instead.
scan_range_length = 1000;
}
IpAddr executor_ip;
bool ret =
executor_group_.LookUpExecutorIp(executor.address().hostname(), &executor_ip);
DCHECK(ret);
DCHECK(!executor_ip.empty());
assignment_heap_.InsertOrUpdate(
executor_ip, scan_range_length, GetExecutorRank(executor_ip));
// See if the read will be remote. This is not the case if the impalad runs on one of
// the replica's datanodes.
bool remote_read = true;
// For local reads we can set volume_id and try_hdfs_cache. For remote reads HDFS will
// decide which replica to use so we keep those at default values.
int volume_id = -1;
bool try_hdfs_cache = false;
for (const TScanRangeLocation& location : scan_range_locations.locations) {
const TNetworkAddress& replica_host = host_list[location.host_idx];
IpAddr replica_ip;
if (executor_group_.LookUpExecutorIp(replica_host.hostname, &replica_ip)
&& executor_ip == replica_ip) {
remote_read = false;
volume_id = location.volume_id;
try_hdfs_cache = location.is_cached;
break;
}
}
if (remote_read) {
assignment_byte_counters_.remote_bytes += scan_range_length;
} else {
assignment_byte_counters_.local_bytes += scan_range_length;
if (try_hdfs_cache) assignment_byte_counters_.cached_bytes += scan_range_length;
}
if (total_assignments_ != nullptr) {
DCHECK(total_local_assignments_ != nullptr);
total_assignments_->Increment(1);
if (!remote_read) total_local_assignments_->Increment(1);
}
PerNodeScanRanges* scan_ranges =
FindOrInsert(assignment, executor.address(), PerNodeScanRanges());
vector<ScanRangeParamsPB>* scan_range_params_list =
FindOrInsert(scan_ranges, node_id, vector<ScanRangeParamsPB>());
// Add scan range.
ScanRangeParamsPB scan_range_params;
TScanRangeToScanRangePB(
scan_range_locations.scan_range, scan_range_params.mutable_scan_range());
scan_range_params.set_volume_id(volume_id);
scan_range_params.set_try_hdfs_cache(try_hdfs_cache);
scan_range_params.set_is_remote(remote_read);
scan_range_params_list->push_back(scan_range_params);
if (VLOG_FILE_IS_ON) {
VLOG_FILE << "Scheduler assignment to executor: " << executor.address() << "("
<< (remote_read ? "remote" : "local") << " selection)";
}
}
void Scheduler::AssignmentCtx::PrintAssignment(
const FragmentScanRangeAssignment& assignment) {
VLOG_FILE << "Total remote scan volume = "
<< PrettyPrinter::Print(assignment_byte_counters_.remote_bytes, TUnit::BYTES);
VLOG_FILE << "Total local scan volume = "
<< PrettyPrinter::Print(assignment_byte_counters_.local_bytes, TUnit::BYTES);
VLOG_FILE << "Total cached scan volume = "
<< PrettyPrinter::Print(assignment_byte_counters_.cached_bytes, TUnit::BYTES);
for (const FragmentScanRangeAssignment::value_type& entry : assignment) {
VLOG_FILE << "ScanRangeAssignment: server=" << entry.first.DebugString();
for (const PerNodeScanRanges::value_type& per_node_scan_ranges : entry.second) {
stringstream str;
for (const ScanRangeParamsPB& params : per_node_scan_ranges.second) {
str << params.DebugString() << " ";
}
VLOG_FILE << "node_id=" << per_node_scan_ranges.first << " ranges=" << str.str();
}
}
}
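// Adds 'assigned_bytes' to the heap entry for 'ip', creating the entry with the given
// rank if it does not exist yet.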
void Scheduler::AddressableAssignmentHeap::InsertOrUpdate(
const IpAddr& ip, int64_t assigned_bytes, int rank) {
auto handle_it = executor_handles_.find(ip);
if (handle_it == executor_handles_.end()) {
AssignmentHeap::handle_type handle = executor_heap_.push({assigned_bytes, rank, ip});
executor_handles_.emplace(ip, handle);
} else {
// The heap invariant must be restored after every update. A single call to decrease()
// is sufficient, since adding assigned bytes can only lower the entry's priority in
// the heap ordering.
AssignmentHeap::handle_type handle = handle_it->second;
(*handle).assigned_bytes += assigned_bytes;
executor_heap_.decrease(handle);
}
}
}