IMPALA-9015: improve mt_dop scan scheduling

Implement the longest-processing-time (LPT) algorithm for assigning
scan ranges to instances within a host. This is a standard scheduling
algorithm that works well in practice and fixes some specific bugs in
the previous approach.

The previous approach tended to assign multiple ranges to the
first instance and induce skew. For example, given ranges of sizes
[3, 4, 5, 6] and 4 instances, it would assign
[3, 4], [5], [6], []. This also had the unfortunate consequence
that not all instances were actually allocated scan ranges,
making scheduling hard to reason about.
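
For illustration, here is a minimal standalone sketch of LPT on that
same input (illustrative only, not the Impala code; LptAssign and the
bucket terminology are made up for this example):

  #include <algorithm>
  #include <cstdint>
  #include <functional>
  #include <iostream>
  #include <queue>
  #include <utility>
  #include <vector>

  // Assign 'weights' to 'num_buckets' buckets: sort descending, then
  // give each item to the least-loaded bucket (tracked via a min-heap).
  std::vector<std::vector<int64_t>> LptAssign(
      int num_buckets, std::vector<int64_t> weights) {
    std::vector<std::vector<int64_t>> buckets(num_buckets);
    using Load = std::pair<int64_t, int>; // (assigned weight, bucket index)
    std::priority_queue<Load, std::vector<Load>, std::greater<Load>> heap;
    for (int i = 0; i < num_buckets; ++i) heap.push({0, i});
    std::sort(weights.begin(), weights.end(), std::greater<int64_t>());
    for (int64_t w : weights) {
      Load least = heap.top(); // least-loaded bucket so far
      heap.pop();
      buckets[least.second].push_back(w);
      heap.push({least.first + w, least.second});
    }
    return buckets;
  }

  int main() {
    // The example above: ranges of sizes [3, 4, 5, 6] across 4 instances.
    // LPT assigns [6], [5], [4], [3] - one range per instance - instead
    // of the old scheme's [3, 4], [5], [6], [].
    for (const auto& b : LptAssign(4, {3, 4, 5, 6})) {
      std::cout << b.size() << " range(s)\n";
    }
    return 0;
  }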

Testing:
Added a unit test for the core algorithm that directly checks
that it fixes the above problems.

Perf:
The algorithm is O(n log n) instead of O(n), where n is the
number of scan ranges assigned to a backend: the ranges are
sorted by descending weight, then each assignment costs a heap
update. This seems worthwhile to get more even work distribution
because the payoff can be significant in many cases.

I ran a single node perf run of TPC-H scale 30 with
mt_dop=4 and saw significant perf improvements:

Report Generated on 2019-10-07
Run Description: "ccd741856d0e18be9c89087da71d9fc59f1c75ad vs 37d3df99e588b0e9080660c472c3bb06a08361ac"

Cluster Name: UNKNOWN
Lab Run Info: UNKNOWN
Impala Version:          impalad version 3.4.0-SNAPSHOT RELEASE ()
Baseline Impala Version: impalad version 3.4.0-SNAPSHOT RELEASE ()

+----------+-----------------------+---------+------------+------------+----------------+
| Workload | File Format           | Avg (s) | Delta(Avg) | GeoMean(s) | Delta(GeoMean) |
+----------+-----------------------+---------+------------+------------+----------------+
| TPCH(30) | parquet / none / none | 8.51    | -3.78%     | 5.70       | -4.07%         |
+----------+-----------------------+---------+------------+------------+----------------+

+----------+----------+-----------------------+--------+-------------+------------+-----------+----------------+-------+----------------+---------+--------+
| Workload | Query    | File Format           | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%) | Base StdDev(%) | Iters | Median Diff(%) | MW Zval | Tval   |
+----------+----------+-----------------------+--------+-------------+------------+-----------+----------------+-------+----------------+---------+--------+
| TPCH(30) | TPCH-Q11 | parquet / none / none | 3.08   | 3.04        |   +1.11%   |   0.72%   |   1.09%        | 5     |   +1.52%       | 0.58    | 1.88   |
| TPCH(30) | TPCH-Q2  | parquet / none / none | 1.83   | 1.83        |   -0.01%   |   3.68%   |   1.25%        | 5     |   +2.00%       | 0.00    | -0.00  |
| TPCH(30) | TPCH-Q9  | parquet / none / none | 36.29  | 36.17       |   +0.32%   |   2.22%   |   2.94%        | 5     |   +0.56%       | 0.00    | 0.20   |
| TPCH(30) | TPCH-Q8  | parquet / none / none | 5.80   | 5.93        |   -2.06%   |   0.61%   |   0.94%        | 5     |   -1.74%       | -2.02   | -4.14  |
| TPCH(30) | TPCH-Q3  | parquet / none / none | 8.91   | 9.15        |   -2.62%   |   3.54%   |   3.96%        | 5     |   -2.17%       | -0.87   | -1.12  |
| TPCH(30) | TPCH-Q15 | parquet / none / none | 3.38   | 3.46        |   -2.30%   |   0.82%   |   0.68%        | 5     |   -2.54%       | -2.31   | -4.89  |
| TPCH(30) | TPCH-Q6  | parquet / none / none | 1.62   | 1.67        |   -2.73%   |   2.52%   |   0.12%        | 5     |   -2.88%       | -1.15   | -2.49  |
| TPCH(30) | TPCH-Q20 | parquet / none / none | 3.67   | 3.78        |   -3.06%   |   1.12%   |   1.69%        | 5     |   -2.69%       | -2.31   | -3.41  |
| TPCH(30) | TPCH-Q4  | parquet / none / none | 3.08   | 3.18        |   -3.05%   |   1.22%   |   0.32%        | 5     |   -2.86%       | -2.31   | -5.57  |
| TPCH(30) | TPCH-Q14 | parquet / none / none | 5.52   | 5.72        |   -3.56%   |   1.35%   |   1.02%        | 5     |   -3.60%       | -2.31   | -4.81  |
| TPCH(30) | TPCH-Q16 | parquet / none / none | 2.32   | 2.41        |   -3.86%   |   1.04%   |   1.64%        | 5     |   -3.99%       | -2.02   | -4.50  |
| TPCH(30) | TPCH-Q7  | parquet / none / none | 24.22  | 25.27       |   -4.15%   |   0.40%   |   2.01%        | 5     |   -3.74%       | -2.31   | -4.55  |
| TPCH(30) | TPCH-Q18 | parquet / none / none | 8.64   | 8.99        |   -3.97%   |   1.02%   |   0.47%        | 5     |   -4.27%       | -2.31   | -8.15  |
| TPCH(30) | TPCH-Q12 | parquet / none / none | 3.38   | 3.53        |   -4.28%   |   1.67%   |   1.39%        | 5     |   -4.26%       | -2.31   | -4.51  |
| TPCH(30) | TPCH-Q5  | parquet / none / none | 11.77  | 12.28       |   -4.18%   |   0.72%   |   0.94%        | 5     |   -4.62%       | -2.31   | -8.06  |
| TPCH(30) | TPCH-Q17 | parquet / none / none | 7.65   | 8.05        |   -4.98%   |   1.03%   |   2.09%        | 5     |   -4.68%       | -2.31   | -4.82  |
| TPCH(30) | TPCH-Q21 | parquet / none / none | 27.97  | 29.52       | I -5.27%   |   0.78%   |   0.39%        | 5     | I -5.16%       | -2.31   | -14.17 |
| TPCH(30) | TPCH-Q1  | parquet / none / none | 4.92   | 5.24        | I -6.05%   |   3.59%   |   2.32%        | 5     | I -6.79%       | -1.73   | -3.30  |
| TPCH(30) | TPCH-Q19 | parquet / none / none | 4.56   | 4.92        | I -7.33%   |   1.55%   |   0.90%        | 5     | I -8.50%       | -2.31   | -9.64  |
| TPCH(30) | TPCH-Q22 | parquet / none / none | 2.09   | 2.28        | I -8.40%   |   1.95%   |   1.85%        | 5     | I -9.21%       | -2.31   | -7.29  |
| TPCH(30) | TPCH-Q13 | parquet / none / none | 10.83  | 11.84       | I -8.55%   |   0.53%   |   0.57%        | 5     | I -9.34%       | -2.31   | -25.56 |
| TPCH(30) | TPCH-Q10 | parquet / none / none | 5.63   | 6.22        | I -9.53%   |   0.75%   |   0.73%        | 5     | I -10.46%      | -2.31   | -21.37 |
+----------+----------+-----------------------+--------+-------------+------------+-----------+----------------+-------+----------------+---------+--------+

Change-Id: I45ed2dab835efeb64bb74891cb43065894892682
Reviewed-on: http://gerrit.cloudera.org:8080/14381
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/scheduling/scheduler-test.cc b/be/src/scheduling/scheduler-test.cc
index 1347457..8701055 100644
--- a/be/src/scheduling/scheduler-test.cc
+++ b/be/src/scheduling/scheduler-test.cc
@@ -15,11 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <algorithm>
+#include <random>
+
 #include "common/logging.h"
 #include "scheduling/cluster-membership-mgr.h"
 #include "scheduling/scheduler.h"
 #include "scheduling/scheduler-test-util.h"
 #include "testutil/gtest-util.h"
+#include "testutil/rand-util.h"
 
 using namespace impala;
 using namespace impala::test;
@@ -29,6 +33,13 @@
 class SchedulerTest : public testing::Test {
  protected:
   SchedulerTest() { srand(0); }
+
+  virtual void SetUp() {
+    RandTestUtil::SeedRng("SCHEDULER_TEST_SEED", &rng_);
+  }
+
+  /// Per-test random number generator. Seeded before every test.
+  std::mt19937 rng_;
 };
 
 static const vector<BlockNamingPolicy> BLOCK_NAMING_POLICIES(
@@ -711,4 +722,87 @@
   EXPECT_TRUE(status.ok());
 }
 
+// Test the scheduling algorithm for load-balancing scan ranges within a host.
+// This exercises AssignRangesToInstances(), the method that implements the core
+// of the algorithm.
+TEST_F(SchedulerTest, TestMultipleFinstances) {
+  const int NUM_RANGES = 16;
+  std::vector<TScanRangeParams> fs_ranges(NUM_RANGES);
+  std::vector<TScanRangeParams> kudu_ranges(NUM_RANGES);
+  // Create FS ranges with lengths 1, 2, ..., NUM_RANGES, plus fake Kudu ranges.
+  for (int i = 0; i < NUM_RANGES; ++i) {
+    fs_ranges[i].scan_range.__set_hdfs_file_split(THdfsFileSplit());
+    fs_ranges[i].scan_range.hdfs_file_split.length = i + 1;
+    kudu_ranges[i].scan_range.__set_kudu_scan_token("fake token");
+  }
+
+  // Test handling of the single instance case - all ranges go to the same instance.
+  vector<vector<TScanRangeParams>> fs_one_instance =
+      Scheduler::AssignRangesToInstances(1, &fs_ranges);
+  ASSERT_EQ(1, fs_one_instance.size());
+  EXPECT_EQ(NUM_RANGES, fs_one_instance[0].size());
+  vector<vector<TScanRangeParams>> kudu_one_instance =
+      Scheduler::AssignRangesToInstances(1, &kudu_ranges);
+  ASSERT_EQ(1, kudu_one_instance.size());
+  EXPECT_EQ(NUM_RANGES, kudu_one_instance[0].size());
+
+  // Ensure that each instance gets exactly one range, regardless of input order.
+  for (int attempt = 0; attempt < 20; ++attempt) {
+    std::shuffle(fs_ranges.begin(), fs_ranges.end(), rng_);
+    vector<vector<TScanRangeParams>> range_per_instance =
+        Scheduler::AssignRangesToInstances(NUM_RANGES, &fs_ranges);
+    EXPECT_EQ(NUM_RANGES, range_per_instance.size());
+    // Confirm each range is present and each instance got exactly one range.
+    vector<int> range_length_count(NUM_RANGES);
+    for (const auto& instance_ranges : range_per_instance) {
+      ASSERT_EQ(1, instance_ranges.size());
+      ++range_length_count[instance_ranges[0].scan_range.hdfs_file_split.length - 1];
+    }
+    for (int i = 0; i < NUM_RANGES; ++i) {
+      EXPECT_EQ(1, range_length_count[i]) << i;
+    }
+  }
+
+  // Test load balancing FS ranges across 4 instances. We should get an even assignment
+  // across the instances regardless of input order.
+  for (int attempt = 0; attempt < 20; ++attempt) {
+    std::shuffle(fs_ranges.begin(), fs_ranges.end(), rng_);
+    vector<vector<TScanRangeParams>> range_per_instance =
+        Scheduler::AssignRangesToInstances(4, &fs_ranges);
+    EXPECT_EQ(4, range_per_instance.size());
+    // Ensure we got a range of each length in the output.
+    vector<int> range_length_count(NUM_RANGES);
+    for (const auto& instance_ranges : range_per_instance) {
+      EXPECT_EQ(4, instance_ranges.size());
+      int64_t instance_bytes = 0;
+      for (const auto& range : instance_ranges) {
+        instance_bytes += range.scan_range.hdfs_file_split.length;
+        ++range_length_count[range.scan_range.hdfs_file_split.length - 1];
+      }
+      // Expect each instance to get sum([1, 2, ..., 16]) / 4 = 34 bytes when the
+      // work is distributed evenly.
+      EXPECT_EQ(34, instance_bytes);
+    }
+    for (int i = 0; i < NUM_RANGES; ++i) {
+      EXPECT_EQ(1, range_length_count[i]) << i;
+    }
+  }
+
+  // Test load balancing Kudu ranges across 4 instances. We should get an even assignment
+  // across the instances regardless of input order. We don't know the size of each Kudu
+  // range, so we just need to check the # of ranges.
+  for (int attempt = 0; attempt < 20; ++attempt) {
+    std::shuffle(kudu_ranges.begin(), kudu_ranges.end(), rng_);
+    vector<vector<TScanRangeParams>> range_per_instance =
+        Scheduler::AssignRangesToInstances(4, &kudu_ranges);
+    EXPECT_EQ(4, range_per_instance.size());
+    for (const auto& instance_ranges : range_per_instance) {
+      EXPECT_EQ(4, instance_ranges.size());
+      for (const auto& range : instance_ranges) {
+        EXPECT_TRUE(range.scan_range.__isset.kudu_scan_token);
+      }
+    }
+  }
+}
+
 } // end namespace impala
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 16c471c..a2bba7b 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -46,6 +46,8 @@
 #include "common/names.h"
 
 using boost::algorithm::join;
+using std::pop_heap;
+using std::push_heap;
 using namespace apache::thrift;
 using namespace org::apache::impala::fb;
 using namespace strings;
@@ -334,6 +336,34 @@
   }
 }
 
+/// Returns a numeric weight that is proportional to the estimated processing time for
+/// the scan range represented by 'params'. Weights from different scan node
+/// implementations, e.g. FS vs Kudu, are not comparable.
+static int64_t ScanRangeWeight(const TScanRangeParams& params) {
+  if (params.scan_range.__isset.hdfs_file_split) {
+    return params.scan_range.hdfs_file_split.length;
+  } else {
+    // Give equal weight to each Kudu and HBase split.
+    // TODO: implement more accurate logic for Kudu and HBase
+    return 1;
+  }
+}
+
+/// Helper class used in CreateScanInstances() to track the amount of work assigned
+/// to each instance so far.
+struct InstanceAssignment {
+  // The weight assigned so far.
+  int64_t weight;
+
+  // The index of the instance in 'per_instance_ranges'
+  int instance_idx;
+
+  // Comparator for use in a heap as part of the longest-processing-time algorithm.
+  // Invert the comparison order because the *_heap functions implement a max-heap
+  // and we want to assign to the least-loaded instance first.
+  bool operator<(const InstanceAssignment& other) const { return weight > other.weight; }
+};
+
 void Scheduler::CreateScanInstances(const ExecutorConfig& executor_config,
     PlanNodeId leftmost_scan_id, FragmentExecParams* fragment_params,
     QuerySchedule* schedule) {
@@ -353,7 +383,7 @@
   }
 
   int per_fragment_instance_idx = 0;
-  for (const auto& assignment_entry : fragment_params->scan_range_assignment) {
+  for (auto& assignment_entry : fragment_params->scan_range_assignment) {
     // evenly divide up the scan ranges of the leftmost scan between at most
     // <dop> instances
     const TNetworkAddress& host = assignment_entry.first;
@@ -364,65 +394,59 @@
     DCHECK(IsResolvedAddress(krpc_host));
     auto scan_ranges_it = assignment_entry.second.find(leftmost_scan_id);
     DCHECK(scan_ranges_it != assignment_entry.second.end());
-    const vector<TScanRangeParams>& params_list = scan_ranges_it->second;
 
-    int64 total_size = 0;
-    for (const TScanRangeParams& params : params_list) {
-      if (params.scan_range.__isset.hdfs_file_split) {
-        total_size += params.scan_range.hdfs_file_split.length;
-      } else {
-        // fake load-balancing for Kudu and Hbase: every split has length 1
-        // TODO: implement more accurate logic for Kudu and Hbase
-        ++total_size;
-      }
-    }
-
-    // try to load-balance scan ranges by assigning just beyond the average number of
-    // bytes to each instance
-    // TODO: fix shortcomings introduced by uneven split sizes,
-    // this could end up assigning 0 scan ranges to an instance
-    int num_instances = ::min(max_num_instances, static_cast<int>(params_list.size()));
-    DCHECK_GT(num_instances, 0);
-    float avg_bytes_per_instance = static_cast<float>(total_size) / num_instances;
-    int64_t total_assigned_bytes = 0;
-    int params_idx = 0; // into params_list
-    for (int i = 0; i < num_instances; ++i) {
+    // We reorder the scan ranges vector in-place to avoid creating another copy of it.
+    // This should be safe since the code is single-threaded and other code does not
+    // depend on the order of the vector.
+    vector<TScanRangeParams>* params_list = &scan_ranges_it->second;
+    int num_instances = ::min(max_num_instances, static_cast<int>(params_list->size()));
+    vector<vector<TScanRangeParams>> per_instance_ranges =
+        AssignRangesToInstances(num_instances, params_list);
+    for (vector<TScanRangeParams>& instance_ranges : per_instance_ranges) {
       fragment_params->instance_exec_params.emplace_back(schedule->GetNextInstanceId(),
           host, krpc_host, per_fragment_instance_idx++, *fragment_params);
       FInstanceExecParams& instance_params = fragment_params->instance_exec_params.back();
-
-      // Threshold beyond which we want to assign to the next instance.
-      int64_t threshold_total_bytes = avg_bytes_per_instance * (i + 1);
-
-      // Assign each scan range in params_list. When the per-instance threshold is
-      // reached, move on to the next instance.
-      while (params_idx < params_list.size()) {
-        const TScanRangeParams& scan_range_params = params_list[params_idx];
-        instance_params.per_node_scan_ranges[leftmost_scan_id].push_back(
-            scan_range_params);
-        if (scan_range_params.scan_range.__isset.hdfs_file_split) {
-          total_assigned_bytes += scan_range_params.scan_range.hdfs_file_split.length;
-        } else {
-          // for Kudu and Hbase every split has length 1
-          ++total_assigned_bytes;
-        }
-        ++params_idx;
-        // If this assignment pushes this instance past the threshold, move on to the next
-        // instance. However, if this is the last instance, assign any remaining scan
-        // ranges here since there are no further instances to load-balance across. There
-        // may be leftover scan ranges because threshold_total_bytes only approximates the
-        // per-node byte threshold.
-        if (total_assigned_bytes >= threshold_total_bytes && i != num_instances - 1) {
-          break;
-        }
-      }
-      if (params_idx == params_list.size()) break; // nothing left to assign
+      instance_params.per_node_scan_ranges[leftmost_scan_id] = move(instance_ranges);
     }
-    DCHECK_EQ(params_idx, params_list.size()); // everything got assigned
-    DCHECK_EQ(total_assigned_bytes, total_size);
   }
 }
 
+vector<vector<TScanRangeParams>> Scheduler::AssignRangesToInstances(
+    int num_instances, vector<TScanRangeParams>* ranges) {
+  // We need to assign scan ranges to instances. We would like the assignment to be
+  // as even as possible, so that each instance does about the same amount of work.
+  // Use the longest-processing-time (LPT) algorithm, which is a good approximation
+  // of the optimal solution (in the worst case its makespan is within a factor of
+  // ~4/3 of optimal). It also guarantees that at least one scan range is assigned
+  // to each instance.
+  DCHECK_GT(num_instances, 0);
+  vector<vector<TScanRangeParams>> per_instance_ranges(num_instances);
+  if (num_instances < 2) {
+    // Short-circuit the assignment algorithm for the single instance case.
+    per_instance_ranges[0] = *ranges;
+  } else {
+    // The LPT algorithm is straightforward:
+    // 1. sort the scan ranges to be assigned by descending weight.
+    // 2. assign each item to the instance with the least weight assigned so far.
+    vector<InstanceAssignment> instance_heap;
+    instance_heap.reserve(num_instances);
+    for (int i = 0; i < num_instances; ++i) {
+      instance_heap.emplace_back(InstanceAssignment{0, i});
+    }
+    std::sort(ranges->begin(), ranges->end(),
+        [](const TScanRangeParams& a, const TScanRangeParams& b) {
+          return ScanRangeWeight(a) > ScanRangeWeight(b);
+        });
+    for (TScanRangeParams& range : *ranges) {
+      per_instance_ranges[instance_heap[0].instance_idx].push_back(range);
+      instance_heap[0].weight += ScanRangeWeight(range);
+      pop_heap(instance_heap.begin(), instance_heap.end());
+      push_heap(instance_heap.begin(), instance_heap.end());
+    }
+  }
+  return per_instance_ranges;
+}
+
 void Scheduler::CreateCollocatedInstances(
     FragmentExecParams* fragment_params, QuerySchedule* schedule) {
   DCHECK_GE(fragment_params->input_fragments.size(), 1);
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index 34ac10d..72fbe11 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -386,6 +386,13 @@
   void CreateScanInstances(const ExecutorConfig& executor_config, PlanNodeId scan_id,
       FragmentExecParams* fragment_params, QuerySchedule* schedule);
 
+  /// Compute an assignment of the scan ranges 'ranges', which were assigned to a
+  /// host, to 'num_instances' fragment instances running on that host. Attempts to
+  /// minimize skew across the instances. 'num_instances' must be positive. May
+  /// reorder the ranges in 'ranges'.
+  static std::vector<std::vector<TScanRangeParams>> AssignRangesToInstances(
+      int num_instances, std::vector<TScanRangeParams>* ranges);
+
   /// For each instance of fragment_params's input fragment, create a collocated
   /// instance for fragment_params's fragment.
   /// Expects that fragment_params only has a single input fragment.
@@ -415,6 +422,7 @@
   FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentRandomNonCached);
   FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentRandomDiskLocal);
   FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentRandomRemote);
+  FRIEND_TEST(SchedulerTest, TestMultipleFinstances);
 };
 
 }