blob: 2339007d26437760d532e7c818eb8e1792270f6e [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "scheduling/simple-scheduler.h"
#include <atomic>
#include <random>
#include <vector>
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/join.hpp>
#include <boost/bind.hpp>
#include <boost/mem_fn.hpp>
#include <gutil/strings/substitute.h>
#include "common/logging.h"
#include "util/metrics.h"
#include "resourcebroker/resource-broker.h"
#include "runtime/exec-env.h"
#include "runtime/coordinator.h"
#include "service/impala-server.h"
#include "statestore/statestore-subscriber.h"
#include "gen-cpp/Types_types.h"
#include "gen-cpp/ImpalaInternalService_constants.h"
#include "util/network-util.h"
#include "util/uid-util.h"
#include "util/container-util.h"
#include "util/debug-util.h"
#include "util/error-util.h"
#include "util/llama-util.h"
#include "util/mem-info.h"
#include "util/parse-util.h"
#include "util/runtime-profile-counters.h"
#include "gen-cpp/ResourceBrokerService_types.h"
#include "common/names.h"
using boost::algorithm::join;
using namespace apache::thrift;
using namespace rapidjson;
using namespace strings;
// Command-line flag. When true, the SimpleScheduler constructors log that admission
// control is disabled and do not create an AdmissionController.
DEFINE_bool(disable_admission_control, false, "Disables admission control.");
namespace impala {
// Metric keys registered by SimpleScheduler::Init(). The two assignment counters are
// registered as separate metrics, so they need distinct, non-empty keys; all keys follow
// the "simple-scheduler.*" naming used by the other scheduler metrics.
static const string LOCAL_ASSIGNMENTS_KEY("simple-scheduler.local-assignments.total");
static const string ASSIGNMENTS_KEY("simple-scheduler.assignments.total");
static const string SCHEDULER_INIT_KEY("simple-scheduler.initialized");
static const string NUM_BACKENDS_KEY("simple-scheduler.num-backends");
// Debug web page that lists all known backends, and the template used to render it.
static const string BACKENDS_WEB_PAGE = "/backends";
static const string BACKENDS_TEMPLATE = "backends.tmpl";
// Statestore topic used to exchange backend membership. Init() subscribes
// UpdateMembership() to it, and UpdateMembership() publishes the serialized local
// backend descriptor under this topic name.
const string SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC("impala-membership");
// Constructor used when backends are discovered dynamically through the statestore
// membership topic: backend_config_ starts out as an empty BackendConfig, and the local
// backend's address is taken from 'backend_address'.
// NOTE(review): 'subscriber', 'backend_id', 'metrics', 'webserver' and 'resource_broker'
// are not stored by the visible initializer list, and several statements below are
// truncated in this copy of the file — confirm against the full source.
SimpleScheduler::SimpleScheduler(StatestoreSubscriber* subscriber,
const string& backend_id, const TNetworkAddress& backend_address,
MetricGroup* metrics, Webserver* webserver, ResourceBroker* resource_broker,
RequestPoolService* request_pool_service)
: backend_config_(std::make_shared<const BackendConfig>()),
request_pool_service_(request_pool_service) {
local_backend_descriptor_.address = backend_address;
if (FLAGS_disable_admission_control) LOG(INFO) << "Admission control is disabled.";
if (!FLAGS_disable_admission_control) {
// NOTE(review): the start of the statement that takes ownership of this
// AdmissionController is missing here (note the unbalanced parenthesis).
new AdmissionController(request_pool_service_, metrics, backend_address));
// With resource management enabled, sanity-check the default CPU and memory
// request flags; invalid values are only reported via LOG(ERROR) in the visible code.
if (FLAGS_enable_rm) {
if (FLAGS_rm_default_cpu_vcores <= 0) {
LOG(ERROR) << "Bad value for --rm_default_cpu_vcores (must be postive): "
<< FLAGS_rm_default_cpu_vcores;
// --rm_default_memory is parsed as a memory spec (possibly a percentage of physical
// memory); it must be an absolute value larger than 1MB.
// NOTE(review): the parsing call that starts this initializer is missing in this copy.
bool is_percent;
int64_t mem_bytes =
FLAGS_rm_default_memory, &is_percent, MemInfo::physical_mem());
if (mem_bytes <= 1024 * 1024) {
LOG(ERROR) << "Bad value for --rm_default_memory (must be larger than 1M):"
<< FLAGS_rm_default_memory;
} else if (is_percent) {
LOG(ERROR) << "Must use absolute value for --rm_default_memory: "
<< FLAGS_rm_default_memory;
// Constructor taking a fixed, non-empty list of backends (no statestore subscription):
// backend_config_ is built once from 'backends', and the local backend address comes
// from --hostname / --be_port.
// NOTE(review): 'metrics', 'webserver' and 'resource_broker' are not stored by the
// visible initializer list; confirm against the full source.
SimpleScheduler::SimpleScheduler(const vector<TNetworkAddress>& backends,
MetricGroup* metrics, Webserver* webserver, ResourceBroker* resource_broker,
RequestPoolService* request_pool_service)
: backend_config_(std::make_shared<const BackendConfig>(backends)),
request_pool_service_(request_pool_service) {
DCHECK(backends.size() > 0);
local_backend_descriptor_.address = MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port);
if (FLAGS_disable_admission_control) LOG(INFO) << "Admission control is disabled.";
// request_pool_service_ may be null in unit tests
if (request_pool_service_ != NULL && !FLAGS_disable_admission_control) {
// NOTE(review): the start of the statement that takes ownership of this
// AdmissionController is missing here (note the unbalanced parenthesis).
new AdmissionController(request_pool_service_, metrics, TNetworkAddress()));
// Completes scheduler startup. It:
// - resolves the local backend's IP address;
// - registers the /backends debug page when a webserver is set;
// - subscribes UpdateMembership() to the membership topic when a statestore subscriber
//   is set;
// - registers the scheduler metrics when a MetricGroup is set.
// Returns an error status if the local hostname cannot be resolved or the membership
// topic registration fails.
Status SimpleScheduler::Init() {
LOG(INFO) << "Starting simple scheduler";
// Figure out what our IP address is, so that each subscriber
// doesn't have to resolve it on every heartbeat.
IpAddr ip;
const Hostname& hostname = local_backend_descriptor_.address.hostname;
Status status = HostnameToIpAddr(hostname, &ip);
if (!status.ok()) {
VLOG(1) << status.GetDetail();
status.AddDetail("SimpleScheduler failed to start");
return status;
local_backend_descriptor_.ip_address = ip;
LOG(INFO) << "Simple-scheduler using " << ip << " as IP address";
// Serve the list of known backends on the debug webserver.
if (webserver_ != NULL) {
Webserver::UrlCallback backends_callback =
bind<void>(mem_fn(&SimpleScheduler::BackendsUrlCallback), this, _1, _2);
// NOTE(review): the remaining arguments of this call are missing in this copy.
webserver_->RegisterUrlCallback(BACKENDS_WEB_PAGE, BACKENDS_TEMPLATE,
// Route membership-topic updates from the statestore to UpdateMembership().
if (statestore_subscriber_ != NULL) {
StatestoreSubscriber::UpdateCallback cb =
bind<void>(mem_fn(&SimpleScheduler::UpdateMembership), this, _1, _2);
Status status = statestore_subscriber_->AddTopic(IMPALA_MEMBERSHIP_TOPIC, true, cb);
if (!status.ok()) {
status.AddDetail("SimpleScheduler failed to register membership topic");
return status;
if (!FLAGS_disable_admission_control) {
if (metrics_ != NULL) {
// This is after registering with the statestored, so we already have to synchronize
// access to the backend_config_ shared_ptr.
int num_backends = GetBackendConfig()->backend_map().size();
// NOTE(review): ASSIGNMENTS_KEY and LOCAL_ASSIGNMENTS_KEY must be distinct, non-empty
// keys for these to be two separate counters — check their definitions.
total_assignments_ = metrics_->AddCounter<int64_t>(ASSIGNMENTS_KEY, 0);
total_local_assignments_ = metrics_->AddCounter<int64_t>(LOCAL_ASSIGNMENTS_KEY, 0);
initialized_ = metrics_->AddProperty(SCHEDULER_INIT_KEY, true);
// NOTE(review): this gauge is registered under NUM_BACKENDS_KEY but stored in
// num_fragment_instances_metric_; the member name looks inconsistent with the metric.
num_fragment_instances_metric_ = metrics_->AddGauge<int64_t>(
NUM_BACKENDS_KEY, num_backends);
// When the webserver listens on a wildcard address, an address is built from the
// resolved IP and the webserver's port instead.
// NOTE(review): the statements consuming these addresses are truncated in this copy.
if (statestore_subscriber_ != NULL) {
if (webserver_ != NULL) {
const TNetworkAddress& webserver_address = webserver_->http_address();
if (IsWildcardAddress(webserver_address.hostname)) {
MakeNetworkAddress(ip, webserver_address.port));
} else {
return Status::OK();
// Handler for the /backends debug page. Fills 'document' with a "backends" array that
// holds the string form (TNetworkAddressToString) of the address of every backend in
// the current backend configuration. 'args' is not used.
void SimpleScheduler::BackendsUrlCallback(const Webserver::ArgumentMap& args,
    Document* document) {
  BackendList backends;
  // Without populating the list from the current configuration, the page would
  // always report an empty backend list.
  GetAllKnownBackends(&backends);
  Value backends_list(kArrayType);
  for (const BackendList::value_type& backend: backends) {
    Value str(TNetworkAddressToString(backend.address).c_str(), document->GetAllocator());
    backends_list.PushBack(str, document->GetAllocator());
  }
  document->AddMember("backends", backends_list, document->GetAllocator());
}
// Statestore callback for IMPALA_MEMBERSHIP_TOPIC (registered in Init()).
// - Applies the topic's new entries to current_membership_.
// - Builds a new BackendConfig: fresh for a full update, copied from backend_config_
//   for a delta update.
// - When needed, appends to 'subscriber_topic_updates' an update for this topic that
//   either (re-)registers the local backend (when it is missing from the membership
//   and the server is not offline) or removes it (when the server is offline).
// NOTE(review): several statements are truncated or missing in this copy (marked
// below) — confirm against the full source.
void SimpleScheduler::UpdateMembership(
const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
vector<TTopicDelta>* subscriber_topic_updates) {
// First look to see if the topic(s) we're interested in have an update
// NOTE(review): the initializer (presumably a find() of IMPALA_MEMBERSHIP_TOPIC in
// incoming_topic_deltas) is missing in this copy.
StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
if (topic != incoming_topic_deltas.end()) {
const TTopicDelta& delta = topic->second;
// This function needs to handle both delta and non-delta updates. To minimize the
// time needed to hold locks, all updates are applied to a copy of backend_config_,
// which is then swapped into place atomically.
// NOTE(review): the call that swaps new_backend_config into place is not visible in
// this copy — confirm it is installed (e.g. via SetBackendConfig()).
std::shared_ptr<BackendConfig> new_backend_config;
if (!delta.is_delta) {
new_backend_config = std::make_shared<BackendConfig>();
} else {
// Make a copy
lock_guard<mutex> lock(backend_config_lock_);
new_backend_config = std::make_shared<BackendConfig>(*backend_config_);
// Process new entries to the topic
for (const TTopicItem& item: delta.topic_entries) {
TBackendDescriptor be_desc;
// Benchmarks have suggested that this method can deserialize
// ~10m messages per second, so no immediate need to consider optimization.
uint32_t len = item.value.size();
// NOTE(review): the pointer operand of reinterpret_cast is missing in this copy
// (presumably item.value's data — confirm).
Status status = DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(, &len, false, &be_desc);
if (!status.ok()) {
VLOG(2) << "Error deserializing membership topic item with key: " << item.key;
if (item.key == local_backend_id_
&& be_desc.address != local_backend_descriptor_.address) {
// Someone else has registered this subscriber ID with a different address. We
// will try to re-register (i.e. overwrite their subscription), but there is
// likely a configuration problem.
LOG_EVERY_N(WARNING, 30) << "Duplicate subscriber registration from address: "
<< be_desc.address;
current_membership_.insert(make_pair(item.key, be_desc));
// Process deletions from the topic
// NOTE(review): the body that handles a matching deletion is missing in this copy.
for (const string& backend_id: delta.topic_deletions) {
if (current_membership_.find(backend_id) != current_membership_.end()) {
// If this impalad is not in our view of the membership list, we should add it and
// tell the statestore.
// NOTE(review): the rest of this expression is missing in this copy; from the log
// messages below it decides whether the local ImpalaServer is offline.
bool is_offline = ExecEnv::GetInstance() &&
if (!is_offline &&
current_membership_.find(local_backend_id_) == current_membership_.end()) {
VLOG(1) << "Registering local backend with statestore";
// NOTE(review): back() is only valid after an element has been appended to
// subscriber_topic_updates (and to update.topic_entries below); the appending calls
// are not visible in this copy.
TTopicDelta& update = subscriber_topic_updates->back();
update.topic_name = IMPALA_MEMBERSHIP_TOPIC;
TTopicItem& item = update.topic_entries.back();
item.key = local_backend_id_;
// Publish the serialized local backend descriptor as this topic item's value.
Status status = thrift_serializer_.Serialize(
&local_backend_descriptor_, &item.value);
if (!status.ok()) {
LOG(WARNING) << "Failed to serialize Impala backend address for statestore topic:"
<< " " << status.GetDetail();
} else if (is_offline &&
current_membership_.find(local_backend_id_) != current_membership_.end()) {
LOG(WARNING) << "Removing offline ImpalaServer from statestore";
TTopicDelta& update = subscriber_topic_updates->back();
update.topic_name = IMPALA_MEMBERSHIP_TOPIC;
// NOTE(review): the body of this block is missing in this copy.
if (metrics_ != NULL) {
// Returns a reference-counted snapshot of the current backend configuration. The
// shared_ptr is copied while backend_config_lock_ is held, so the caller keeps the
// configuration it received alive even if SetBackendConfig() later replaces it.
SimpleScheduler::BackendConfigPtr SimpleScheduler::GetBackendConfig() const {
  lock_guard<mutex> guard(backend_config_lock_);
  DCHECK(backend_config_.get() != NULL);
  // The return value is copy-constructed before 'guard' is destroyed.
  return backend_config_;
}
void SimpleScheduler::SetBackendConfig(const BackendConfigPtr& backend_config)
lock_guard<mutex> l(backend_config_lock_);
backend_config_ = backend_config;
// Appends the descriptor of every backend in the current backend configuration to
// 'backends' (existing contents are kept). The backend map groups descriptors by IP
// address; groups are appended in the map's iteration order.
void SimpleScheduler::GetAllKnownBackends(BackendList* backends) {
  BackendConfigPtr backend_config = GetBackendConfig();
  for (const BackendMap::value_type& backend_list: backend_config->backend_map()) {
    backends->insert(backends->end(), backend_list.second.begin(),
        backend_list.second.end());
  }
}
// Computes scan-range assignments for every scan node that has scan ranges in
// 'exec_request'. For each node it looks up the node's fragment and its per-node replica
// options, then delegates to the per-node ComputeScanRangeAssignment overload. A single
// snapshot of the backend configuration is used for the whole request, and the time
// spent is recorded in the "ComputeScanRangeAssignmentTimer" counter of the schedule's
// summary profile.
// NOTE(review): the initializer of 'assignment' and the start of the delegating call
// are missing in this copy.
Status SimpleScheduler::ComputeScanRangeAssignment(const TQueryExecRequest& exec_request,
QuerySchedule* schedule) {
map<TPlanNodeId, vector<TScanRangeLocations>>::const_iterator entry;
RuntimeProfile::Counter* total_assignment_timer =
ADD_TIMER(schedule->summary_profile(), "ComputeScanRangeAssignmentTimer");
BackendConfigPtr backend_config = GetBackendConfig();
for (entry = exec_request.per_node_scan_ranges.begin();
entry != exec_request.per_node_scan_ranges.end(); ++entry) {
const TPlanNodeId node_id = entry->first;
int fragment_idx = schedule->GetFragmentIdx(node_id);
const TPlanFragment& fragment = exec_request.fragments[fragment_idx];
// Scan ranges of unpartitioned fragments are all assigned to the coordinator.
bool exec_at_coord = (fragment.partition.type == TPartitionType::UNPARTITIONED);
const TPlanNode& node = fragment.plan.nodes[schedule->GetNodeIdx(node_id)];
DCHECK_EQ(node.node_id, node_id);
// Replica preference set on the HDFS scan node itself, or NULL if none.
const TReplicaPreference::type* node_replica_preference = node.__isset.hdfs_scan_node
&& node.hdfs_scan_node.__isset.replica_preference
? &node.hdfs_scan_node.replica_preference : NULL;
bool node_random_replica = node.__isset.hdfs_scan_node
&& node.hdfs_scan_node.__isset.random_replica
&& node.hdfs_scan_node.random_replica;
FragmentScanRangeAssignment* assignment =
node_id, node_replica_preference, node_random_replica, entry->second,
exec_request.host_list, exec_at_coord, schedule->query_options(),
total_assignment_timer, assignment));
return Status::OK();
// Assigns each scan range in 'locations' (the scan ranges of plan node 'node_id') to a
// backend and records the choice in 'assignment'. 'host_list' maps each replica
// location's host_idx to its network address.
// - If 'exec_at_coord' is true, every scan range goes to the local backend.
// - Otherwise, a scan range with a replica on a host that has a backend is assigned to
//   one of the co-located backends with the smallest adjusted memory distance, chosen
//   by SelectLocalBackendHost(). The comparisons below assume the order
//   CACHE_LOCAL < DISK_LOCAL < REMOTE.
// - Remaining scan ranges are meant to be collected in remote_scan_range_locations and
//   distributed afterwards via SelectRemoteBackendHost().
// The base distance is query_options.replica_preference. It is raised to DISK_LOCAL
// when cached reads are disabled, and replaced by 'node_replica_preference' when that
// is set.
// NOTE(review): 'timer' is not referenced in the visible lines, and several statements
// are missing or truncated in this copy (marked below).
Status SimpleScheduler::ComputeScanRangeAssignment(
const BackendConfig& backend_config, PlanNodeId node_id,
const TReplicaPreference::type* node_replica_preference, bool node_random_replica,
const vector<TScanRangeLocations>& locations,
const vector<TNetworkAddress>& host_list, bool exec_at_coord,
const TQueryOptions& query_options, RuntimeProfile::Counter* timer,
FragmentScanRangeAssignment* assignment) {
// We adjust all replicas with memory distance less than base_distance to base_distance
// and collect all replicas with equal or better distance as candidates. For a full list
// of memory distance classes see TReplicaPreference in PlanNodes.thrift.
TReplicaPreference::type base_distance = query_options.replica_preference;
// The query option to disable cached reads adjusts the memory base distance to view
// all replicas as having a distance disk_local or worse.
if (query_options.disable_cached_reads &&
base_distance == TReplicaPreference::CACHE_LOCAL) {
base_distance = TReplicaPreference::DISK_LOCAL;
// A preference attached to the plan node takes precedence.
if (node_replica_preference) base_distance = *node_replica_preference;
// Between otherwise equivalent backends we optionally break ties by comparing their
// random rank.
bool random_replica = query_options.schedule_random_replica || node_random_replica;
// NOTE(review): the remaining constructor arguments are missing in this copy.
AssignmentCtx assignment_ctx(backend_config, total_assignments_,
vector<const TScanRangeLocations*> remote_scan_range_locations;
// Loop over all scan ranges, select a backend for those with local impalads and collect
// all others for later processing.
for (const TScanRangeLocations& scan_range_locations: locations) {
TReplicaPreference::type min_distance = TReplicaPreference::REMOTE;
// Select backend host for the current scan range.
if (exec_at_coord) {
assignment_ctx.RecordScanRangeAssignment(local_backend_descriptor_, node_id,
host_list, scan_range_locations, assignment);
} else {
// Collect backend candidates with smallest memory distance.
vector<IpAddr> backend_candidates;
if (base_distance < TReplicaPreference::REMOTE) {
for (const TScanRangeLocation& location: scan_range_locations.locations) {
const TNetworkAddress& replica_host = host_list[location.host_idx];
// Determine the adjusted memory distance to the closest backend for the replica
// host.
TReplicaPreference::type memory_distance = TReplicaPreference::REMOTE;
IpAddr backend_ip;
bool has_local_backend = assignment_ctx.backend_config().LookUpBackendIp(
replica_host.hostname, &backend_ip);
if (has_local_backend) {
if (location.is_cached) {
memory_distance = TReplicaPreference::CACHE_LOCAL;
} else {
memory_distance = TReplicaPreference::DISK_LOCAL;
} else {
memory_distance = TReplicaPreference::REMOTE;
// Never report a replica as closer than the configured base distance.
memory_distance = max(memory_distance, base_distance);
// We only need to collect backend candidates for non-remote reads, as it is the
// nature of remote reads that there is no backend available.
if (memory_distance < TReplicaPreference::REMOTE) {
// Check if we found a closer replica than the previous ones.
// NOTE(review): the updates of backend_candidates (presumably reset on a strictly
// closer replica and append of backend_ip on a tie) are missing from these branches
// in this copy; the DCHECK below expects candidates for non-remote distances.
if (memory_distance < min_distance) {
min_distance = memory_distance;
} else if (memory_distance == min_distance) {
} // End of candidate selection.
DCHECK(!backend_candidates.empty() || min_distance == TReplicaPreference::REMOTE);
// Check the effective memory distance of the candidates to decide whether to treat
// the scan range as cached.
bool cached_replica = min_distance == TReplicaPreference::CACHE_LOCAL;
// Pick backend host based on data location.
bool local_backend = min_distance != TReplicaPreference::REMOTE;
// NOTE(review): the body of this branch (presumably deferring the range to
// remote_scan_range_locations) is missing in this copy; remote_scan_range_locations
// is not filled anywhere in the visible lines.
if (!local_backend) {
// For local reads we want to break ties by backend rank in these cases:
// - if it is enforced via a query option.
// - when selecting between cached replicas. In this case there is no OS buffer
// cache to worry about.
// Remote reads will always break ties by backend rank.
bool decide_local_assignment_by_rank = random_replica || cached_replica;
const IpAddr* backend_ip = NULL;
// NOTE(review): the remaining argument (presumably decide_local_assignment_by_rank)
// is missing in this copy.
backend_ip = assignment_ctx.SelectLocalBackendHost(backend_candidates,
TBackendDescriptor backend;
assignment_ctx.SelectBackendOnHost(*backend_ip, &backend);
assignment_ctx.RecordScanRangeAssignment(backend, node_id, host_list,
scan_range_locations, assignment);
} // End of backend host selection.
} // End of for loop over scan ranges.
// Assign remote scans to backends.
for (const TScanRangeLocations* scan_range_locations: remote_scan_range_locations) {
const IpAddr* backend_ip = assignment_ctx.SelectRemoteBackendHost();
TBackendDescriptor backend;
assignment_ctx.SelectBackendOnHost(*backend_ip, &backend);
assignment_ctx.RecordScanRangeAssignment(backend, node_id, host_list,
*scan_range_locations, assignment);
if (VLOG_FILE_IS_ON) assignment_ctx.PrintAssignment(*assignment);
return Status::OK();
// Computes the per-fragment execution parameters stored in 'schedule':
// - a fragment-instance id for each (fragment, host) pair, derived from the query id;
// - for every non-root fragment, its sender_id_base and the number of senders per
//   destination exchange node;
// - for every non-root fragment, one destination (instance id + host) per host of the
//   fragment it sends to.
// NOTE(review): several statements are missing or truncated in this copy (marked
// below).
void SimpleScheduler::ComputeFragmentExecParams(const TQueryExecRequest& exec_request,
QuerySchedule* schedule) {
vector<FragmentExecParams>* fragment_exec_params = schedule->exec_params();
// assign instance ids
int64_t num_fragment_instances = 0;
for (FragmentExecParams& params: *fragment_exec_params) {
for (int j = 0; j < params.hosts.size(); ++j) {
int instance_num = num_fragment_instances + j;
// we add instance_num to query_id.lo to create a globally-unique instance id
TUniqueId instance_id;
instance_id.hi = schedule->query_id().hi;
// NOTE(review): the start of this statement is missing; its operands suggest a check
// that query_id.lo + instance_num + 1 does not exceed INT64_MAX.
schedule->query_id().lo, numeric_limits<int64_t>::max() - instance_num - 1);
// NOTE(review): instance_id is not stored in the visible lines, although the
// destination loop below reads dest_params.instance_ids[j] — confirm it is appended.
instance_id.lo = schedule->query_id().lo + instance_num + 1;
num_fragment_instances += params.hosts.size();
// NOTE(review): the body of this if is not visible in this copy.
if (exec_request.fragments[0].partition.type == TPartitionType::UNPARTITIONED) {
// the root fragment is executed directly by the coordinator
// compute destinations and # senders per exchange node
// (the root fragment doesn't have a destination)
for (int i = 1; i < fragment_exec_params->size(); ++i) {
FragmentExecParams& params = (*fragment_exec_params)[i];
// dest_fragment_idx[i - 1] is the fragment that fragment i sends its output to.
int dest_fragment_idx = exec_request.dest_fragment_idx[i - 1];
DCHECK_LT(dest_fragment_idx, fragment_exec_params->size());
FragmentExecParams& dest_params = (*fragment_exec_params)[dest_fragment_idx];
// set # of senders
const TDataStreamSink& sink = exec_request.fragments[i].output_sink.stream_sink;
// we can only handle unpartitioned (= broadcast), random-partitioned or
// hash-partitioned output at the moment
DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED
|| sink.output_partition.type == TPartitionType::HASH_PARTITIONED
|| sink.output_partition.type == TPartitionType::RANDOM);
PlanNodeId exch_id = sink.dest_node_id;
// we might have multiple fragments sending to this exchange node
// (distributed MERGE), which is why we need to add up the #senders
params.sender_id_base = dest_params.per_exch_num_senders[exch_id];
dest_params.per_exch_num_senders[exch_id] += params.hosts.size();
// create one TPlanFragmentDestination per destination host
// NOTE(review): params.destinations is indexed without a visible resize; confirm it is
// sized to dest_params.hosts.size() first.
for (int j = 0; j < dest_params.hosts.size(); ++j) {
TPlanFragmentDestination& dest = params.destinations[j];
dest.fragment_instance_id = dest_params.instance_ids[j];
dest.server = dest_params.hosts[j];
VLOG_RPC << "dest for fragment " << i << ":"
<< " instance_id=" << dest.fragment_instance_id
<< " server=" << dest.server;
// Computes params.hosts for every fragment in 'schedule'. Fragments are visited from
// last to first so that producers are handled before their consumers. Placement rules:
// - unpartitioned fragments run on the coordinator;
// - fragments containing a UnionNode run on the union of their scan hosts and the hosts
//   of their input fragments;
// - any other fragment runs on the hosts of its leftmost scan node or, if it has none,
//   on the hosts of its leftmost input fragment.
// NOTE(review): several statements are missing or truncated in this copy (marked
// below).
void SimpleScheduler::ComputeFragmentHosts(const TQueryExecRequest& exec_request,
QuerySchedule* schedule) {
vector<FragmentExecParams>* fragment_exec_params = schedule->exec_params();
DCHECK_EQ(fragment_exec_params->size(), exec_request.fragments.size());
// NOTE(review): scan_node_types is never populated in the visible lines; with an empty
// list FindNodes()/FindLeftmostNode() below match no scan nodes — confirm the missing
// push_back of the scan node types.
vector<TPlanNodeType::type> scan_node_types;
// compute hosts of producer fragment before those of consumer fragment(s),
// the latter might inherit the set of hosts from the former
for (int i = exec_request.fragments.size() - 1; i >= 0; --i) {
const TPlanFragment& fragment = exec_request.fragments[i];
FragmentExecParams& params = (*fragment_exec_params)[i];
// NOTE(review): the statements that place this fragment on the coordinator are not
// visible in this copy.
if (fragment.partition.type == TPartitionType::UNPARTITIONED) {
// all single-node fragments run on the coordinator host
// UnionNodes are special because they can consume multiple partitioned inputs,
// as well as execute multiple scans in the same fragment.
// Fragments containing a UnionNode are executed on the union of hosts of all
// scans in the fragment as well as the hosts of all its input fragments (s.t.
// a UnionNode with partitioned joins or grouping aggregates as children runs on
// at least as many hosts as the input to those children).
if (ContainsNode(fragment.plan, TPlanNodeType::UNION_NODE)) {
vector<TPlanNodeId> scan_nodes;
FindNodes(fragment.plan, scan_node_types, &scan_nodes);
vector<TPlanNodeId> exch_nodes;
// NOTE(review): the start of this FindNodes() call for exchange nodes is missing.
vector<TPlanNodeType::type>(1, TPlanNodeType::EXCHANGE_NODE),
// Add hosts of scan nodes.
vector<TNetworkAddress> scan_hosts;
for (int j = 0; j < scan_nodes.size(); ++j) {
GetScanHosts(scan_nodes[j], exec_request, params, &scan_hosts);
// Deduplicate hosts before merging in the input fragments' hosts.
unordered_set<TNetworkAddress> hosts(scan_hosts.begin(), scan_hosts.end());
// Add hosts of input fragments.
for (int j = 0; j < exch_nodes.size(); ++j) {
int input_fragment_idx = FindSenderFragment(exch_nodes[j], i, exec_request);
// NOTE(review): the initializer of this reference is missing in this copy.
const vector<TNetworkAddress>& input_fragment_hosts =
hosts.insert(input_fragment_hosts.begin(), input_fragment_hosts.end());
DCHECK(!hosts.empty()) << "no hosts for fragment " << i << " with a UnionNode";
params.hosts.assign(hosts.begin(), hosts.end());
PlanNodeId leftmost_scan_id = FindLeftmostNode(fragment.plan, scan_node_types);
if (leftmost_scan_id == g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID) {
// there is no leftmost scan; we assign the same hosts as those of our
// leftmost input fragment (so that a partitioned aggregation fragment
// runs on the hosts that provide the input data)
int input_fragment_idx = FindLeftmostInputFragment(i, exec_request);
DCHECK_GE(input_fragment_idx, 0);
DCHECK_LT(input_fragment_idx, fragment_exec_params->size());
params.hosts = (*fragment_exec_params)[input_fragment_idx].hosts;
// TODO: switch to unpartitioned/coord execution if our input fragment
// is executed that way (could have been downgraded from distributed)
// This fragment is executed on those hosts that have scan ranges
// for the leftmost scan.
GetScanHosts(leftmost_scan_id, exec_request, params, &params.hosts);
// Collect the distinct hosts used by any fragment.
// NOTE(review): the statement consuming unique_hosts is not visible in this copy.
unordered_set<TNetworkAddress> unique_hosts;
for (const FragmentExecParams& exec_params: *fragment_exec_params) {
unique_hosts.insert(exec_params.hosts.begin(), exec_params.hosts.end());
// Returns the id of the leftmost leaf of 'plan' if that node's type is one of 'types'.
// Otherwise, including when 'plan' has no childless node (e.g. an empty plan), returns
// INVALID_PLAN_NODE_ID. The leftmost leaf is taken to be the first node in
// 'plan.nodes' with num_children == 0, which assumes the nodes are stored in a
// depth-first, left-to-right order.
PlanNodeId SimpleScheduler::FindLeftmostNode(
    const TPlan& plan, const vector<TPlanNodeType::type>& types) {
  // the first node with num_children == 0 is the leftmost node
  int node_idx = 0;
  while (node_idx < plan.nodes.size() && plan.nodes[node_idx].num_children != 0) {
    // Advance past interior nodes; without this the loop never terminates.
    ++node_idx;
  }
  if (node_idx == plan.nodes.size()) {
    return g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID;
  }
  const TPlanNode& node = plan.nodes[node_idx];
  for (int i = 0; i < types.size(); ++i) {
    if (node.node_type == types[i]) return node.node_id;
  }
  return g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID;
}
bool SimpleScheduler::ContainsNode(const TPlan& plan, TPlanNodeType::type type) {
for (int i = 0; i < plan.nodes.size(); ++i) {
if (plan.nodes[i].node_type == type) return true;
return false;
void SimpleScheduler::FindNodes(const TPlan& plan,
const vector<TPlanNodeType::type>& types, vector<TPlanNodeId>* results) {
for (int i = 0; i < plan.nodes.size(); ++i) {
for (int j = 0; j < types.size(); ++j) {
if (plan.nodes[i].node_type == types[j]) {
// Adds to 'scan_hosts' the hosts that execute scan node 'scan_id' for the fragment
// described by 'params'. Per the comments below, a scan node without scan ranges runs
// on the coordinator; otherwise the hosts come from params.scan_range_assignment.
// NOTE(review): several statements are missing in this copy (marked below).
void SimpleScheduler::GetScanHosts(TPlanNodeId scan_id,
const TQueryExecRequest& exec_request, const FragmentExecParams& params,
vector<TNetworkAddress>* scan_hosts) {
// NOTE(review): the initializer (presumably a lookup of scan_id in
// exec_request.per_node_scan_ranges) is missing in this copy.
map<TPlanNodeId, vector<TScanRangeLocations>>::const_iterator entry =
// NOTE(review): the body of this branch (adding the coordinator address and
// returning) is not visible in this copy.
if (entry == exec_request.per_node_scan_ranges.end() || entry->second.empty()) {
// this scan node doesn't have any scan ranges; run it on the coordinator
// TODO: we'll need to revisit this strategy once we can partition joins
// (in which case this fragment might be executing a right outer join
// with a large build table)
// Get the list of impalad host from scan_range_assignment_
// NOTE(review): the loop body (presumably appending each assigned host) is missing.
for (const FragmentScanRangeAssignment::value_type& scan_range_assignment:
params.scan_range_assignment) {
int SimpleScheduler::FindLeftmostInputFragment(
int fragment_idx, const TQueryExecRequest& exec_request) {
// find the leftmost node, which we expect to be an exchage node
vector<TPlanNodeType::type> exch_node_type;
PlanNodeId exch_id =
FindLeftmostNode(exec_request.fragments[fragment_idx].plan, exch_node_type);
if (exch_id == g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID) {
return g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID;
// find the fragment that sends to this exchange node
return FindSenderFragment(exch_id, fragment_idx, exec_request);
// Returns the index of the fragment whose output stream sink targets exchange node
// 'exch_id' of fragment 'fragment_idx'. exec_request.dest_fragment_idx[k] is the
// destination of fragment k + 1; the root fragment 0 has no destination. If no sender
// is found, DCHECK-fails and returns INVALID_PLAN_NODE_ID.
int SimpleScheduler::FindSenderFragment(TPlanNodeId exch_id, int fragment_idx,
    const TQueryExecRequest& exec_request) {
  const int num_senders = exec_request.dest_fragment_idx.size();
  for (int sender_idx = 1; sender_idx <= num_senders; ++sender_idx) {
    if (exec_request.dest_fragment_idx[sender_idx - 1] != fragment_idx) continue;
    const TPlanFragment& candidate = exec_request.fragments[sender_idx];
    if (candidate.output_sink.stream_sink.dest_node_id == exch_id) return sender_idx;
  }
  // this shouldn't happen
  DCHECK(false) << "no fragment sends to exch id " << exch_id;
  return g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID;
}
// Schedules the query described by 'schedule'. It:
// - resolves the request pool and records it in the summary profile;
// - rejects the query if this Impala server is offline;
// - computes scan-range assignments, fragment hosts and fragment exec params;
// - when resource management is enabled, asks the resource broker for a reservation
//   and registers it with 'coord' in the active-resource maps.
// Returns an error status from any failing step.
// NOTE(review): several statements are missing or truncated in this copy (marked
// below).
Status SimpleScheduler::Schedule(Coordinator* coord, QuerySchedule* schedule) {
string resolved_pool;
// NOTE(review): the start of this statement (presumably a RETURN_IF_ERROR around the
// request-pool resolution call) is missing in this copy.
schedule->request().query_ctx, &resolved_pool));
schedule->summary_profile()->AddInfoString("Request Pool", resolved_pool);
if (ExecEnv::GetInstance()->impala_server()->IsOffline()) {
return Status("This Impala server is offline. Please retry your query later.");
RETURN_IF_ERROR(ComputeScanRangeAssignment(schedule->request(), schedule));
ComputeFragmentHosts(schedule->request(), schedule);
ComputeFragmentExecParams(schedule->request(), schedule);
if (!FLAGS_disable_admission_control) {
// Everything below only applies when resource management is enabled.
if (!FLAGS_enable_rm) return Status::OK();
string user = GetEffectiveUser(schedule->request().query_ctx.session);
if (user.empty()) user = "default";
schedule->PrepareReservationRequest(resolved_pool, user);
// NOTE(review): the initializer of this reference is missing in this copy.
const TResourceBrokerReservationRequest& reservation_request =
if (!reservation_request.resources.empty()) {
Status status = resource_broker_->Reserve(
reservation_request, schedule->reservation());
if (!status.ok()) {
// Warn about missing table and/or column stats if necessary.
const TQueryCtx& query_ctx = schedule->request().query_ctx;
// NOTE(review): the body of this if (the warning itself) is missing in this copy.
if (!query_ctx.__isset.parent_query_id &&
query_ctx.__isset.tables_missing_stats &&
!query_ctx.tables_missing_stats.empty()) {
return status;
AddToActiveResourceMaps(*schedule->reservation(), coord);
return Status::OK();
// Releases resources held by 'schedule'. With resource management enabled and a
// reservation that needs releasing, the reservation is released through the resource
// broker.
// NOTE(review): the arguments of ReleaseReservation() and the removal from the
// active-resource maps described below are missing in this copy.
Status SimpleScheduler::Release(QuerySchedule* schedule) {
if (!FLAGS_disable_admission_control) {
if (FLAGS_enable_rm && schedule->NeedsRelease()) {
DCHECK(resource_broker_ != NULL);
Status status = resource_broker_->ReleaseReservation(
// Remove the reservation from the active-resource maps even if there was an error
// releasing the reservation because the query running in the reservation is done.
return Status::OK();
// Records 'coord' as the owner of 'reservation' and of each client resource the
// reservation allocated. The Handle*Reservation/Handle*Resource callbacks later use
// these maps to find the affected query. Runs under active_resources_lock_.
void SimpleScheduler::AddToActiveResourceMaps(
    const TResourceBrokerReservationResponse& reservation, Coordinator* coord) {
  lock_guard<mutex> l(active_resources_lock_);
  active_reservations_[reservation.reservation_id] = coord;
  for (const auto& allocated: reservation.allocated_resources) {
    TUniqueId client_resource_id;
    client_resource_id << allocated.second.client_resource_id;
    active_client_resources_[client_resource_id] = coord;
  }
}
void SimpleScheduler::RemoveFromActiveResourceMaps(
const TResourceBrokerReservationResponse& reservation) {
lock_guard<mutex> l(active_resources_lock_);
map<TNetworkAddress, llama::TAllocatedResource>::const_iterator iter;
for (iter = reservation.allocated_resources.begin();
iter != reservation.allocated_resources.end();
++iter) {
TUniqueId client_resource_id;
client_resource_id << iter->second.client_resource_id;
// TODO: Refactor the Handle*{Reservation,Resource} functions to avoid code duplication.
// Called when reservation 'reservation_id' is preempted. Looks up the coordinator that
// owns it in active_reservations_. If none is found, logs a warning and ignores the
// event; otherwise builds an error status describing the preemption.
// NOTE(review): in this copy 'status' is never used (presumably it should be delivered
// to 'coord') and no inner scope ends 'l', so active_resources_lock_ would be held for
// the rest of the function — confirm against the full source.
void SimpleScheduler::HandlePreemptedReservation(const TUniqueId& reservation_id) {
VLOG_QUERY << "HandlePreemptedReservation client_id=" << reservation_id;
Coordinator* coord = NULL;
lock_guard<mutex> l(active_resources_lock_);
ActiveReservationsMap::iterator it = active_reservations_.find(reservation_id);
if (it != active_reservations_.end()) coord = it->second;
if (coord == NULL) {
LOG(WARNING) << "Ignoring preempted reservation id " << reservation_id
<< " because no active query using it was found.";
} else {
stringstream err_msg;
err_msg << "Reservation " << reservation_id << " was preempted";
Status status(err_msg.str());
// Called when client resource 'client_resource_id' is preempted. Looks up the
// coordinator that owns it in active_client_resources_. If none is found, logs a
// warning and ignores the event; otherwise builds an error status describing the
// preemption.
// NOTE(review): the lookup that initializes 'it' is truncated in this copy, 'status' is
// never used in the visible lines, and no inner scope ends the lock_guard.
void SimpleScheduler::HandlePreemptedResource(const TUniqueId& client_resource_id) {
VLOG_QUERY << "HandlePreemptedResource client_id=" << client_resource_id;
Coordinator* coord = NULL;
lock_guard<mutex> l(active_resources_lock_);
ActiveClientResourcesMap::iterator it =
if (it != active_client_resources_.end()) coord = it->second;
if (coord == NULL) {
LOG(WARNING) << "Ignoring preempted client resource id " << client_resource_id
<< " because no active query using it was found.";
} else {
stringstream err_msg;
err_msg << "Resource " << client_resource_id << " was preempted";
Status status(err_msg.str());
// Called when client resource 'client_resource_id' is lost. Looks up the coordinator
// that owns it in active_client_resources_. If none is found, logs a warning and
// ignores the event; otherwise builds an error status describing the loss.
// NOTE(review): the lookup that initializes 'it' is truncated in this copy, 'status' is
// never used in the visible lines, and no inner scope ends the lock_guard.
void SimpleScheduler::HandleLostResource(const TUniqueId& client_resource_id) {
VLOG_QUERY << "HandleLostResource preempting client_id=" << client_resource_id;
Coordinator* coord = NULL;
lock_guard<mutex> l(active_resources_lock_);
ActiveClientResourcesMap::iterator it =
if (it != active_client_resources_.end()) coord = it->second;
if (coord == NULL) {
LOG(WARNING) << "Ignoring lost client resource id " << client_resource_id
<< " because no active query using it was found.";
} else {
stringstream err_msg;
err_msg << "Resource " << client_resource_id << " was lost";
Status status(err_msg.str());
// Resolves 'hostname' to one IP address and stores it in '*ip'. The addresses returned
// by the OS are sorted, and the first non-localhost one is preferred; if only localhost
// addresses exist, the first sorted address is kept. Returns an error status if
// resolution fails or returns no addresses.
Status SimpleScheduler::HostnameToIpAddr(const Hostname& hostname, IpAddr* ip) {
  vector<IpAddr> candidates;
  // Ask the operating system for every address of this host.
  Status lookup_status = HostnameToIpAddrs(hostname, &candidates);
  if (lookup_status.ok() && !candidates.empty()) {
    // HostnameToIpAddrs() uses getaddrinfo(), whose result order (RFC 3484) is only
    // partial, so sort to make the choice below deterministic.
    sort(candidates.begin(), candidates.end());
    // Start from the first address, then let FindFirstNonLocalhost() look for a
    // non-localhost one.
    *ip = candidates[0];
    if (!FindFirstNonLocalhost(candidates, ip)) {
      VLOG(3) << "Only localhost addresses found for " << hostname;
    }
    return Status::OK();
  }
  stringstream ss;
  ss << "Failed to resolve " << hostname << ": " << lookup_status.GetDetail();
  return Status(ss.str());
}
const std::vector<TNetworkAddress>& backends) {
// Builds a BackendConfig from a fixed list of backend addresses. Each hostname is
// resolved with HostnameToIpAddr(); backends are grouped by IP in backend_map_, and
// backend_ip_map_ records hostname -> IP.
// NOTE(review): the signature line before this one, the handling after a resolution
// failure (the loop continues with an unset 'ip' in the visible lines) and the append
// of 'descriptor' to its IP's list are missing in this copy.
// Construct backend_map and backend_ip_map.
for (int i = 0; i < backends.size(); ++i) {
IpAddr ip;
Status status = HostnameToIpAddr(backends[i].hostname, &ip);
if (!status.ok()) {
VLOG(1) << status.GetDetail();
BackendMap::iterator it = backend_map_.find(ip);
if (it == backend_map_.end()) {
it = backend_map_.insert(
make_pair(ip, BackendList())).first;
backend_ip_map_[backends[i].hostname] = ip;
// Backend descriptors store the resolved IP rather than the configured hostname.
TBackendDescriptor descriptor;
descriptor.address = MakeNetworkAddress(ip, backends[i].port);
descriptor.ip_address = ip;
// Adds 'be_desc' to the backend list for its IP address, creating the list if needed.
// An identical descriptor already in the list is not added again. In every case the
// backend's hostname is mapped to its IP address in backend_ip_map_.
void SimpleScheduler::BackendConfig::AddBackend(const TBackendDescriptor& be_desc) {
  BackendList* be_descs = &backend_map_[be_desc.ip_address];
  if (find(be_descs->begin(), be_descs->end(), be_desc) == be_descs->end()) {
    be_descs->push_back(be_desc);
  }
  backend_ip_map_[be_desc.address.hostname] = be_desc.ip_address;
}
void SimpleScheduler::BackendConfig::RemoveBackend(const TBackendDescriptor& be_desc) {
auto be_descs_it = backend_map_.find(be_desc.ip_address);
if (be_descs_it != backend_map_.end()) {
BackendList* be_descs = &be_descs_it->second;
be_descs->erase(remove(be_descs->begin(), be_descs->end(), be_desc), be_descs->end());
if (be_descs->empty()) backend_map_.erase(be_descs_it);
bool SimpleScheduler::BackendConfig::LookUpBackendIp(const Hostname& hostname,
IpAddr* ip) const {
// Check if hostname is already a valid IP address.
if (backend_map_.find(hostname) != backend_map_.end()) {
if (ip) *ip = hostname;
return true;
auto it = backend_ip_map_.find(hostname);
if (it != backend_ip_map_.end()) {
if (ip) *ip = it->second;
return true;
return false;
const BackendConfig& backend_config,
IntCounter* total_assignments, IntCounter* total_local_assignments)
: backend_config_(backend_config), first_unused_backend_idx_(0),
total_local_assignments_(total_local_assignments) {
for (auto& v: backend_map()) random_backend_order_.push_back(&v);
std::mt19937 g(rand());
std::shuffle(random_backend_order_.begin(), random_backend_order_.end(), g);
// Initialize inverted map for backend rank lookups
int i = 0;
for (const BackendMap::value_type* v: random_backend_order_) {
random_backend_rank_[v->first] = i++;
// Returns a pointer into 'data_locations' to the location with the fewest bytes
// assigned so far in assignment_heap_. Locations not yet in the heap count as 0 bytes.
// Ties go to the first such location, or to the one with the lowest backend rank if
// 'break_ties_by_rank' is true. 'data_locations' must not be empty.
const SimpleScheduler::IpAddr* SimpleScheduler::AssignmentCtx::SelectLocalBackendHost(
    const std::vector<IpAddr>& data_locations, bool break_ties_by_rank) {
  DCHECK(!data_locations.empty());
  // List of candidate indexes into 'data_locations'.
  vector<int> candidates_idxs;
  // Find locations with minimum number of assigned bytes.
  int64_t min_assigned_bytes = numeric_limits<int64_t>::max();
  for (int i = 0; i < data_locations.size(); ++i) {
    const IpAddr& backend_ip = data_locations[i];
    int64_t assigned_bytes = 0;
    auto handle_it = assignment_heap_.find(backend_ip);
    if (handle_it != assignment_heap_.end()) {
      assigned_bytes = (*handle_it->second).assigned_bytes;
    }
    if (assigned_bytes < min_assigned_bytes) {
      // New minimum: earlier candidates had more bytes and are no longer eligible.
      candidates_idxs.clear();
      min_assigned_bytes = assigned_bytes;
    }
    if (assigned_bytes == min_assigned_bytes) candidates_idxs.push_back(i);
  }
  DCHECK(!candidates_idxs.empty());

  auto min_rank_idx = candidates_idxs.begin();
  if (break_ties_by_rank) {
    min_rank_idx = min_element(candidates_idxs.begin(), candidates_idxs.end(),
        [&data_locations, this](const int& a, const int& b) {
          return GetBackendRank(data_locations[a]) < GetBackendRank(data_locations[b]);
        });
  }
  return &data_locations[*min_rank_idx];
}
// Picks a backend IP for a remote read.
// Backends that have not yet been handed out are used first, in random order. After
// that, the backend at the top of assignment_heap_ is returned.
const SimpleScheduler::IpAddr* SimpleScheduler::AssignmentCtx::SelectRemoteBackendHost() {
  const IpAddr* candidate_ip;
  if (HasUnusedBackends()) {
    // Pick next unused backend.
    candidate_ip = GetNextUnusedBackendAndIncrement();
  } else {
    // Pick next backend from assignment_heap. All backends must have been inserted into
    // the heap at this point.
    DCHECK(backend_config_.NumBackends() == assignment_heap_.size());
    // NOTE(review): the original expression here was garbled; this assumes
    // AddressableAssignmentHeap forwards top() to its underlying heap - confirm.
    candidate_ip = &(assignment_heap_.top().ip);
  }
  DCHECK(candidate_ip != NULL);
  return candidate_ip;
}
// True while some entries of random_backend_order_ have not yet been handed out by
// GetNextUnusedBackendAndIncrement().
bool SimpleScheduler::AssignmentCtx::HasUnusedBackends() const {
  return random_backend_order_.size() > first_unused_backend_idx_;
}
// Returns the IP of the next not-yet-used backend in random order and advances the
// cursor. Callers must check HasUnusedBackends() first; indexing past the end of
// random_backend_order_ would be out of bounds.
const SimpleScheduler::IpAddr*
SimpleScheduler::AssignmentCtx::GetNextUnusedBackendAndIncrement() {
  DCHECK(HasUnusedBackends());
  const IpAddr* ip = &(random_backend_order_[first_unused_backend_idx_++])->first;
  DCHECK(backend_map().find(*ip) != backend_map().end());
  return ip;
}
// Returns the rank (position in the random backend order) of backend 'ip'.
// The ip must have been ranked when this context was constructed.
int SimpleScheduler::AssignmentCtx::GetBackendRank(const IpAddr& ip) const {
  const auto rank_entry = random_backend_rank_.find(ip);
  DCHECK(rank_entry != random_backend_rank_.end());
  const int rank = rank_entry->second;
  return rank;
}
void SimpleScheduler::AssignmentCtx::SelectBackendOnHost(const IpAddr& backend_ip,
TBackendDescriptor* backend) {
BackendMap::const_iterator backend_it = backend_map().find(backend_ip);
DCHECK(backend_it != backend_map().end());
const BackendList& backends_on_host = backend_it->second;
DCHECK(backends_on_host.size() > 0);
if (backends_on_host.size() == 1) {
*backend = *backends_on_host.begin();
} else {
BackendList::const_iterator* next_backend_on_host;
next_backend_on_host = FindOrInsert(&next_backend_per_host_, backend_ip,
DCHECK(find(backends_on_host.begin(), backends_on_host.end(), **next_backend_on_host)
!= backends_on_host.end());
*backend = **next_backend_on_host;
// Rotate
if (*next_backend_on_host == backends_on_host.end()) {
*next_backend_on_host = backends_on_host.begin();
void SimpleScheduler::AssignmentCtx::RecordScanRangeAssignment(
const TBackendDescriptor& backend, PlanNodeId node_id,
const vector<TNetworkAddress>& host_list,
const TScanRangeLocations& scan_range_locations,
FragmentScanRangeAssignment* assignment) {
int64_t scan_range_length = 0;
if (scan_range_locations.scan_range.__isset.hdfs_file_split) {
scan_range_length = scan_range_locations.scan_range.hdfs_file_split.length;
} else if (scan_range_locations.scan_range.__isset.kudu_key_range) {
// Hack so that kudu ranges are well distributed.
// TODO: KUDU-1133 Use the tablet size instead.
scan_range_length = 1000;
IpAddr backend_ip;
backend_config_.LookUpBackendIp(backend.address.hostname, &backend_ip);
DCHECK(backend_map().find(backend_ip) != backend_map().end());
assignment_heap_.InsertOrUpdate(backend_ip, scan_range_length,
// See if the read will be remote. This is not the case if the impalad runs on one of
// the replica's datanodes.
bool remote_read = true;
// For local reads we can set volume_id and is_cached. For remote reads HDFS will
// decide which replica to use so we keep those at default values.
int volume_id = -1;
bool is_cached = false;
for (const TScanRangeLocation& location: scan_range_locations.locations) {
const TNetworkAddress& replica_host = host_list[location.host_idx];
IpAddr replica_ip;
if (backend_config_.LookUpBackendIp(replica_host.hostname, &replica_ip)
&& backend_ip == replica_ip) {
remote_read = false;
volume_id = location.volume_id;
is_cached = location.is_cached;
if (remote_read) {
assignment_byte_counters_.remote_bytes += scan_range_length;
} else {
assignment_byte_counters_.local_bytes += scan_range_length;
if (is_cached) assignment_byte_counters_.cached_bytes += scan_range_length;
if (total_assignments_ != NULL) {
DCHECK(total_local_assignments_ != NULL);
if (!remote_read) total_local_assignments_->Increment(1);
PerNodeScanRanges* scan_ranges = FindOrInsert(assignment, backend.address,
vector<TScanRangeParams>* scan_range_params_list = FindOrInsert(scan_ranges, node_id,
// Add scan range.
TScanRangeParams scan_range_params;
scan_range_params.scan_range = scan_range_locations.scan_range;
VLOG_FILE << "SimpleScheduler assignment to backend: " << backend.address
<< "(" << (remote_read ? "remote" : "local") << " selection)";
void SimpleScheduler::AssignmentCtx::PrintAssignment(
const FragmentScanRangeAssignment& assignment) {
VLOG_FILE << "Total remote scan volume = " <<
PrettyPrinter::Print(assignment_byte_counters_.remote_bytes, TUnit::BYTES);
VLOG_FILE << "Total local scan volume = " <<
PrettyPrinter::Print(assignment_byte_counters_.local_bytes, TUnit::BYTES);
VLOG_FILE << "Total cached scan volume = " <<
PrettyPrinter::Print(assignment_byte_counters_.cached_bytes, TUnit::BYTES);
for (const FragmentScanRangeAssignment::value_type& entry: assignment) {
VLOG_FILE << "ScanRangeAssignment: server=" << ThriftDebugString(entry.first);
for (const PerNodeScanRanges::value_type& per_node_scan_ranges: entry.second) {
stringstream str;
for (const TScanRangeParams& params: per_node_scan_ranges.second) {
str << ThriftDebugString(params) << " ";
VLOG_FILE << "node_id=" << per_node_scan_ranges.first << " ranges=" << str.str();
void SimpleScheduler::AddressableAssignmentHeap::InsertOrUpdate(const IpAddr& ip,
int64_t assigned_bytes, int rank) {
auto handle_it = backend_handles_.find(ip);
if (handle_it == backend_handles_.end()) {
AssignmentHeap::handle_type handle = backend_heap_.push({assigned_bytes, rank, ip});
backend_handles_.emplace(ip, handle);
} else {
// We need to rebuild the heap after every update operation. Calling decrease once is
// sufficient as both assignments decrease the key.
AssignmentHeap::handle_type handle = handle_it->second;
(*handle).assigned_bytes += assigned_bytes;