// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <algorithm>
#include <random>
#include "common/logging.h"
#include "scheduling/cluster-membership-mgr.h"
#include "scheduling/scheduler.h"
#include "scheduling/scheduler-test-util.h"
#include "testutil/gtest-util.h"
#include "testutil/rand-util.h"
using namespace impala;
using namespace impala::test;
namespace impala {
class SchedulerTest : public testing::Test {
protected:
SchedulerTest() { srand(0); }
virtual void SetUp() {
RandTestUtil::SeedRng("SCHEDULER_TEST_SEED", &rng_);
}
/// Per-test random number generator. Seeded before every test.
std::mt19937 rng_;
};
static const vector<BlockNamingPolicy> BLOCK_NAMING_POLICIES(
{BlockNamingPolicy::UNPARTITIONED, BlockNamingPolicy::PARTITIONED_SINGLE_FILENAME,
BlockNamingPolicy::PARTITIONED_UNIQUE_FILENAMES});
/// Smoke test to schedule a single table with a single scan range over a single host.
TEST_F(SchedulerTest, SingleHostSingleFile) {
Cluster cluster;
cluster.AddHost(true, true);
Schema schema(cluster);
schema.AddMultiBlockTable("T", 1, ReplicaPlacement::LOCAL_ONLY, 1);
Plan plan(schema);
plan.AddTableScan("T");
Result result(plan);
SchedulerWrapper scheduler(plan);
ASSERT_OK(scheduler.Compute(&result));
EXPECT_EQ(1, result.NumTotalAssignments());
EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes());
EXPECT_EQ(1, result.NumTotalAssignments(0));
EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(0));
EXPECT_EQ(0, result.NumCachedAssignments());
}
/// Test cluster configuration with one coordinator that can't process scan ranges.
TEST_F(SchedulerTest, SingleCoordinatorNoExecutor) {
Cluster cluster;
cluster.AddHost(true, true, false);
cluster.AddHost(true, true, true);
cluster.AddHost(true, true, true);
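// Host 0 runs only a coordinator (no executor), so scan ranges can only be assigned to
// hosts 1 and 2.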
Schema schema(cluster);
schema.AddMultiBlockTable("T1", 10, ReplicaPlacement::LOCAL_ONLY, 3);
Plan plan(schema);
plan.AddTableScan("T1");
Result result(plan);
SchedulerWrapper scheduler(plan);
ASSERT_OK(scheduler.Compute(&result));
EXPECT_EQ(2, result.NumDistinctBackends());
EXPECT_EQ(0, result.NumDiskAssignments(0));
}
/// Test assigning all scan ranges to the coordinator.
TEST_F(SchedulerTest, ExecAtCoord) {
Cluster cluster;
cluster.AddHosts(3, true, true);
Schema schema(cluster);
schema.AddMultiBlockTable("T", 3, ReplicaPlacement::LOCAL_ONLY, 3);
Plan plan(schema);
plan.AddTableScan("T");
Result result(plan);
SchedulerWrapper scheduler(plan);
bool exec_at_coord = true;
ASSERT_OK(scheduler.Compute(exec_at_coord, &result));
EXPECT_EQ(3 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(0));
EXPECT_EQ(0, result.NumTotalAssignedBytes(1));
EXPECT_EQ(0, result.NumTotalAssignedBytes(2));
}
/// Test scanning a simple table twice.
TEST_F(SchedulerTest, ScanTableTwice) {
Cluster cluster;
cluster.AddHosts(3, true, true);
Schema schema(cluster);
schema.AddMultiBlockTable("T", 2, ReplicaPlacement::LOCAL_ONLY, 3);
Plan plan(schema);
plan.AddTableScan("T");
plan.AddTableScan("T");
Result result(plan);
SchedulerWrapper scheduler(plan);
ASSERT_OK(scheduler.Compute(&result));
EXPECT_EQ(4 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes());
EXPECT_EQ(4 * Block::DEFAULT_BLOCK_SIZE, result.NumDiskAssignedBytes());
EXPECT_EQ(0, result.NumCachedAssignedBytes());
}
/// TODO: This test can be removed once we have the non-random backend round-robin by
/// rank.
/// Schedule randomly over 3 backends and ensure that each backend is used at least once.
TEST_F(SchedulerTest, LocalReadRandomReplica) {
Cluster cluster;
cluster.AddHosts(3, true, true);
Schema schema(cluster);
schema.AddSingleBlockTable("T1", {0, 1, 2});
Plan plan(schema);
plan.AddTableScan("T1");
plan.SetRandomReplica(true);
Result result(plan);
SchedulerWrapper scheduler(plan);
for (int i = 0; i < 100; ++i) ASSERT_OK(scheduler.Compute(&result));
ASSERT_EQ(100, result.NumAssignments());
EXPECT_EQ(100, result.NumTotalAssignments());
EXPECT_EQ(100 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes());
EXPECT_EQ(3, result.NumDistinctBackends());
EXPECT_GE(result.MinNumAssignedBytesPerHost(), Block::DEFAULT_BLOCK_SIZE);
}
/// Distribute a table over the first 3 nodes in the cluster and verify that repeated
/// schedules always pick the first replica (random_replica = false).
TEST_F(SchedulerTest, LocalReadsPickFirstReplica) {
Cluster cluster;
for (int i = 0; i < 10; ++i) cluster.AddHost(i < 5, true);
Schema schema(cluster);
schema.AddSingleBlockTable("T1", {0, 1, 2});
Plan plan(schema);
plan.AddTableScan("T1");
plan.SetRandomReplica(false);
Result result(plan);
SchedulerWrapper scheduler(plan);
for (int i = 0; i < 3; ++i) ASSERT_OK(scheduler.Compute(&result));
EXPECT_EQ(3, result.NumTotalAssignments());
EXPECT_EQ(3, result.NumDiskAssignments(0));
EXPECT_EQ(0, result.NumDiskAssignments(1));
EXPECT_EQ(0, result.NumDiskAssignments(2));
}
/// Create a medium sized cluster with 100 nodes and compute a schedule over 3 tables.
TEST_F(SchedulerTest, TestMediumSizedCluster) {
Cluster cluster;
cluster.AddHosts(100, true, true);
Schema schema(cluster);
schema.AddMultiBlockTable("T1", 10, ReplicaPlacement::LOCAL_ONLY, 3);
schema.AddMultiBlockTable("T2", 5, ReplicaPlacement::LOCAL_ONLY, 3);
schema.AddMultiBlockTable("T3", 1, ReplicaPlacement::LOCAL_ONLY, 3);
Plan plan(schema);
plan.AddTableScan("T1");
plan.AddTableScan("T2");
plan.AddTableScan("T3");
Result result(plan);
SchedulerWrapper scheduler(plan);
ASSERT_OK(scheduler.Compute(&result));
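// T1, T2 and T3 have 10 + 5 + 1 = 16 blocks in total, all with local replicas.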
EXPECT_EQ(16, result.NumTotalAssignments());
EXPECT_EQ(16, result.NumDiskAssignments());
}
/// Verify that remote placement and scheduling work as expected when
/// num_remote_executor_candidates=0 (i.e. that placement is random and covers all nodes).
TEST_F(SchedulerTest, NoRemoteExecutorCandidates) {
Cluster cluster;
for (int i = 0; i < 100; ++i) cluster.AddHost(i < 30, true);
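// Only the first 30 hosts run Impala backends; every host is added as a datanode, so
// with REMOTE_ONLY placement all scan ranges are read remotely.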
Schema schema(cluster);
schema.AddMultiBlockTable("T1", 10, ReplicaPlacement::REMOTE_ONLY, 3);
Plan plan(schema);
plan.AddTableScan("T1");
plan.SetNumRemoteExecutorCandidates(0);
Result result(plan);
SchedulerWrapper scheduler(plan);
ASSERT_OK(scheduler.Compute(&result));
EXPECT_EQ(10, result.NumTotalAssignments());
EXPECT_EQ(10, result.NumRemoteAssignments());
EXPECT_EQ(Block::DEFAULT_BLOCK_SIZE, result.MaxNumAssignedBytesPerHost());
}
/// Tests scheduling with num_remote_executor_candidates > 0. Specifically, it verifies
/// that repeated scheduling of a block happens on the appropriate number of distinct
/// nodes for varying values of num_remote_executor_candidates. This includes cases
/// where the num_remote_executor_candidates exceeds the number of Impala executors.
TEST_F(SchedulerTest, RemoteExecutorCandidates) {
int num_data_nodes = 3;
int num_impala_nodes = 5;
Cluster cluster = Cluster::CreateRemoteCluster(num_impala_nodes, num_data_nodes);
Schema schema(cluster);
// CreateRemoteCluster places the Impala nodes first, so the data nodes have indices
// of 5, 6, and 7.
schema.AddSingleBlockTable("T1", {5, 6, 7});
// Test a range of values for the number of remote executor candidates with both true
// and false for schedule_random_replica. This includes cases where the number of
// remote executor candidates exceeds the number of Impala nodes.
for (bool schedule_random_replica : {true, false}) {
for (int num_candidates = 1; num_candidates <= num_impala_nodes + 2;
++num_candidates) {
Plan plan(schema);
plan.AddTableScan("T1");
plan.SetRandomReplica(schedule_random_replica);
plan.SetNumRemoteExecutorCandidates(num_candidates);
Result result(plan);
SchedulerWrapper scheduler(plan);
for (int i = 0; i < 100; ++i) ASSERT_OK(scheduler.Compute(&result));
ASSERT_EQ(100, result.NumAssignments());
EXPECT_EQ(100, result.NumTotalAssignments());
EXPECT_EQ(100 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes());
if (schedule_random_replica && num_candidates > 1) {
if (num_candidates < num_impala_nodes) {
EXPECT_EQ(num_candidates, result.NumDistinctBackends());
} else {
// Since this scenario still uses the consistent placement algorithm, there is
// no guarantee that the scan range will be placed on all the nodes. But
// it should be placed on almost all of the nodes.
EXPECT_GE(result.NumDistinctBackends(), num_impala_nodes - 1);
}
EXPECT_GE(result.MinNumAssignedBytesPerHost(), Block::DEFAULT_BLOCK_SIZE);
} else {
// If schedule_random_replica is false, then the scheduler will pick the first
// candidate (as none of the backends have any assignments). This means that all
// the iterations will assign work to the same backend. This is also true when
// the number of remote executor candidates is one.
EXPECT_EQ(result.NumDistinctBackends(), 1);
EXPECT_EQ(result.MinNumAssignedBytesPerHost(), 100 * Block::DEFAULT_BLOCK_SIZE);
}
}
}
}
/// Helper function to verify that two things are treated as distinct for consistent
/// remote placement. The input 'schema' should be created with a Cluster initialized
/// by Cluster::CreateRemoteCluster() with 50 impalads and 3 data nodes. It should
/// have a single table named "T" that contains two schedulable entities (blocks, specs,
/// etc.) with size Block::DEFAULT_BLOCK_SIZE that are expected to be distinct. It runs
/// the scheduler 100 times with random replica set to true and verifies that the number
/// of distinct backends is in the right range. If two things are distinct and each can
/// be scheduled on up to 'num_candidates' distinct backends, then the number of distinct
/// backends should be in the range ['num_candidates' + 1, 2 * 'num_candidates'].
/// The probability of completely overlapping by chance is extremely low and
/// SchedulerTests use srand(0) to be deterministic, so this test should only fail if
/// the entities are no longer considered distinct.
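/// For example, with 'num_candidates' == 3 the two entities should land on between 4 and
/// 6 distinct backends across the 100 runs.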
void RemotePlacementVerifyDistinct(const Schema& schema, int num_candidates) {
ASSERT_EQ(schema.cluster().NumHosts(), 53);
Plan plan(schema);
plan.AddTableScan("T");
plan.SetRandomReplica(true);
plan.SetNumRemoteExecutorCandidates(num_candidates);
Result result(plan);
SchedulerWrapper scheduler(plan);
for (int i = 0; i < 100; ++i) ASSERT_OK(scheduler.Compute(&result));
// This helper is not intended to be used with a large number of candidates.
ASSERT_LE(num_candidates, 10);
int min_distinct_backends = num_candidates + 1;
int max_distinct_backends = 2 * num_candidates;
ASSERT_EQ(100, result.NumAssignments());
EXPECT_EQ(200, result.NumTotalAssignments());
EXPECT_EQ(200 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes());
EXPECT_GE(result.NumDistinctBackends(), min_distinct_backends);
EXPECT_LE(result.NumDistinctBackends(), max_distinct_backends);
}
/// Test that consistent remote placement schedules distinct blocks differently
TEST_F(SchedulerTest, RemotePlacementBlocksDistinct) {
int num_data_nodes = 3;
int num_impala_nodes = 50;
Cluster cluster = Cluster::CreateRemoteCluster(num_impala_nodes, num_data_nodes);
// Two blocks (which translate to actual files for a table) should hash differently
// and result in different remote placement. This verifies various combinations
// corresponding to how files are named.
for (BlockNamingPolicy naming_policy : BLOCK_NAMING_POLICIES) {
SCOPED_TRACE(naming_policy);
Schema schema(cluster);
int num_blocks = 2;
int num_remote_candidates = 3;
schema.AddMultiBlockTable("T", num_blocks, ReplicaPlacement::RANDOM,
num_remote_candidates, 0, naming_policy);
RemotePlacementVerifyDistinct(schema, num_remote_candidates);
}
}
/// Test that consistent remote placement schedules distinct file split generator specs
/// differently
TEST_F(SchedulerTest, RemotePlacementSpecsDistinct) {
int num_data_nodes = 3;
int num_impala_nodes = 50;
Cluster cluster = Cluster::CreateRemoteCluster(num_impala_nodes, num_data_nodes);
// Two specs (which translate to actual files for a table) should hash differently
// and result in different remote placement. This verifies that this holds for all of
// the naming policies.
for (BlockNamingPolicy naming_policy : BLOCK_NAMING_POLICIES) {
SCOPED_TRACE(naming_policy);
Schema schema(cluster);
int num_remote_candidates = 3;
// Add the table with the appropriate naming policy, but without adding any blocks.
schema.AddEmptyTable("T", naming_policy);
// Add two splits with one block each (i.e. the total size of the split is the
// block size).
schema.AddFileSplitGeneratorSpecs("T",
{{Block::DEFAULT_BLOCK_SIZE, Block::DEFAULT_BLOCK_SIZE, false},
{Block::DEFAULT_BLOCK_SIZE, Block::DEFAULT_BLOCK_SIZE, false}});
RemotePlacementVerifyDistinct(schema, num_remote_candidates);
}
}
/// Tests that consistent remote placement schedules blocks with distinct offsets
/// differently
TEST_F(SchedulerTest, RemotePlacementOffsetsDistinct) {
int num_data_nodes = 3;
int num_impala_nodes = 50;
Cluster cluster = Cluster::CreateRemoteCluster(num_impala_nodes, num_data_nodes);
// A FileSplitGeneratorSpec that generates two scan ranges should hash the two scan
// ranges differently due to different offsets. This is true regardless of the
// naming_policy (which should not impact the outcome).
for (BlockNamingPolicy naming_policy : BLOCK_NAMING_POLICIES) {
SCOPED_TRACE(naming_policy);
Schema schema(cluster);
int num_remote_candidates = 3;
// Add the table with the appropriate naming policy, but without adding any blocks.
schema.AddEmptyTable("T", naming_policy);
// Add a splittable spec that will be split into two scan ranges each of
// the default block size.
schema.AddFileSplitGeneratorSpecs("T",
{{2 * Block::DEFAULT_BLOCK_SIZE, Block::DEFAULT_BLOCK_SIZE, true}});
RemotePlacementVerifyDistinct(schema, num_remote_candidates);
}
}
/// Verify basic consistency of remote executor candidates. Specifically, it schedules
/// a set of blocks, then removes some executors that did not have any blocks assigned to
/// them, and verifies that rerunning the scheduling results in the same assignments.
TEST_F(SchedulerTest, RemoteExecutorCandidateConsistency) {
int num_data_nodes = 3;
int num_impala_nodes = 50;
Cluster cluster = Cluster::CreateRemoteCluster(num_impala_nodes, num_data_nodes);
// Replica placement is unimportant for this test. All blocks will be on
// all datanodes, but Impala is running remotely.
Schema schema(cluster);
schema.AddMultiBlockTable("T1", 25, ReplicaPlacement::RANDOM, 3);
Plan plan(schema);
plan.AddTableScan("T1");
plan.SetRandomReplica(false);
plan.SetNumRemoteExecutorCandidates(3);
Result result_base(plan);
SchedulerWrapper scheduler(plan);
ASSERT_OK(scheduler.Compute(&result_base));
EXPECT_EQ(25, result_base.NumTotalAssignments());
EXPECT_EQ(25 * Block::DEFAULT_BLOCK_SIZE, result_base.NumTotalAssignedBytes());
EXPECT_GT(result_base.NumDistinctBackends(), 3);
// There are 25 blocks and 50 Impala hosts. There will be at least 25 Impala hosts
// without any assigned bytes. Removing some of them should not change the outcome.
// Generate a list of the hosts without bytes assigned.
vector<int> zerobyte_indices;
for (int i = 0; i < num_impala_nodes; ++i) {
if (result_base.NumTotalAssignedBytes(i) == 0) {
zerobyte_indices.push_back(i);
}
}
EXPECT_GE(zerobyte_indices.size(), 25);
// Remove 5 nodes with zero bytes by picking several indices in the list of
// nodes with zero bytes and removing the corresponding backends.
vector<int> zerobyte_indices_to_remove({3, 7, 12, 15, 19});
int num_removed = 0;
for (int index_to_remove : zerobyte_indices_to_remove) {
int node_index = zerobyte_indices[index_to_remove];
scheduler.RemoveBackend(cluster.hosts()[node_index]);
num_removed++;
}
ASSERT_EQ(num_removed, 5);
// Rerun the scheduling with the nodes removed.
Result result_empty_removed(plan);
ASSERT_OK(scheduler.Compute(&result_empty_removed));
EXPECT_EQ(25, result_empty_removed.NumTotalAssignments());
EXPECT_EQ(25 * Block::DEFAULT_BLOCK_SIZE, result_empty_removed.NumTotalAssignedBytes());
// Verify that the outcome is identical.
for (int i = 0; i < num_impala_nodes; ++i) {
EXPECT_EQ(result_base.NumRemoteAssignedBytes(i),
result_empty_removed.NumRemoteAssignedBytes(i))
<< "Mismatch at index " << std::to_string(i);
}
}
/// Add a table with 1000 scan ranges over 10 hosts and ensure that the right number of
/// assignments is computed.
TEST_F(SchedulerTest, ManyScanRanges) {
Cluster cluster;
cluster.AddHosts(10, true, true);
Schema schema(cluster);
schema.AddMultiBlockTable("T", 1000, ReplicaPlacement::LOCAL_ONLY, 3);
Plan plan(schema);
plan.AddTableScan("T");
Result result(plan);
SchedulerWrapper scheduler(plan);
ASSERT_OK(scheduler.Compute(&result));
EXPECT_EQ(1000, result.NumTotalAssignments());
EXPECT_EQ(1000, result.NumDiskAssignments());
// When distributing 1000 blocks with 1 replica over 10 hosts, the probability for the
// most-picked host to end up with more than 140 blocks is smaller than 1E-3 (Chernoff
// bound). Adding 2 additional replicas per block will make the probability even
// smaller. This test is deterministic, so we expect a failure less often than every 1E3
// changes to the test, not every 1E3 runs.
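// As a rough per-host sketch: with a single replica a host's block count is
// Binomial(1000, 1/10) with mean mu = 100, and the multiplicative Chernoff bound
// exp(-mu * ((1 + d) * ln(1 + d) - d)) with d = 0.4 evaluates to roughly 8e-4.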
EXPECT_LE(result.MaxNumAssignmentsPerHost(), 140);
EXPECT_LE(result.MaxNumAssignedBytesPerHost(), 140 * Block::DEFAULT_BLOCK_SIZE);
}
/// Compute a schedule in a split cluster (disjoint set of backends and datanodes).
TEST_F(SchedulerTest, DisjointClusterWithRemoteReads) {
Cluster cluster;
for (int i = 0; i < 20; ++i) cluster.AddHost(i < 10, i >= 10);
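// Hosts 0-9 run Impala backends without datanodes; hosts 10-19 are datanodes only.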
Schema schema(cluster);
schema.AddMultiBlockTable("T1", 10, ReplicaPlacement::REMOTE_ONLY, 3);
Plan plan(schema);
plan.AddTableScan("T1");
Result result(plan);
SchedulerWrapper scheduler(plan);
ASSERT_OK(scheduler.Compute(&result));
EXPECT_EQ(10, result.NumTotalAssignments());
EXPECT_EQ(10, result.NumRemoteAssignments());
// Expect that the datanodes were not mistaken for backends.
for (int i = 10; i < 20; ++i) EXPECT_EQ(0, result.NumTotalAssignments(i));
}
/// Verify that cached replicas take precedence.
TEST_F(SchedulerTest, TestCachedReadPreferred) {
Cluster cluster;
cluster.AddHosts(3, true, true);
Schema schema(cluster);
schema.AddSingleBlockTable("T1", {0, 2}, {1});
Plan plan(schema);
// 1 of the 3 replicas (the one on host 1) is cached.
plan.AddTableScan("T1");
Result result(plan);
SchedulerWrapper scheduler(plan);
ASSERT_OK(scheduler.Compute(&result));
EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumCachedAssignedBytes());
EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumCachedAssignedBytes(1));
EXPECT_EQ(0, result.NumDiskAssignedBytes());
EXPECT_EQ(0, result.NumRemoteAssignedBytes());
// Compute additional assignments.
for (int i = 0; i < 8; ++i) ASSERT_OK(scheduler.Compute(&result));
EXPECT_EQ(9 * Block::DEFAULT_BLOCK_SIZE, result.NumCachedAssignedBytes());
EXPECT_EQ(9 * Block::DEFAULT_BLOCK_SIZE, result.NumCachedAssignedBytes(1));
EXPECT_EQ(0, result.NumDiskAssignedBytes());
EXPECT_EQ(0, result.NumRemoteAssignedBytes());
}
/// Test sending updates to the scheduler.
TEST_F(SchedulerTest, TestSendUpdates) {
Cluster cluster;
// 3 hosts, only last two run backends. This allows us to remove one of the backends
// from the scheduler and then verify that reads are assigned to the other backend.
for (int i = 0; i < 3; ++i) cluster.AddHost(i > 0, true);
Schema schema(cluster);
schema.AddMultiBlockTable("T1", 1, ReplicaPlacement::REMOTE_ONLY, 1);
Plan plan(schema);
plan.AddTableScan("T1");
// Test only applies when num_remote_executor_candidates=0.
plan.SetNumRemoteExecutorCandidates(0);
Result result(plan);
SchedulerWrapper scheduler(plan);
ASSERT_OK(scheduler.Compute(&result));
// Two backends are registered, so the scheduler will pick a random one.
EXPECT_EQ(0, result.NumTotalAssignedBytes(0));
EXPECT_EQ(0, result.NumTotalAssignedBytes(1));
EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(2));
// Remove one host from scheduler.
int test_host = 2;
scheduler.RemoveBackend(cluster.hosts()[test_host]);
result.Reset();
ASSERT_OK(scheduler.Compute(&result));
EXPECT_EQ(0, result.NumTotalAssignedBytes(0));
EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(1));
EXPECT_EQ(0, result.NumTotalAssignedBytes(2));
// Re-add host to scheduler.
scheduler.AddBackend(cluster.hosts()[test_host]);
result.Reset();
ASSERT_OK(scheduler.Compute(&result));
// Two backends are registered, so the scheduler will pick a random one.
EXPECT_EQ(0, result.NumTotalAssignedBytes(0));
EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(1));
EXPECT_EQ(0, result.NumTotalAssignedBytes(2));
// Remove the other host from the scheduler.
test_host = 1;
scheduler.RemoveBackend(cluster.hosts()[test_host]);
result.Reset();
ASSERT_OK(scheduler.Compute(&result));
// Only one backend remains so the scheduler must pick it.
EXPECT_EQ(0, result.NumTotalAssignedBytes(0));
EXPECT_EQ(0, result.NumTotalAssignedBytes(1));
EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(2));
}
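/// Test scheduling a table with a single file split generator spec of the default size.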
TEST_F(SchedulerTest, TestGeneratedSingleSplit) {
Cluster cluster;
cluster.AddHosts(3, true, true);
Schema schema(cluster);
schema.AddFileSplitGeneratorDefaultSpecs("T", 1);
Plan plan(schema);
plan.AddTableScan("T");
plan.SetRandomReplica(true);
Result result(plan);
SchedulerWrapper scheduler(plan);
ASSERT_OK(scheduler.Compute(&result));
EXPECT_EQ(FileSplitGeneratorSpec::DEFAULT_FILE_SIZE
/ FileSplitGeneratorSpec::DEFAULT_BLOCK_SIZE,
result.NumTotalAssignments());
EXPECT_EQ(
1 * FileSplitGeneratorSpec::DEFAULT_FILE_SIZE, result.NumTotalAssignedBytes());
}
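/// Test scheduling a table with 100 file split generator specs of the default size.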
TEST_F(SchedulerTest, TestGeneratedMultiSplit) {
Cluster cluster;
cluster.AddHosts(3, true, true);
Schema schema(cluster);
schema.AddFileSplitGeneratorDefaultSpecs("T", 100);
Plan plan(schema);
plan.AddTableScan("T");
plan.SetRandomReplica(true);
Result result(plan);
SchedulerWrapper scheduler(plan);
ASSERT_OK(scheduler.Compute(&result));
EXPECT_EQ(100 * FileSplitGeneratorSpec::DEFAULT_FILE_SIZE
/ FileSplitGeneratorSpec::DEFAULT_BLOCK_SIZE,
result.NumTotalAssignments());
EXPECT_EQ(
100 * FileSplitGeneratorSpec::DEFAULT_FILE_SIZE, result.NumTotalAssignedBytes());
}
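/// Test scheduling file split generator specs with varying sizes and splittability.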
TEST_F(SchedulerTest, TestGeneratedVariableSizeSplit) {
Cluster cluster;
cluster.AddHosts(3, true, true);
Schema schema(cluster);
schema.AddFileSplitGeneratorSpecs(
"T", {{100, 100, true}, {100, 1, false}, {100, 10, true}});
Plan plan(schema);
plan.AddTableScan("T");
plan.SetRandomReplica(true);
Result result(plan);
SchedulerWrapper scheduler(plan);
ASSERT_OK(scheduler.Compute(&result));
EXPECT_EQ(12, result.NumTotalAssignments());
EXPECT_EQ(300, result.NumTotalAssignedBytes());
}
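/// Test scheduling a table that mixes regular blocks with file split generator specs.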
TEST_F(SchedulerTest, TestBlockAndGenerateSplit) {
Cluster cluster;
cluster.AddHosts(3, true, true);
Schema schema(cluster);
schema.AddMultiBlockTable("T", 2, ReplicaPlacement::LOCAL_ONLY, 3);
schema.AddFileSplitGeneratorSpecs(
"T", {{100, 100, true}, {100, 1, false}, {100, 10, true}});
Plan plan(schema);
plan.AddTableScan("T");
Result result(plan);
SchedulerWrapper scheduler(plan);
ASSERT_OK(scheduler.Compute(&result));
EXPECT_EQ(14, result.NumTotalAssignments());
EXPECT_EQ(300 + 2 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes());
EXPECT_EQ(2 * Block::DEFAULT_BLOCK_SIZE, result.NumDiskAssignedBytes());
EXPECT_EQ(0, result.NumCachedAssignedBytes());
}
/// Test scheduling fails with no backends (the local backend gets registered with the
/// scheduler but is not marked as an executor).
TEST_F(SchedulerTest, TestEmptyBackendConfig) {
Cluster cluster;
cluster.AddHost(false, true);
Schema schema(cluster);
schema.AddMultiBlockTable("T", 1, ReplicaPlacement::REMOTE_ONLY, 1);
Plan plan(schema);
plan.AddTableScan("T");
Result result(plan);
SchedulerWrapper scheduler(plan);
Status status = scheduler.Compute(&result);
EXPECT_TRUE(!status.ok());
}
/// IMPALA-4494: Test scheduling with no backends but exec_at_coord.
TEST_F(SchedulerTest, TestExecAtCoordWithEmptyBackendConfig) {
Cluster cluster;
cluster.AddHost(false, true);
Schema schema(cluster);
schema.AddMultiBlockTable("T", 1, ReplicaPlacement::REMOTE_ONLY, 1);
Plan plan(schema);
plan.AddTableScan("T");
Result result(plan);
SchedulerWrapper scheduler(plan);
bool exec_at_coord = true;
Status status = scheduler.Compute(exec_at_coord, &result);
EXPECT_TRUE(status.ok());
}
/// IMPALA-4494: Test exec_at_coord while local backend is not registered with itself.
TEST_F(SchedulerTest, TestExecAtCoordWithoutLocalBackend) {
Cluster cluster;
cluster.AddHosts(3, true, true);
Schema schema(cluster);
schema.AddMultiBlockTable("T", 1, ReplicaPlacement::LOCAL_ONLY, 1);
Plan plan(schema);
plan.AddTableScan("T");
Result result(plan);
SchedulerWrapper scheduler(plan);
// Remove first host from scheduler. By convention this is the coordinator. The
// scheduler will ignore this and successfully assign the scan.
scheduler.RemoveBackend(cluster.hosts()[0]);
bool exec_at_coord = true;
Status status = scheduler.Compute(exec_at_coord, &result);
EXPECT_TRUE(status.ok());
}
/// Test the scheduling algorithm for load-balancing scan ranges within a host.
/// This exercises the AssignRangesToInstances() method, which implements the core
/// of the algorithm.
TEST_F(SchedulerTest, TestMultipleFinstances) {
const int NUM_RANGES = 16;
std::vector<TScanRangeParams> fs_ranges(NUM_RANGES);
std::vector<TScanRangeParams> kudu_ranges(NUM_RANGES);
// Create ranges with lengths 1, 2, ..., NUM_RANGES.
for (int i = 0; i < NUM_RANGES; ++i) {
fs_ranges[i].scan_range.__set_hdfs_file_split(THdfsFileSplit());
fs_ranges[i].scan_range.hdfs_file_split.length = i + 1;
kudu_ranges[i].scan_range.__set_kudu_scan_token("fake token");
}
// Test handling of the single instance case - all ranges go to the same instance.
vector<vector<TScanRangeParams>> fs_one_instance =
Scheduler::AssignRangesToInstances(1, &fs_ranges);
ASSERT_EQ(1, fs_one_instance.size());
EXPECT_EQ(NUM_RANGES, fs_one_instance[0].size());
vector<vector<TScanRangeParams>> kudu_one_instance =
Scheduler::AssignRangesToInstances(1, &kudu_ranges);
ASSERT_EQ(1, kudu_one_instance.size());
EXPECT_EQ(NUM_RANGES, kudu_one_instance[0].size());
// Ensure that each instance gets exactly one range, regardless of input order.
for (int attempt = 0; attempt < 20; ++attempt) {
std::shuffle(fs_ranges.begin(), fs_ranges.end(), rng_);
vector<vector<TScanRangeParams>> range_per_instance =
Scheduler::AssignRangesToInstances(NUM_RANGES, &fs_ranges);
EXPECT_EQ(NUM_RANGES, range_per_instance.size());
// Confirm each range is present and each instance got exactly one range.
vector<int> range_length_count(NUM_RANGES);
for (const auto& instance_ranges : range_per_instance) {
ASSERT_EQ(1, instance_ranges.size());
++range_length_count[instance_ranges[0].scan_range.hdfs_file_split.length - 1];
}
for (int i = 0; i < NUM_RANGES; ++i) {
EXPECT_EQ(1, range_length_count[i]) << i;
}
}
// Test load balancing FS ranges across 4 instances. We should get an even assignment
// across the instances regardless of input order.
for (int attempt = 0; attempt < 20; ++attempt) {
std::shuffle(fs_ranges.begin(), fs_ranges.end(), rng_);
vector<vector<TScanRangeParams>> range_per_instance =
Scheduler::AssignRangesToInstances(4, &fs_ranges);
EXPECT_EQ(4, range_per_instance.size());
// Ensure we got a range of each length in the output.
vector<int> range_length_count(NUM_RANGES);
for (const auto& instance_ranges : range_per_instance) {
EXPECT_EQ(4, instance_ranges.size());
int64_t instance_bytes = 0;
for (const auto& range : instance_ranges) {
instance_bytes += range.scan_range.hdfs_file_split.length;
++range_length_count[range.scan_range.hdfs_file_split.length - 1];
}
// Expect each instance to get sum([1, 2, ..., 16]) / 4 = 34 bytes when the ranges are
// distributed evenly.
EXPECT_EQ(34, instance_bytes);
}
for (int i = 0; i < NUM_RANGES; ++i) {
EXPECT_EQ(1, range_length_count[i]) << i;
}
}
// Test load balancing Kudu ranges across 4 instances. We should get an even assignment
// across the instances regardless of input order. We don't know the size of each Kudu
// range, so we just need to check the # of ranges.
for (int attempt = 0; attempt < 20; ++attempt) {
std::shuffle(kudu_ranges.begin(), kudu_ranges.end(), rng_);
vector<vector<TScanRangeParams>> range_per_instance =
Scheduler::AssignRangesToInstances(4, &kudu_ranges);
EXPECT_EQ(4, range_per_instance.size());
for (const auto& instance_ranges : range_per_instance) {
EXPECT_EQ(4, instance_ranges.size());
for (const auto& range : instance_ranges) {
EXPECT_TRUE(range.scan_range.__isset.kudu_scan_token);
}
}
}
}
} // end namespace impala