// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef SCHEDULING_SCHEDULER_TEST_UTIL_H
#define SCHEDULING_SCHEDULER_TEST_UTIL_H
#include <string>
#include <unordered_map>
#include <vector>
#include <boost/scoped_ptr.hpp>
#include "common/status.h"
#include "gen-cpp/ImpalaInternalService.h" // for TQueryOptions
#include "scheduling/cluster-membership-mgr.h"
#include "scheduling/scheduler.h"
#include "scheduling/query-schedule.h"
#include "util/metrics.h"
namespace impala {
class ClusterMembershipMgr;
class Scheduler;
class TTopicDelta;
namespace test {
typedef std::string TableName;
/// Helper classes to be used by the scheduler tests.
/// Overall testing approach: Each test builds a list of hosts and a plan, to both of
/// which elements can be added using various helper methods. Then scheduling can be
/// tested by instantiating SchedulerWrapper and calling Compute(...). The result can be
/// verified using a set of helper methods. There are also helper methods to modify the
/// internal state of the scheduler between subsequent calls to
/// SchedulerWrapper::Compute().
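///
/// A minimal usage sketch (loosely mirroring the scheduler tests; the table name, host
/// counts, and gtest-style assertion macros are illustrative):
///
///   Cluster cluster;
///   cluster.AddHosts(3, /*has_backend=*/true, /*has_datanode=*/true);
///   Schema schema(cluster);
///   schema.AddSingleBlockTable("T1", {0, 1, 2});
///   Plan plan(schema);
///   plan.AddTableScan("T1");
///   Result result(plan);
///   SchedulerWrapper scheduler(plan);
///   ASSERT_OK(scheduler.Compute(&result));
///   EXPECT_EQ(1, result.NumTotalAssignments());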
///
/// The model currently comes with some known limitations:
///
/// - Files map 1:1 to blocks and to scan ranges.
/// - All files have the same size (1 block of 1M). Tables that differ in size can be
/// expressed as having a different number of blocks.
/// - We don't support multiple backends on a single host.
/// - Ports are assigned to hosts automatically and are not configurable by the test.
// TODO: Extend the model to support files with multiple blocks.
// TODO: Test more methods of the scheduler.
// TODO: Add support to add skewed table scans with multiple scan ranges: often there are
// 3 replicas where there may be skew for 1 of the replicas (e.g. after a single
// node insert) but the other 2 are random.
// TODO: Make use of the metrics provided by the scheduler.
// TODO: Add checks for MinNumAssignmentsPerHost() to all tests where applicable.
// TODO: Add post-condition checks that have to hold for all successful scheduler runs.
// TODO: Add possibility to explicitly specify the replica location per file.
// TODO: Add methods to retrieve and verify generated file placements from plan.
// TODO: Extend the model to specify a physical schema independently of a plan (i.e.,
// tables/files, blocks, replicas and cached replicas exist independently of the
// queries that run against them).
/// File blocks store a list of all datanodes that have a replica of the block. When
/// defining tables you can specify the desired replica placement among all available
/// datanodes in the cluster.
///
/// - RANDOM means that any datanode can be picked.
/// - LOCAL_ONLY means that only datanodes with a backend will be picked.
/// - REMOTE_ONLY means that only datanodes without a backend will be picked.
///
/// Whether replicas will be cached or not is not determined by this value, but by
/// additional function arguments when adding tables to the schema.
enum class ReplicaPlacement {
RANDOM,
LOCAL_ONLY,
REMOTE_ONLY,
};
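/// For example (an illustrative sketch; 'schema' and the table name are assumed from
/// the surrounding test), a table with 4 blocks, each with 3 non-cached replicas placed
/// only on datanodes co-located with a backend:
///
///   schema.AddMultiBlockTable("T1", 4, ReplicaPlacement::LOCAL_ONLY, 3);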
/// Blocks and FileSplitGeneratorSpecs mimic real files, and for some parts of scheduling
/// (e.g. consistent remote scheduling) the actual file names and partition paths matter.
/// When defining a table, you can specify a block naming policy to control this.
/// - UNPARTITIONED means that the partition paths and partition ids are constant, but
/// the file names are unique.
/// - PARTITIONED_SINGLE_FILENAME means that the partition paths and partition ids are
/// unique, but the file name inside the partition is a single constant.
/// - PARTITIONED_UNIQUE_FILENAMES means that the partition paths, partition ids, and
/// the file names inside each partition are all unique.
/// These policies are mostly irrelevant for single-block tables or tables with local
/// scheduling, so the default policy is UNPARTITIONED.
enum class BlockNamingPolicy {
UNPARTITIONED,
PARTITIONED_SINGLE_FILENAME,
PARTITIONED_UNIQUE_FILENAMES,
};
std::ostream& operator<<(std::ostream& os, const BlockNamingPolicy& naming_policy);
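/// For example (an illustrative sketch; 'schema' and all values are assumed), a
/// partitioned table whose 8 blocks get unique partition paths and file names:
///
///   schema.AddMultiBlockTable("T2", 8, ReplicaPlacement::RANDOM, 3,
///       /*num_cached_replicas=*/0, BlockNamingPolicy::PARTITIONED_UNIQUE_FILENAMES);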
/// Host model. Each host can have either a backend, a datanode, or both. To specify
/// that a host should not act as a backend or datanode, pass '-1' as the respective
/// port.
/// A host with a backend is always a coordinator but it may not be an executor.
struct Host {
Host(const Hostname& name, const IpAddr& ip, int be_port, int dn_port, bool is_executor)
: name(name), ip(ip), be_port(be_port), dn_port(dn_port), is_coordinator(true),
is_executor(is_executor) {}
Hostname name;
IpAddr ip;
int be_port; // Backend port
int dn_port; // Datanode port
bool is_coordinator; // True if this is a coordinator host
bool is_executor; // True if this is an executor host
};
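/// For example (an illustrative sketch using Cluster::AddHost() declared below), a
/// dedicated coordinator is a host with a backend that is neither an executor nor a
/// datanode:
///
///   int coord_idx = cluster.AddHost(
///       /*has_backend=*/true, /*has_datanode=*/false, /*is_executor=*/false);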
/// A cluster stores a list of hosts and provides various methods to add hosts to the
/// cluster. All hosts are guaranteed to have unique IP addresses and hostnames.
class Cluster {
public:
/// Add a host and return the host's index. 'hostname' and 'ip' of the new host will be
/// generated and are guaranteed to be unique.
/// TODO: Refactor the construction of a host and its addition to a cluster to
/// avoid the boolean input parameters.
int AddHost(bool has_backend, bool has_datanode, bool is_executor = true);
/// Add a number of hosts with the same properties by repeatedly calling AddHost(..).
void AddHosts(int num_hosts, bool has_backend, bool has_datanode,
bool is_executor = true);
/// Return the backend address (ip, port) for the host with index 'host_idx'.
void GetBackendAddress(int host_idx, TNetworkAddress* addr) const;
const std::vector<Host>& hosts() const { return hosts_; }
int NumHosts() const { return hosts_.size(); }
/// Helper function to create a cluster with Impala nodes separate from the datanodes.
/// This is primarily used for consistent remote scheduling tests. This places
/// the impalad nodes first, then the data nodes. Impalad nodes will have indices
/// in the range [0, num_impala_nodes-1] and data nodes will have indices in the
/// range [num_impala_nodes, num_impala_nodes+num_data_nodes-1].
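/// For example (hypothetical sizes), CreateRemoteCluster(3, 5) places impalads at
/// indexes [0, 2] and datanodes at indexes [3, 7]:
///
///   Cluster cluster = Cluster::CreateRemoteCluster(3, 5);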
static Cluster CreateRemoteCluster(int num_impala_nodes, int num_data_nodes);
/// These methods return lists of host indexes, grouped by their type, which can be used
/// to draw samples of random sets of hosts.
/// TODO: Think of a nicer abstraction to expose this information.
const std::vector<int>& backend_host_idxs() const { return backend_host_idxs_; }
const std::vector<int>& datanode_host_idxs() const { return datanode_host_idxs_; }
const std::vector<int>& datanode_with_backend_host_idxs() const;
const std::vector<int>& datanode_only_host_idxs() const;
private:
/// Port for all backends.
static const int BACKEND_PORT;
/// Port for all datanodes.
static const int DATANODE_PORT;
/// Prefix for all generated hostnames.
static const std::string HOSTNAME_PREFIX;
/// First octet for all generated IP addresses.
static const std::string IP_PREFIX;
/// List of hosts in this cluster.
std::vector<Host> hosts_;
/// Lists of indexes of hosts, grouped by their type. The lists reference hosts in
/// 'hosts_' by index and are used for random sampling.
///
/// All hosts with a backend.
std::vector<int> backend_host_idxs_;
/// All hosts with a datanode.
std::vector<int> datanode_host_idxs_;
/// All hosts with a datanode and a backend.
std::vector<int> datanode_with_backend_host_idxs_;
/// All hosts with a datanode but no backend.
std::vector<int> datanode_only_host_idxs_;
/// Map from IP addresses to host indexes.
std::unordered_map<IpAddr, int> ip_to_idx_;
};
struct Block {
/// By default all blocks are of the same size.
int64_t length = DEFAULT_BLOCK_SIZE;
/// Indexes into the hosts of the cluster that owns the table containing this block;
/// each entry identifies a host that stores a replica of this block.
std::vector<int> replica_host_idxs;
/// Flag for each entry in replica_host_idxs whether it is a cached replica or not.
std::vector<bool> replica_host_idx_is_cached;
/// Default size for new blocks.
static const int64_t DEFAULT_BLOCK_SIZE;
};
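/// For example (an illustrative sketch; 'schema' is assumed), Schema's
/// AddSingleBlockTable() declared below builds a Block with two replicas, of which the
/// one on host 1 is cached:
///
///   schema.AddSingleBlockTable("T1", /*non_cached_replica_host_idxs=*/{0},
///       /*cached_replica_host_idxs=*/{1});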
struct FileSplitGeneratorSpec {
FileSplitGeneratorSpec() {}
FileSplitGeneratorSpec(int64_t length, int64_t block, bool splittable)
: length(length), block_size(block), is_splittable(splittable) {}
/// Length of file for which to generate file splits.
int64_t length = DEFAULT_FILE_SIZE;
/// Size of each split.
int64_t block_size = DEFAULT_BLOCK_SIZE;
bool is_splittable = true;
static const int64_t DEFAULT_FILE_SIZE;
static const int64_t DEFAULT_BLOCK_SIZE;
};
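/// For example (an illustrative sketch; 'schema' and the sizes are assumed), a 100MB
/// splittable file that generates splits of at most 10MB each:
///
///   FileSplitGeneratorSpec spec(100 * 1024 * 1024, 10 * 1024 * 1024, true);
///   schema.AddFileSplitGeneratorSpecs("T2", {spec});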
/// A table models multiple files. Each file can be represented explicitly with a Block
/// or with a FileSplitGeneratorSpec. A table can consist of files with both
/// representations. The table can specify a BlockNamingPolicy, which tailors the
/// file name and path for scan ranges to simulate partitioned vs unpartitioned tables.
/// Consistent remote scheduling depends on the file paths, but the file names do not
/// impact other aspects of scheduling.
struct Table {
BlockNamingPolicy naming_policy = BlockNamingPolicy::UNPARTITIONED;
std::vector<Block> blocks;
std::vector<FileSplitGeneratorSpec> specs;
};
class Schema {
public:
Schema(const Cluster& cluster) : cluster_(cluster) {}
/// Add a table with no blocks. This is useful for tables with a custom naming
/// policy that later add FileSplitGeneratorSpecs.
void AddEmptyTable(const TableName& table_name, BlockNamingPolicy naming_policy);
/// Add a table consisting of a single block to the schema with explicitly specified
/// replica indexes for non-cached replicas and without any cached replicas. Replica
/// indexes must refer to hosts in cluster_.hosts() by index.
void AddSingleBlockTable(
const TableName& table_name, const std::vector<int>& non_cached_replica_host_idxs);
/// Add a table consisting of a single block to the schema with explicitly specified
/// replica indexes for both non-cached and cached replicas. Values in both lists must
/// refer to hosts in cluster_.hosts() by index. Both lists must be disjoint, i.e., a
/// replica can either be cached or not.
void AddSingleBlockTable(const TableName& table_name,
const std::vector<int>& non_cached_replica_host_idxs,
const std::vector<int>& cached_replica_host_idxs);
/// Add a table to the schema, selecting replica hosts according to the given replica
/// placement preference. All replicas will be non-cached.
void AddMultiBlockTable(const TableName& table_name, int num_blocks,
ReplicaPlacement replica_placement, int num_replicas);
/// Add a table to the schema, selecting replica hosts according to the given replica
/// placement preference. After replica selection has been done, 'num_cached_replicas'
/// of them are marked as cached. The table uses the specified 'naming_policy' for
/// its blocks.
void AddMultiBlockTable(const TableName& table_name, int num_blocks,
ReplicaPlacement replica_placement, int num_replicas, int num_cached_replicas,
BlockNamingPolicy naming_policy);
/// Adds FileSplitGeneratorSpecs to table named 'table_name'. If the table does not
/// exist, creates a new table. Otherwise, adds the 'specs' to an existing table.
void AddFileSplitGeneratorSpecs(
const TableName& table_name, const std::vector<FileSplitGeneratorSpec>& specs);
/// Adds 'num' default FileSplitGeneratorSpecs to table named 'table_name'. If the table
/// does not exist, creates a new table. Otherwise, adds the 'specs' to an existing
/// table.
void AddFileSplitGeneratorDefaultSpecs(const TableName& table_name, int num);
const Table& GetTable(const TableName& table_name) const;
const Cluster& cluster() const { return cluster_; }
private:
/// Store a reference to the cluster, from which hosts are sampled. Test results will
/// use the cluster to resolve host indexes to hostnames and IP addresses.
const Cluster& cluster_;
std::unordered_map<TableName, Table> tables_;
};
/// Plan model. A plan contains a list of tables to scan and the query options to be used
/// during scheduling.
class Plan {
public:
Plan(const Schema& schema) : schema_(schema) {}
const TQueryOptions& query_options() const { return query_options_; }
void SetReplicaPreference(TReplicaPreference::type p);
void SetRandomReplica(bool b) { query_options_.schedule_random_replica = b; }
void SetNumRemoteExecutorCandidates(int32_t num);
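/// For example (an illustrative sketch, assuming the TReplicaPreference values defined
/// in the Thrift interface), prefer cached local replicas and consider three candidate
/// executors per remote scan range:
///
///   plan.SetReplicaPreference(TReplicaPreference::CACHE_LOCAL);
///   plan.SetNumRemoteExecutorCandidates(3);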
const Cluster& cluster() const { return schema_.cluster(); }
const std::vector<TNetworkAddress>& referenced_datanodes() const;
const TScanRangeSpec& scan_range_specs() const;
/// Add a scan of table 'table_name' to the plan. This method will populate the internal
/// TScanRangeSpecs and can be called multiple times for the same table to schedule
/// additional scans.
void AddTableScan(const TableName& table_name);
private:
/// Store a reference to the schema, from which scanned tables will be read.
const Schema& schema_;
TQueryOptions query_options_;
/// List of all datanodes that are referenced by this plan. Only hosts that have an
/// assigned scan range are added here.
std::vector<TNetworkAddress> referenced_datanodes_;
/// Map from plan host index to an index in 'referenced_datanodes_'.
std::unordered_map<int, int> host_idx_to_datanode_idx_;
/// Scan range specs that are scheduled by the Scheduler.
TScanRangeSpec scan_range_specs_;
/// Initialize a TScanRangeLocationList object in place.
void BuildTScanRangeLocationList(const TableName& table_name, const Block& block,
int block_idx, BlockNamingPolicy naming_policy,
TScanRangeLocationList* scan_range_locations);
/// Builds appropriate paths for a Block given the table name and block naming
/// policy.
void GetBlockPaths(const TableName& table_name, bool is_spec, int index,
BlockNamingPolicy naming_policy, std::string* relative_path, int64_t* partition_id,
std::string* partition_path);
/// Initializes a scan range for a Block.
void BuildScanRange(
const TableName& table_name, const Block& block, int block_idx,
BlockNamingPolicy naming_policy, TScanRange* range);
/// Initializes a scan range for a FileSplitGeneratorSpec.
void BuildScanRangeSpec(const TableName& table_name, const FileSplitGeneratorSpec& spec,
int spec_idx, BlockNamingPolicy naming_policy,
TFileSplitGeneratorSpec* thrift_spec);
/// Look up the plan-local host index of 'cluster_datanode_idx'. If the host has not
/// been added to the plan before, it will add it to 'referenced_datanodes_' and return
/// the new index.
int FindOrInsertDatanodeIndex(int cluster_datanode_idx);
};
class Result {
private:
/// Map to count the number of assignments per backend.
typedef std::unordered_map<IpAddr, int> NumAssignmentsPerBackend;
/// Map to count the number of assigned bytes per backend.
typedef std::unordered_map<IpAddr, int64_t> NumAssignedBytesPerBackend;
/// Parameter type for callbacks, which are used to filter scheduling results.
struct AssignmentInfo {
const TNetworkAddress& addr;
const THdfsFileSplit& hdfs_file_split;
bool is_cached;
bool is_remote;
};
/// These functions are used as callbacks when processing the scheduling result. They
/// will be called once per assigned scan range.
typedef std::function<bool(const AssignmentInfo& assignment)> AssignmentFilter;
typedef std::function<void(const AssignmentInfo& assignment)> AssignmentCallback;
public:
Result(const Plan& plan) : plan_(plan) {}
/// Return the total number of scheduled assignments.
int NumTotalAssignments() const { return CountAssignmentsIf(Any()); }
/// Return the total number of assigned bytes.
int NumTotalAssignedBytes() const { return CountAssignedBytesIf(Any()); }
/// Return the number of scheduled assignments for a single host.
int NumTotalAssignments(int host_idx) const;
/// Return the number of assigned bytes for a single host.
int NumTotalAssignedBytes(int host_idx) const;
/// Return the total number of assigned cached reads.
int NumCachedAssignments() const { return CountAssignmentsIf(IsCached(Any())); }
/// Return the total number of assigned bytes for cached reads.
int NumCachedAssignedBytes() const { return CountAssignedBytesIf(IsCached(Any())); }
/// Return the total number of assigned cached reads for a single host.
int NumCachedAssignments(int host_idx) const;
/// Return the total number of assigned bytes for cached reads for a single host.
int NumCachedAssignedBytes(int host_idx) const;
/// Return the total number of assigned non-cached reads.
int NumDiskAssignments() const { return CountAssignmentsIf(IsDisk(Any())); }
/// Return the total number of assigned bytes for non-cached reads.
int NumDiskAssignedBytes() const { return CountAssignedBytesIf(IsDisk(Any())); }
/// Return the total number of assigned non-cached reads for a single host.
int NumDiskAssignments(int host_idx) const;
/// Return the total number of assigned bytes for non-cached reads for a single host.
int NumDiskAssignedBytes(int host_idx) const;
/// Return the total number of assigned remote reads.
int NumRemoteAssignments() const { return CountAssignmentsIf(IsRemote(Any())); }
/// Return the total number of assigned bytes for remote reads.
int NumRemoteAssignedBytes() const { return CountAssignedBytesIf(IsRemote(Any())); }
/// Return the total number of assigned remote reads for a single host.
int NumRemoteAssignments(int host_idx) const;
/// Return the total number of assigned bytes for remote reads for a single host.
int NumRemoteAssignedBytes(int host_idx) const;
/// Return the maximum number of assigned reads over all hosts.
int MaxNumAssignmentsPerHost() const;
/// Return the maximum number of assigned bytes over all hosts.
int64_t MaxNumAssignedBytesPerHost() const;
/// Return the minimum number of assigned reads over all hosts.
/// NOTE: This is computed by traversing all recorded assignments and thus will not
/// consider hosts without any assignments. Hence the minimum value to expect is 1 (not
/// 0).
int MinNumAssignmentsPerHost() const;
/// Return the minimum number of assigned bytes over all hosts.
/// NOTE: This is computed by traversing all recorded assignments and thus will not
/// consider hosts without any assignments. Hence the minimum value to expect is 1 (not
/// 0).
int64_t MinNumAssignedBytesPerHost() const;
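/// For example (an illustrative balance check; 'result' is assumed), assignments can
/// be verified to differ by at most one scan range across hosts:
///
///   EXPECT_LE(
///       result.MaxNumAssignmentsPerHost() - result.MinNumAssignmentsPerHost(), 1);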
/// Return the number of scan range assignments stored in this result.
int NumAssignments() const { return assignments_.size(); }
/// Return the number of distinct backends that have been picked by the scheduler so
/// far.
int NumDistinctBackends() const;
/// Return the full assignment for manual matching.
const FragmentScanRangeAssignment& GetAssignment(int index = 0) const;
/// Add an assignment to the result and return a reference, which can then be passed on
/// to the scheduler.
FragmentScanRangeAssignment* AddAssignment();
/// Reset the result to an empty state.
void Reset() { assignments_.clear(); }
private:
/// Vector to store results of consecutive scheduler runs.
std::vector<FragmentScanRangeAssignment> assignments_;
/// Reference to the plan, needed to look up hosts.
const Plan& plan_;
/// Dummy filter matching any assignment.
AssignmentFilter Any() const;
/// Filter to only match assignments of a particular host.
AssignmentFilter IsHost(int host_idx) const;
/// Filter to only match assignments of cached reads.
AssignmentFilter IsCached(AssignmentFilter filter) const;
/// Filter to only match assignments of non-cached, local disk reads.
AssignmentFilter IsDisk(AssignmentFilter filter) const;
/// Filter to only match assignments of remote reads.
AssignmentFilter IsRemote(AssignmentFilter filter) const;
/// Process all recorded assignments and call the supplied callback on each tuple of IP
/// address and scan_range it iterates over.
void ProcessAssignments(const AssignmentCallback& cb) const;
/// Count all assignments matching the supplied filter callback.
int CountAssignmentsIf(const AssignmentFilter& filter) const;
/// Sum the assigned bytes of all assignments matching the supplied filter callback.
int64_t CountAssignedBytesIf(const AssignmentFilter& filter) const;
/// Create a map containing the number of assigned scan ranges per node.
void CountAssignmentsPerBackend(
NumAssignmentsPerBackend* num_assignments_per_backend) const;
/// Create a map containing the number of assigned bytes per node.
void CountAssignedBytesPerBackend(
NumAssignedBytesPerBackend* num_assignments_per_backend) const;
};
/// This class wraps the Scheduler and provides helpers for easier instrumentation
/// during tests.
class SchedulerWrapper {
public:
SchedulerWrapper(const Plan& plan);
/// Call ComputeScanRangeAssignment() with exec_at_coord set to false.
Status Compute(Result* result) { return Compute(false, result); }
/// Call ComputeScanRangeAssignment().
Status Compute(bool exec_at_coord, Result* result);
/// Reset the state of the scheduler by re-creating and initializing it.
void Reset() { InitializeScheduler(); }
/// Methods to modify the internal lists of backends maintained by the scheduler.
/// Add a backend to the scheduler.
void AddBackend(const Host& host);
/// Remove a backend from the scheduler.
void RemoveBackend(const Host& host);
/// Send a full map of the backends to the scheduler instead of deltas.
void SendFullMembershipMap();
/// Send an empty update message to the scheduler.
void SendEmptyUpdate();
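/// For example (an illustrative sketch; 'plan' and 'result' are assumed), rescheduling
/// can be tested after a backend disappears between two runs:
///
///   SchedulerWrapper scheduler(plan);
///   ASSERT_OK(scheduler.Compute(&result));
///   scheduler.RemoveBackend(plan.cluster().hosts()[1]);
///   result.Reset();
///   ASSERT_OK(scheduler.Compute(&result));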
private:
const Plan& plan_;
boost::scoped_ptr<ClusterMembershipMgr> cluster_membership_mgr_;
boost::scoped_ptr<Scheduler> scheduler_;
MetricGroup metrics_;
/// Initialize the internal scheduler object. The method uses the 'real' constructor
/// used in the rest of the codebase, in contrast to the one that takes a list of
/// backends, which is only used for testing purposes. This allows us to properly
/// initialize the scheduler and exercise the UpdateMembership() method in tests.
void InitializeScheduler();
/// Add a single host to the given TTopicDelta.
void AddHostToTopicDelta(const Host& host, TTopicDelta* delta) const;
/// Send the given topic delta to the scheduler.
void SendTopicDelta(const TTopicDelta& delta);
};
} // end namespace test
} // end namespace impala
#endif // SCHEDULING_SCHEDULER_TEST_UTIL_H