blob: 4fb81e9f31fd04f9fa02f8f1040d1b7db7ae50cf [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <gtest/gtest.h>
#include <set>
#include <vector>
#include <boost/scoped_ptr.hpp>
#include "common/logging.h"
#include "simple-scheduler.h"
#include "util/runtime-profile.h"
#include "common/names.h"
using namespace impala;
namespace impala {
// Typedefs to make the rest of the code more readable.
typedef string Hostname;
typedef string IpAddr;
typedef string TableName;
/// Sample 'n' elements without replacement from the set [0..N-1].
/// This is an implementation of "Algorithm R" by J. Vitter.
void SampleN(int n, int N, vector<int>* out) {
if (n == 0) return;
DCHECK(n <= N);
for (int i = 0; i < n; ++i) out->push_back(i);
for (int i = n; i < N; ++i) {
// Accept element with probability n/i. Place at random position.
int r = rand() % i;
if (r < n) (*out)[r] = i;
/// Sample a set of 'n' elements from 'in' without replacement and copy them to
/// 'out'.
template <typename T>
void SampleNElements(int n, const vector<T>& in, vector<T>* out) {
vector<int> idxs;
SampleN(n, in.size(), &idxs);
DCHECK_EQ(n, idxs.size());
for (int idx: idxs) out->push_back(in[idx]);
/// Helper classes to be used by the scheduler tests.
/// Overall testing approach: Each test builds a list of hosts and a plan, both to which
/// elements can be added using various helper methods. Then scheduling can be tested
/// by instantiating SchedulerWrapper and calling Compute(...). The result can be verified
/// using a set of helper methods. There are also helper methods to modify the internal
/// state of the scheduler between subsequent calls to SchedulerWrapper::Compute().
/// The model currently comes with some known limitations:
/// - Files map 1:1 to blocks and to scan ranges.
/// - All files have the same size (1 block of 1M). Tables that differ in size can be
/// expressed as having a different number of blocks.
/// - We don't support multiple backends on a single host.
/// - Ports are assigned to hosts automatically and are not configurable by the test.
// TODO: Extend the model to support files with multiple blocks.
// TODO: Test more methods of the scheduler.
// TODO: Add support to add skewed table scans with multiple scan ranges: often there are
// 3 replicas where there may be skew for 1 of the replicas (e.g. after a single
// node insert) but the other 2 are random.
// TODO: Make use of the metrics provided by the scheduler.
// TODO: Add checks for MinNumAssignmentsPerHost() to all tests where applicable.
// TODO: Add post-condition checks that have to hold for all successful scheduler runs.
// TODO: Add possibility to explicitly specify the replica location per file.
// TODO: Add methods to retrieve and verify generated file placements from plan.
// TODO: Extend the model to specify a physical schema independently of a plan (ie,
// tables/files, blocks, replicas and cached replicas exist independently of the
// queries that run against them).
/// File blocks store a list of all datanodes that have a replica of the block. When
/// defining tables you can specify the desired replica placement among all available
/// datanodes in the cluster.
/// - RANDOM means that any datanode can be picked.
/// - LOCAL_ONLY means that only datanodes with a backend will be picked.
/// - REMOTE_ONLY means that only datanodes without a backend will be picked.
/// Whether replicas will be cached or not is not determined by this value, but by
/// additional function arguments when adding tables to the schema.
enum class ReplicaPlacement {
/// Host model. Each host can have either a backend, a datanode, or both. To specify that
/// a host should not act as a backend or datanode specify '-1' as the respective port.
struct Host {
Host(const Hostname& name, const IpAddr& ip, int be_port, int dn_port)
: name(name), ip(ip), be_port(be_port), dn_port(dn_port) {}
Hostname name;
IpAddr ip;
int be_port; // Backend port
int dn_port; // Datanode port
/// A cluster stores a list of hosts and provides various methods to add hosts to the
/// cluster. All hosts are guaranteed to have unique IP addresses and hostnames.
class Cluster {
/// Add a host and return the host's index. 'hostname' and 'ip' of the new host will be
/// generated and are guaranteed to be unique.
int AddHost(bool has_backend, bool has_datanode) {
int host_idx = hosts_.size();
int be_port = has_backend ? BACKEND_PORT : -1;
int dn_port = has_datanode ? DATANODE_PORT : -1;
IpAddr ip = HostIdxToIpAddr(host_idx);
DCHECK(ip_to_idx_.find(ip) == ip_to_idx_.end());
ip_to_idx_[ip] = host_idx;
hosts_.push_back(Host(HostIdxToHostname(host_idx), ip, be_port, dn_port));
// Add host to lists of backend indexes per type.
if (has_backend) backend_host_idxs_.push_back(host_idx);
if (has_datanode) {
if (has_backend) {
} else {
return host_idx;
/// Add a number of hosts with the same properties by repeatedly calling AddHost(..).
void AddHosts(int num_hosts, bool has_backend, bool has_datanode) {
for (int i = 0; i < num_hosts; ++i) AddHost(has_backend, has_datanode);
/// Convert a host index to a hostname.
static Hostname HostIdxToHostname(int host_idx) {
return HOSTNAME_PREFIX + std::to_string(host_idx);
/// Return the backend address (ip, port) for the host with index 'host_idx'.
void GetBackendAddress(int host_idx, TNetworkAddress* addr) const {
DCHECK_LT(host_idx, hosts_.size());
addr->hostname = hosts_[host_idx].ip;
addr->port = hosts_[host_idx].be_port;
const vector<Host>& hosts() const { return hosts_; }
int NumHosts() const { return hosts_.size(); }
/// These methods return lists of host indexes, grouped by their type, which can be used
/// to draw samples of random sets of hosts.
/// TODO: Think of a nicer abstraction to expose this information.
const vector<int>& backend_host_idxs() const { return backend_host_idxs_; }
const vector<int>& datanode_host_idxs() const { return datanode_host_idxs_; }
const vector<int>& datanode_with_backend_host_idxs() const {
return datanode_with_backend_host_idxs_;
const vector<int>& datanode_only_host_idxs() const { return datanode_only_host_idxs_; }
/// Port for all backends.
static const int BACKEND_PORT;
/// Port for all datanodes.
static const int DATANODE_PORT;
/// Prefix for all generated hostnames.
static const string HOSTNAME_PREFIX;
/// First octet for all generated IP addresses.
static const string IP_PREFIX;
/// List of hosts in this cluster.
vector<Host> hosts_;
/// Lists of indexes of hosts, grouped by their type. The lists reference hosts in
/// 'hosts_' by index and are used for random sampling.
/// All hosts with a backend.
vector<int> backend_host_idxs_;
/// All hosts with a datanode.
vector<int> datanode_host_idxs_;
/// All hosts with a datanode and a backend.
vector<int> datanode_with_backend_host_idxs_;
/// All hosts with a datanode but no backend.
vector<int> datanode_only_host_idxs_;
/// Map from IP addresses to host indexes.
unordered_map<IpAddr, int> ip_to_idx_;
/// Convert a host index to an IP address. The host index must be smaller than 2^24 and
/// will specify the lower 24 bits of the IPv4 address (the lower 3 octets).
static IpAddr HostIdxToIpAddr(int host_idx) {
DCHECK_LT(host_idx, (1 << 24));
string suffix;
for (int i = 0; i < 3; ++i) {
suffix = "." + std::to_string(host_idx % 256) + suffix; // prepend
host_idx /= 256;
DCHECK_EQ(0, host_idx);
return IP_PREFIX + suffix;
const int Cluster::BACKEND_PORT = 1000;
const int Cluster::DATANODE_PORT = 2000;
const string Cluster::HOSTNAME_PREFIX = "host_";
const string Cluster::IP_PREFIX = "10";
struct Block {
/// By default all blocks are of the same size.
int64_t length = DEFAULT_BLOCK_SIZE;
/// Index into the cluster that owns the table that owns this block.
vector<int> replica_host_idxs;
/// Flag for each entry in replica_host_idxs whether it is a cached replica or not.
vector<bool> replica_host_idx_is_cached;
/// Default size for new blocks.
static const int64_t DEFAULT_BLOCK_SIZE;
/// Default size for new blocks is 1MB.
const int64_t Block::DEFAULT_BLOCK_SIZE = 1 << 20;
struct Table {
vector<Block> blocks;
class Schema {
Schema(const Cluster& cluster) : cluster_(cluster) {}
/// Add a table consisting of a single block to the schema with explicitly specified
/// replica indexes for non-cached replicas and without any cached replicas. Replica
/// indexes must refer to hosts in cluster_.hosts() by index.
void AddSingleBlockTable(const TableName& table_name,
const vector<int>& non_cached_replica_host_idxs) {
AddSingleBlockTable(table_name, non_cached_replica_host_idxs, {});
/// Add a table consisting of a single block to the schema with explicitly specified
/// replica indexes for both non-cached and cached replicas. Values in both lists must
/// refer to hosts in cluster_.hosts() by index. Both lists must be disjoint, i.e., a
/// replica can either be cached or not.
void AddSingleBlockTable(const TableName& table_name,
const vector<int>& non_cached_replica_host_idxs,
const vector<int>& cached_replica_host_idxs) {
DCHECK(tables_.find(table_name) == tables_.end());
Block block;
int num_replicas = non_cached_replica_host_idxs.size() +
block.replica_host_idxs = non_cached_replica_host_idxs;
cached_replica_host_idxs.begin(), cached_replica_host_idxs.end());
// Initialize for non-cached replicas first.
block.replica_host_idx_is_cached.resize(non_cached_replica_host_idxs.size(), false);
// Fill up to final size for cached replicas.
cached_replica_host_idxs.size(), true);
DCHECK_EQ(block.replica_host_idxs.size(), block.replica_host_idx_is_cached.size());
DCHECK_EQ(block.replica_host_idxs.size(), num_replicas);
// Create table
Table table;
// Insert table
tables_.emplace(table_name, table);
/// Add a table to the schema, selecting replica hosts according to the given replica
/// placement preference. All replicas will be non-cached.
void AddMultiBlockTable(const TableName& table_name, int num_blocks,
ReplicaPlacement replica_placement, int num_replicas) {
AddMultiBlockTable(table_name, num_blocks, replica_placement, num_replicas, 0);
/// Add a table to the schema, selecting replica hosts according to the given replica
/// placement preference. After replica selection has been done, 'num_cached_replicas'
/// of them are marked as cached.
void AddMultiBlockTable(const TableName& table_name, int num_blocks,
ReplicaPlacement replica_placement, int num_replicas, int num_cached_replicas) {
DCHECK_GT(num_replicas, 0);
DCHECK(num_cached_replicas <= num_replicas);
Table table;
for (int i = 0; i < num_blocks; ++i) {
Block block;
vector<int>& replica_idxs = block.replica_host_idxs;
// Determine replica host indexes.
switch (replica_placement) {
case ReplicaPlacement::RANDOM:
SampleNElements(num_replicas, cluster_.datanode_host_idxs(), &replica_idxs);
case ReplicaPlacement::LOCAL_ONLY:
DCHECK(num_replicas <= cluster_.datanode_with_backend_host_idxs().size());
SampleNElements(num_replicas, cluster_.datanode_with_backend_host_idxs(),
case ReplicaPlacement::REMOTE_ONLY:
DCHECK(num_replicas <= cluster_.datanode_only_host_idxs().size());
SampleNElements(num_replicas, cluster_.datanode_only_host_idxs(),
DCHECK(false) << "Unsupported replica placement: "
<< (int)replica_placement;
// Determine cached replicas.
vector<int> cached_replicas;
vector<bool>& is_cached = block.replica_host_idx_is_cached;
is_cached.resize(num_replicas, false);
SampleN(num_cached_replicas, num_replicas, &cached_replicas);
// Flag cached entries.
for (const int idx: cached_replicas) is_cached[idx] = true;
DCHECK_EQ(replica_idxs.size(), is_cached.size());
// Insert table
tables_[table_name] = table;
const Table& GetTable(const TableName& table_name) const {
auto it = tables_.find(table_name);
DCHECK(it != tables_.end());
return it->second;
const Cluster& cluster() const { return cluster_; }
/// Store a reference to the cluster, from which hosts are sampled. Test results will
/// use the cluster to resolve host indexes to hostnames and IP addresses.
const Cluster& cluster_;
unordered_map<TableName, Table> tables_;
/// Plan model. A plan contains a list of tables to scan and the query options to be used
/// during scheduling.
class Plan {
Plan(const Schema& schema) : schema_(schema) {}
const TQueryOptions& query_options() const { return query_options_; }
void SetReplicaPreference(TReplicaPreference::type p) {
query_options_.replica_preference = p;
void SetRandomReplica(bool b) { query_options_.schedule_random_replica = b; }
void SetDisableCachedReads(bool b) { query_options_.disable_cached_reads = b; }
const Cluster& cluster() const { return schema_.cluster(); }
const vector<TNetworkAddress>& referenced_datanodes() const {
return referenced_datanodes_;
const vector<TScanRangeLocations>& scan_range_locations() const {
return scan_range_locations_;
/// Add a scan of table 'table_name' to the plan. This method will populate the internal
/// list of TScanRangeLocations and can be called multiple times for the same table to
/// schedule additional scans.
void AddTableScan(const TableName& table_name) {
const Table& table = schema_.GetTable(table_name);
const vector<Block>& blocks = table.blocks;
for (int i = 0; i < blocks.size(); ++i) {
const Block& block = blocks[i];
TScanRangeLocations scan_range_locations;
BuildTScanRangeLocations(table_name, block, i, &scan_range_locations);
/// Store a reference to the schema, from which scanned tables will be read.
const Schema& schema_;
TQueryOptions query_options_;
/// List of all datanodes that are referenced by this plan. Only hosts that have an
/// assigned scan range are added here.
vector<TNetworkAddress> referenced_datanodes_;
/// Map from plan host index to an index in 'referenced_datanodes_'.
boost::unordered_map<int, int> host_idx_to_datanode_idx_;
/// List of all scan range locations, which can be passed to the SimpleScheduler.
vector<TScanRangeLocations> scan_range_locations_;
/// Initialize a TScanRangeLocations object in place.
void BuildTScanRangeLocations(const TableName& table_name, const Block& block,
int block_idx, TScanRangeLocations* scan_range_locations) {
const vector<int>& replica_idxs = block.replica_host_idxs;
const vector<bool>& is_cached = block.replica_host_idx_is_cached;
DCHECK_EQ(replica_idxs.size(), is_cached.size());
int num_replicas = replica_idxs.size();
BuildScanRange(table_name, block, block_idx, &scan_range_locations->scan_range);
for (int i = 0; i < num_replicas; ++i) {
TScanRangeLocation& location = scan_range_locations->locations[i];
location.host_idx = FindOrInsertDatanodeIndex(replica_idxs[i]);
void BuildScanRange(const TableName& table_name, const Block& block, int block_idx,
TScanRange* scan_range) {
// Initialize locations.scan_range correctly.
THdfsFileSplit file_split;
// 'length' is the only member considered by the scheduler.
file_split.length = block.length;
// Encoding the table name and block index in the file helps debugging.
file_split.file_name = table_name + "_block_" + std::to_string(block_idx);
file_split.offset = 0;
file_split.partition_id = 0;
// For now, we model each file by a single block.
file_split.file_length = block.length;
file_split.file_compression = THdfsCompression::NONE;
file_split.mtime = 1;
/// Look up the plan-local host index of 'cluster_datanode_idx'. If the host has not
/// been added to the plan before, it will add it to 'referenced_datanodes_' and return
/// the new index.
int FindOrInsertDatanodeIndex(int cluster_datanode_idx) {
const Host& host = schema_.cluster().hosts()[cluster_datanode_idx];
auto ret = host_idx_to_datanode_idx_.emplace(
cluster_datanode_idx, referenced_datanodes_.size());
bool inserted_new_element = ret.second;
if (inserted_new_element) {
TNetworkAddress datanode;
datanode.hostname = host.ip;
datanode.port = host.dn_port;
return ret.first->second;
class Result {
/// Map to count the number of assignments per backend.
typedef unordered_map<IpAddr, int> NumAssignmentsPerBackend;
/// Map to count the number of assigned bytes per backend.
typedef unordered_map<IpAddr, int64_t> NumAssignedBytesPerBackend;
/// Parameter type for callbacks, which are used to filter scheduling results.
struct AssignmentInfo {
const TNetworkAddress& addr;
const THdfsFileSplit& hdfs_file_split;
bool is_cached;
bool is_remote;
/// These functions are used as callbacks when processing the scheduling result. They
/// will be called once per assigned scan range.
typedef std::function<bool(const AssignmentInfo& assignment)> AssignmentFilter;
typedef std::function<void(const AssignmentInfo& assignment)> AssignmentCallback;
Result(const Plan& plan) : plan_(plan) {}
/// Return the total number of scheduled assignments.
int NumTotalAssignments() const { return CountAssignmentsIf(Any()); }
/// Return the total number of assigned bytes.
int NumTotalAssignedBytes() const { return CountAssignedBytesIf(Any()); }
/// Return the number of scheduled assignments for a single host.
int NumTotalAssignments(int host_idx) const {
return CountAssignmentsIf(IsHost(host_idx));
/// Return the number of assigned bytes for a single host.
int NumTotalAssignedBytes(int host_idx) const {
return CountAssignedBytesIf(IsHost(host_idx));
/// Return the total number of assigned cached reads.
int NumCachedAssignments() const { return CountAssignmentsIf(IsCached(Any())); }
/// Return the total number of assigned bytes for cached reads.
int NumCachedAssignedBytes() const { return CountAssignedBytesIf(IsCached(Any())); }
/// Return the total number of assigned cached reads for a single host.
int NumCachedAssignments(int host_idx) const {
return CountAssignmentsIf(IsCached(IsHost(host_idx)));
/// Return the total number of assigned bytes for cached reads for a single host.
int NumCachedAssignedBytes(int host_idx) const {
return CountAssignedBytesIf(IsCached(IsHost(host_idx)));
/// Return the total number of assigned non-cached reads.
int NumDiskAssignments() const { return CountAssignmentsIf(IsDisk(Any())); }
/// Return the total number of assigned bytes for non-cached reads.
int NumDiskAssignedBytes() const { return CountAssignedBytesIf(IsDisk(Any())); }
/// Return the total number of assigned non-cached reads for a single host.
int NumDiskAssignments(int host_idx) const {
return CountAssignmentsIf(IsDisk(IsHost(host_idx)));
/// Return the total number of assigned bytes for non-cached reads for a single host.
int NumDiskAssignedBytes(int host_idx) const {
return CountAssignedBytesIf(IsDisk(IsHost(host_idx)));
/// Return the total number of assigned remote reads.
int NumRemoteAssignments() const { return CountAssignmentsIf(IsRemote(Any())); }
/// Return the total number of assigned bytes for remote reads.
int NumRemoteAssignedBytes() const { return CountAssignedBytesIf(IsRemote(Any())); }
/// Return the total number of assigned remote reads for a single host.
int NumRemoteAssignments(int host_idx) const {
return CountAssignmentsIf(IsRemote(IsHost(host_idx)));
/// Return the total number of assigned bytes for remote reads for a single host.
int NumRemoteAssignedBytes(int host_idx) const {
return CountAssignedBytesIf(IsRemote(IsHost(host_idx)));
/// Return the maximum number of assigned reads over all hosts.
int MaxNumAssignmentsPerHost() const {
NumAssignmentsPerBackend num_assignments_per_backend;
int max_count = 0;
for (const auto& elem: num_assignments_per_backend) {
max_count = max(max_count, elem.second);
return max_count;
/// Return the maximum number of assigned reads over all hosts.
int64_t MaxNumAssignedBytesPerHost() const {
NumAssignedBytesPerBackend num_assigned_bytes_per_backend;
int64_t max_assigned_bytes = 0;
for (const auto& elem: num_assigned_bytes_per_backend) {
max_assigned_bytes = max(max_assigned_bytes, elem.second);
return max_assigned_bytes;
/// Return the minimum number of assigned reads over all hosts.
/// NOTE: This is computed by traversing all recorded assignments and thus will not
/// consider hosts without any assignments. Hence the minimum value to expect is 1 (not
/// 0).
int MinNumAssignmentsPerHost() const {
NumAssignmentsPerBackend num_assignments_per_backend;
int min_count = numeric_limits<int>::max();
for (const auto& elem: num_assignments_per_backend) {
min_count = min(min_count, elem.second);
DCHECK_GT(min_count, 0);
return min_count;
/// Return the minimum number of assigned bytes over all hosts.
/// NOTE: This is computed by traversing all recorded assignments and thus will not
/// consider hosts without any assignments. Hence the minimum value to expect is 1 (not
/// 0).
int64_t MinNumAssignedBytesPerHost() const {
NumAssignedBytesPerBackend num_assigned_bytes_per_backend;
int64_t min_assigned_bytes = 0;
for (const auto& elem: num_assigned_bytes_per_backend) {
min_assigned_bytes = max(min_assigned_bytes, elem.second);
DCHECK_GT(min_assigned_bytes, 0);
return min_assigned_bytes;
/// Return the number of scan range assignments stored in this result.
int NumAssignments() const { return assignments_.size(); }
/// Return the number of distinct backends that have been picked by the scheduler so
/// far.
int NumDistinctBackends() const {
unordered_set<IpAddr> backends;
AssignmentCallback cb = [&backends](const AssignmentInfo& assignment) {
return backends.size();
/// Return the full assignment for manual matching.
const FragmentScanRangeAssignment& GetAssignment(int index = 0) const {
DCHECK_GT(assignments_.size(), index);
return assignments_[index];
/// Add an assignment to the result and return a reference, which can then be passed on
/// to the scheduler.
FragmentScanRangeAssignment* AddAssignment() {
return &assignments_.back();
/// Reset the result to an empty state.
void Reset() { assignments_.clear(); }
/// Vector to store results of consecutive scheduler runs.
vector<FragmentScanRangeAssignment> assignments_;
/// Reference to the plan, needed to look up hosts.
const Plan& plan_;
/// Dummy filter matching any assignment.
AssignmentFilter Any() const {
return [](const AssignmentInfo& assignment) { return true; };
/// Filter to only match assignments of a particular host.
AssignmentFilter IsHost(int host_idx) const {
TNetworkAddress expected_addr;
plan_.cluster().GetBackendAddress(host_idx, &expected_addr);
return [expected_addr](const AssignmentInfo& assignment) {
return assignment.addr == expected_addr;
/// Filter to only match assignments of cached reads.
AssignmentFilter IsCached(AssignmentFilter filter) const {
return [filter](const AssignmentInfo& assignment) {
return filter(assignment) && assignment.is_cached;
/// Filter to only match assignments of non-cached, local disk reads.
AssignmentFilter IsDisk(AssignmentFilter filter) const {
return [filter](const AssignmentInfo& assignment) {
return filter(assignment) && !assignment.is_cached && !assignment.is_remote;
/// Filter to only match assignments of remote reads.
AssignmentFilter IsRemote(AssignmentFilter filter) const {
return [filter](const AssignmentInfo& assignment) {
return filter(assignment) && assignment.is_remote;
/// Process all recorded assignments and call the supplied callback on each tuple of IP
/// address and scan_range it iterates over.
void ProcessAssignments(const AssignmentCallback& cb) const {
for (const FragmentScanRangeAssignment& assignment: assignments_) {
for (const auto& assignment_elem: assignment) {
const TNetworkAddress& addr = assignment_elem.first;
const PerNodeScanRanges& per_node_ranges = assignment_elem.second;
for (const auto& per_node_ranges_elem: per_node_ranges) {
const vector<TScanRangeParams> scan_range_params_vector =
for (const TScanRangeParams& scan_range_params: scan_range_params_vector) {
const TScanRange& scan_range = scan_range_params.scan_range;
const THdfsFileSplit& hdfs_file_split = scan_range.hdfs_file_split;
bool is_cached = scan_range_params.__isset.is_cached
? scan_range_params.is_cached : false;
bool is_remote = scan_range_params.__isset.is_remote
? scan_range_params.is_remote : false;
cb({addr, hdfs_file_split, is_cached, is_remote});
/// Count all assignments matching the supplied filter callback.
int CountAssignmentsIf(const AssignmentFilter& filter) const {
int count = 0;
AssignmentCallback cb = [&count, filter](const AssignmentInfo& assignment) {
if (filter(assignment)) ++count;
return count;
/// Count all assignments matching the supplied filter callback.
int64_t CountAssignedBytesIf(const AssignmentFilter& filter) const {
int64_t assigned_bytes = 0;
AssignmentCallback cb = [&assigned_bytes, filter](const AssignmentInfo& assignment) {
if (filter(assignment)) assigned_bytes += assignment.hdfs_file_split.length;
return assigned_bytes;
/// Create a map containing the number of assigned scan ranges per node.
void CountAssignmentsPerBackend(
NumAssignmentsPerBackend* num_assignments_per_backend) const {
AssignmentCallback cb = [&num_assignments_per_backend](
const AssignmentInfo& assignment) {
/// Create a map containing the number of assigned bytes per node.
void CountAssignedBytesPerBackend(
NumAssignedBytesPerBackend* num_assignments_per_backend) const {
AssignmentCallback cb = [&num_assignments_per_backend](
const AssignmentInfo& assignment) {
(*num_assignments_per_backend)[assignment.addr.hostname] +=
/// This class wraps the SimpleScheduler and provides helper for easier instrumentation
/// during tests.
class SchedulerWrapper {
SchedulerWrapper(const Plan& plan) : plan_(plan), metrics_("TestMetrics") {
/// Call ComputeScanRangeAssignment() with exec_at_coord set to false.
void Compute(Result* result) {
Compute(false, result);
/// Call ComputeScanRangeAssignment().
void Compute(bool exec_at_coord, Result* result) {
DCHECK(scheduler_ != NULL);
// Compute Assignment.
FragmentScanRangeAssignment* assignment = result->AddAssignment();
scheduler_->ComputeScanRangeAssignment(*scheduler_->GetBackendConfig(), 0, NULL,
false, plan_.scan_range_locations(), plan_.referenced_datanodes(), exec_at_coord,
plan_.query_options(), NULL, assignment);
/// Reset the state of the scheduler by re-creating and initializing it.
void Reset() { InitializeScheduler(); }
/// Methods to modify the internal lists of backends maintained by the scheduler.
/// Add a backend to the scheduler.
void AddBackend(const Host& host) {
// Add to topic delta
TTopicDelta delta;
delta.topic_name = SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC;
delta.is_delta = true;
AddHostToTopicDelta(host, &delta);
/// Remove a backend from the scheduler.
void RemoveBackend(const Host& host) {
// Add deletion to topic delta
TTopicDelta delta;
delta.topic_name = SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC;
delta.is_delta = true;
/// Send a full map of the backends to the scheduler instead of deltas.
void SendFullMembershipMap() {
TTopicDelta delta;
delta.topic_name = SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC;
delta.is_delta = false;
for (const Host& host: plan_.cluster().hosts()) {
if (host.be_port >= 0) AddHostToTopicDelta(host, &delta);
/// Send an empty update message to the scheduler.
void SendEmptyUpdate() {
TTopicDelta delta;
delta.topic_name = SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC;
delta.is_delta = true;
const Plan& plan_;
boost::scoped_ptr<SimpleScheduler> scheduler_;
MetricGroup metrics_;
/// Initialize the internal scheduler object. The method uses the 'real' constructor
/// used in the rest of the codebase, in contrast to the one that takes a list of
/// backends, which is only used for testing purposes. This allows us to properly
/// initialize the scheduler and exercise the UpdateMembership() method in tests.
void InitializeScheduler() {
DCHECK(scheduler_ == NULL);
DCHECK_GT(plan_.cluster().NumHosts(), 0) << "Cannot initialize scheduler with 0 "
<< "hosts.";
const Host& scheduler_host = plan_.cluster().hosts()[0];
string scheduler_backend_id = scheduler_host.ip;
TNetworkAddress scheduler_backend_address;
scheduler_backend_address.hostname = scheduler_host.ip;
scheduler_backend_address.port = scheduler_host.be_port;
scheduler_.reset(new SimpleScheduler(NULL, scheduler_backend_id,
scheduler_backend_address, &metrics_, NULL, NULL, NULL));
// Initialize the scheduler backend maps.
/// Add a single host to the given TTopicDelta.
void AddHostToTopicDelta(const Host& host, TTopicDelta* delta) const {
DCHECK_GT(host.be_port, 0) << "Host cannot be added to scheduler without a running "
<< "backend";
// Build backend descriptor.
TBackendDescriptor be_desc;
be_desc.address.hostname = host.ip;
be_desc.address.port = host.be_port;
be_desc.ip_address = host.ip;
// Build topic item.
TTopicItem item;
item.key = host.ip;
ThriftSerializer serializer(false);
Status status = serializer.Serialize(&be_desc, &item.value);
// Add to topic delta.
/// Send the given topic delta to the scheduler.
void SendTopicDelta(const TTopicDelta& delta) {
DCHECK(scheduler_ != NULL);
// Wrap in topic delta map.
StatestoreSubscriber::TopicDeltaMap delta_map;
delta_map.emplace(SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC, delta);
// Send to the scheduler.
vector<TTopicDelta> dummy_result;
scheduler_->UpdateMembership(delta_map, &dummy_result);
class SchedulerTest : public testing::Test {
SchedulerTest() { srand(0); };
/// Smoke test to schedule a single table with a single scan range over a single host.
TEST_F(SchedulerTest, SingleHostSingleFile) {
Cluster cluster;
cluster.AddHost(true, true);
Schema schema(cluster);
schema.AddMultiBlockTable("T", 1, ReplicaPlacement::LOCAL_ONLY, 1);
Plan plan(schema);
Result result(plan);
SchedulerWrapper scheduler(plan);
EXPECT_EQ(1, result.NumTotalAssignments());
EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes());
EXPECT_EQ(1, result.NumTotalAssignments(0));
EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(0));
EXPECT_EQ(0, result.NumCachedAssignments());
/// Test assigning all scan ranges to the coordinator.
TEST_F(SchedulerTest, ExecAtCoord) {
Cluster cluster;
cluster.AddHosts(3, true, true);
Schema schema(cluster);
schema.AddMultiBlockTable("T", 3, ReplicaPlacement::LOCAL_ONLY, 3);
Plan plan(schema);
Result result(plan);
SchedulerWrapper scheduler(plan);
bool exec_at_coord = true;
scheduler.Compute(exec_at_coord, &result);
EXPECT_EQ(3 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(0));
EXPECT_EQ(0, result.NumTotalAssignedBytes(1));
EXPECT_EQ(0, result.NumTotalAssignedBytes(2));
/// Test scanning a simple table twice.
TEST_F(SchedulerTest, ScanTableTwice) {
Cluster cluster;
cluster.AddHosts(3, true, true);
Schema schema(cluster);
schema.AddMultiBlockTable("T", 2, ReplicaPlacement::LOCAL_ONLY, 3);
Plan plan(schema);
Result result(plan);
SchedulerWrapper scheduler(plan);
EXPECT_EQ(4 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes());
EXPECT_EQ(4 * Block::DEFAULT_BLOCK_SIZE, result.NumDiskAssignedBytes());
EXPECT_EQ(0, result.NumCachedAssignedBytes());
// TODO: This test can be removed once we have the non-random backend round-robin by rank.
/// Schedule randomly over 3 backends and ensure that each backend is at least used once.
TEST_F(SchedulerTest, RandomReads) {
Cluster cluster;
cluster.AddHosts(3, true, true);
Schema schema(cluster);
schema.AddSingleBlockTable("T1", {0, 1, 2});
Plan plan(schema);
Result result(plan);
SchedulerWrapper scheduler(plan);
for (int i = 0; i < 100; ++i) scheduler.Compute(&result);
ASSERT_EQ(100, result.NumAssignments());
EXPECT_EQ(100, result.NumTotalAssignments());
EXPECT_EQ(100 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes());
EXPECT_EQ(3, result.NumDistinctBackends());
EXPECT_GE(result.MinNumAssignedBytesPerHost(), Block::DEFAULT_BLOCK_SIZE);
/// Distribute a table over the first 3 nodes in the cluster and verify that repeated
/// schedules always pick the first replica (random_replica = false).
TEST_F(SchedulerTest, LocalReadsPickFirstReplica) {
Cluster cluster;
for (int i = 0; i < 10; ++i) cluster.AddHost(i < 5, true);
Schema schema(cluster);
schema.AddSingleBlockTable("T1", {0, 1, 2});
Plan plan(schema);
Result result(plan);
SchedulerWrapper scheduler(plan);
for (int i = 0; i < 3; ++i) scheduler.Compute(&result);
EXPECT_EQ(3, result.NumTotalAssignments());
EXPECT_EQ(3, result.NumDiskAssignments(0));
EXPECT_EQ(0, result.NumDiskAssignments(1));
EXPECT_EQ(0, result.NumDiskAssignments(2));
/// Create a medium sized cluster with 100 nodes and compute a schedule over 3 tables.
TEST_F(SchedulerTest, TestMediumSizedCluster) {
Cluster cluster;
cluster.AddHosts(100, true, true);
Schema schema(cluster);
schema.AddMultiBlockTable("T1", 10, ReplicaPlacement::LOCAL_ONLY, 3);
schema.AddMultiBlockTable("T2", 5, ReplicaPlacement::LOCAL_ONLY, 3);
schema.AddMultiBlockTable("T3", 1, ReplicaPlacement::LOCAL_ONLY, 3);
Plan plan(schema);
Result result(plan);
SchedulerWrapper scheduler(plan);
EXPECT_EQ(16, result.NumTotalAssignments());
EXPECT_EQ(16, result.NumDiskAssignments());
/// Verify that remote placement and scheduling work as expected.
TEST_F(SchedulerTest, RemoteOnlyPlacement) {
Cluster cluster;
for (int i = 0; i < 100; ++i) cluster.AddHost(i < 30, true);
Schema schema(cluster);
schema.AddMultiBlockTable("T1", 10, ReplicaPlacement::REMOTE_ONLY, 3);
Plan plan(schema);
Result result(plan);
SchedulerWrapper scheduler(plan);
EXPECT_EQ(10, result.NumTotalAssignments());
EXPECT_EQ(10, result.NumRemoteAssignments());
EXPECT_EQ(Block::DEFAULT_BLOCK_SIZE, result.MaxNumAssignedBytesPerHost());
/// Add a table with 1000 scan ranges over 10 hosts and ensure that the right number of
/// assignments is computed.
TEST_F(SchedulerTest, ManyScanRanges) {
Cluster cluster;
cluster.AddHosts(10, true, true);
Schema schema(cluster);
schema.AddMultiBlockTable("T", 1000, ReplicaPlacement::LOCAL_ONLY, 3);
Plan plan(schema);
Result result(plan);
SchedulerWrapper scheduler(plan);
EXPECT_EQ(1000, result.NumTotalAssignments());
EXPECT_EQ(1000, result.NumDiskAssignments());
// When distributing 1000 blocks with 1 replica over 10 hosts, the probability for the
// most-picked host to end up with more than 140 blocks is smaller than 1E-3 (Chernoff
// bound). Adding 2 additional replicas per block will make the probability even
// smaller. This test is deterministic, so we expect a failure less often than every 1E3
// changes to the test, not every 1E3 runs.
EXPECT_LE(result.MaxNumAssignmentsPerHost(), 140);
EXPECT_LE(result.MaxNumAssignedBytesPerHost(), 140 * Block::DEFAULT_BLOCK_SIZE);
/// Compute a schedule in a split cluster (disjoint set of backends and datanodes).
TEST_F(SchedulerTest, DisjointClusterWithRemoteReads) {
Cluster cluster;
for (int i = 0; i < 20; ++i) cluster.AddHost(i < 10, i >= 10);
Schema schema(cluster);
schema.AddMultiBlockTable("T1", 10, ReplicaPlacement::REMOTE_ONLY, 3);
Plan plan(schema);
Result result(plan);
SchedulerWrapper scheduler(plan);
EXPECT_EQ(10, result.NumTotalAssignments());
EXPECT_EQ(10, result.NumRemoteAssignments());
// Expect that the datanodes were not mistaken for backends.
for (int i = 10; i < 20; ++i) EXPECT_EQ(0, result.NumTotalAssignments(i));
/// Verify that cached replicas take precedence.
TEST_F(SchedulerTest, TestCachedReadPreferred) {
Cluster cluster;
cluster.AddHosts(3, true, true);
Schema schema(cluster);
schema.AddSingleBlockTable("T1", {0, 2}, {1});
Plan plan(schema);
// 1 of the 3 replicas is cached.
Result result(plan);
SchedulerWrapper scheduler(plan);
EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumCachedAssignedBytes());
EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumCachedAssignedBytes(1));
EXPECT_EQ(0, result.NumDiskAssignedBytes());
EXPECT_EQ(0, result.NumRemoteAssignedBytes());
// Compute additional assignments.
for (int i = 0; i < 8; ++i) scheduler.Compute(&result);
EXPECT_EQ(9 * Block::DEFAULT_BLOCK_SIZE, result.NumCachedAssignedBytes());
EXPECT_EQ(9 * Block::DEFAULT_BLOCK_SIZE, result.NumCachedAssignedBytes(1));
EXPECT_EQ(0, result.NumDiskAssignedBytes());
EXPECT_EQ(0, result.NumRemoteAssignedBytes());
/// Verify that disable_cached_reads is effective.
TEST_F(SchedulerTest, TestDisableCachedReads) {
Cluster cluster;
cluster.AddHosts(3, true, true);
Schema schema(cluster);
schema.AddSingleBlockTable("T1", {0, 2}, {1});
Plan plan(schema);
// 1 of the 3 replicas is cached.
Result result(plan);
SchedulerWrapper scheduler(plan);
EXPECT_EQ(0, result.NumCachedAssignedBytes());
EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumDiskAssignedBytes());
EXPECT_EQ(0, result.NumRemoteAssignedBytes());
// Compute additional assignments.
for (int i = 0; i < 8; ++i) scheduler.Compute(&result);
EXPECT_EQ(0, result.NumCachedAssignedBytes());
EXPECT_EQ(9 * Block::DEFAULT_BLOCK_SIZE, result.NumDiskAssignedBytes());
EXPECT_EQ(0, result.NumRemoteAssignedBytes());
/// IMPALA-3019: Test for round robin reset problem. We schedule the same plan twice but
/// send an empty statestored message in between.
/// TODO: This problem cannot occur anymore and the test is merely green for random
/// behavior. Remove.
TEST_F(SchedulerTest, EmptyStatestoreMessage) {
Cluster cluster;
cluster.AddHosts(3, false, true);
cluster.AddHosts(2, true, false);
Schema schema(cluster);
schema.AddMultiBlockTable("T1", 1, ReplicaPlacement::RANDOM, 3);
Plan plan(schema);
Result result(plan);
SchedulerWrapper scheduler(plan);
EXPECT_EQ(0, result.NumTotalAssignedBytes(0));
EXPECT_EQ(0, result.NumTotalAssignedBytes(1));
EXPECT_EQ(0, result.NumTotalAssignedBytes(2));
EXPECT_EQ(0, result.NumTotalAssignedBytes(3));
EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(4));
EXPECT_EQ(0, result.NumTotalAssignedBytes(0));
EXPECT_EQ(0, result.NumTotalAssignedBytes(1));
EXPECT_EQ(0, result.NumTotalAssignedBytes(2));
EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(3));
EXPECT_EQ(0, result.NumTotalAssignedBytes(4));
/// Test sending updates to the scheduler.
TEST_F(SchedulerTest, TestSendUpdates) {
Cluster cluster;
// 3 hosts, only first two run backends. This allows us to remove one of the backends
// from the scheduler and then verify that reads are assigned to the other backend.
for (int i=0; i < 3; ++i) cluster.AddHost(i < 2, true);
Schema schema(cluster);
schema.AddMultiBlockTable("T1", 1, ReplicaPlacement::RANDOM, 3);
Plan plan(schema);
Result result(plan);
SchedulerWrapper scheduler(plan);
EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumDiskAssignedBytes(0));
EXPECT_EQ(0, result.NumCachedAssignedBytes(0));
EXPECT_EQ(0, result.NumRemoteAssignedBytes(0));
EXPECT_EQ(0, result.NumDiskAssignedBytes(1));
// Remove first host from scheduler.
EXPECT_EQ(0, result.NumDiskAssignedBytes(0));
EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumDiskAssignedBytes(1));
// Re-add first host from scheduler.
EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumDiskAssignedBytes(0));
EXPECT_EQ(0, result.NumDiskAssignedBytes(1));
} // end namespace impala
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();