IMPALA-4224: part 1: schedule join builds

This adds the scheduler logic for fragments with join builds at their
root. These fragments must be co-located with the fragment containing
the join node.

The new code is not active yet because the planner does not generate
plans with join builds (except for planner tests). This change
was validated in the context of a larger patch that enables the join
build plans via the planner and makes query execution work.

Change-Id: I779463cfa2ea9b372607d2be6d5d2252a6469e34
Reviewed-on: http://gerrit.cloudera.org:8080/14944
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
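
The core scheduling step added below is small but easy to miss inside the
diff: for every instance of the fragment containing the join, the scheduler
creates one build fragment instance on the same host and links the two by
instance id. Here is a minimal standalone sketch of that step, using
simplified stand-in types (InstanceParams, FragmentParams, plain int64 ids
instead of TUniqueId) — an illustration of the idea, not the Impala code:

```cpp
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

// Simplified stand-ins for Impala's FInstanceExecParams/FragmentExecParams.
struct InstanceParams {
  int64_t instance_id;
  std::string host;
  std::vector<int64_t> join_build_inputs;  // build finstances feeding this instance
};

struct FragmentParams {
  std::vector<InstanceParams> instance_exec_params;
};

// One build instance per consuming join instance, on the same host; the link
// is recorded on the consumer, mirroring CreateCollocatedJoinBuildInstances().
void CreateCollocatedJoinBuildInstances(FragmentParams* build_fragment,
    FragmentParams* join_fragment, int64_t* next_instance_id) {
  for (InstanceParams& parent : join_fragment->instance_exec_params) {
    int64_t id = (*next_instance_id)++;
    build_fragment->instance_exec_params.push_back({id, parent.host, {}});
    parent.join_build_inputs.push_back(id);
  }
}

int main() {
  // Two join fragment instances already scheduled on two hosts.
  FragmentParams join_fragment{{{0, "host-a", {}}, {1, "host-b", {}}}};
  FragmentParams build_fragment;
  int64_t next_id = 2;
  CreateCollocatedJoinBuildInstances(&build_fragment, &join_fragment, &next_id);
  for (const InstanceParams& p : join_fragment.instance_exec_params) {
    std::cout << "join finstance " << p.instance_id << " on " << p.host
              << " reads build finstance " << p.join_build_inputs[0] << "\n";
  }
}
```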
diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc
index 857a182..25fe3b2 100644
--- a/be/src/scheduling/query-schedule.cc
+++ b/be/src/scheduling/query-schedule.cc
@@ -124,7 +124,7 @@
       PlanNodeId dest_node_id = fragment.output_sink.stream_sink.dest_node_id;
       FragmentIdx dest_idx = plan_node_to_fragment_idx_[dest_node_id];
       FragmentExecParams& dest_params = fragment_exec_params_[dest_idx];
-      dest_params.input_fragments.push_back(fragment.idx);
+      dest_params.exchange_input_fragments.push_back(fragment.idx);
     }
   }
 }
@@ -181,6 +181,11 @@
     }
   }
 
+  // Check that all fragments have instances.
+  for (const FragmentExecParams& fp: fragment_exec_params_) {
+    DCHECK_GT(fp.instance_exec_params.size(), 0) << fp.fragment;
+  }
+
   for (const auto& elem: per_backend_exec_params_) {
     const BackendExecParams& bp = elem.second;
     DCHECK(!bp.instance_params.empty() || bp.is_coord_backend);
diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h
index d019eb2..a57ebb7 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -106,6 +106,9 @@
   /// uniquely identify it to a receiver. -1 = invalid.
   int sender_id;
 
+  /// List of input join build finstances for joins in this finstance.
+  std::vector<TJoinBuildInput> join_build_inputs;
+
   /// The parent FragmentExecParams
   const FragmentExecParams& fragment_exec_params;
   const TPlanFragment& fragment() const;
@@ -136,7 +139,9 @@
 
   bool is_coord_fragment;
   const TPlanFragment& fragment;
-  std::vector<FragmentIdx> input_fragments;
+
+  /// Fragments that are inputs to an ExchangeNode of this fragment.
+  std::vector<FragmentIdx> exchange_input_fragments;
   std::vector<FInstanceExecParams> instance_exec_params;
 
   FragmentExecParams(const TPlanFragment& fragment)
@@ -372,7 +377,7 @@
   string executor_group_;
 
   /// Populate fragment_exec_params_ from request_.plan_exec_info.
-  /// Sets is_coord_fragment and input_fragments.
+  /// Sets is_coord_fragment and exchange_input_fragments.
   /// Also populates plan_node_to_fragment_idx_ and plan_node_to_plan_node_idx_.
   void Init();
 
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 626afde..b458fa6 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -202,7 +202,10 @@
     const ExecutorConfig& executor_config, QuerySchedule* schedule) {
   const TQueryExecRequest& exec_request = schedule->request();
 
-  // for each plan, compute the FInstanceExecParams for the tree of fragments
+  // For each plan, compute the FInstanceExecParams for the tree of fragments.
+  // The plans are in dependency order, so we compute parameters for each plan
+  // *before* its input join build plans. This allows the join build plans to
+  // be easily co-located with the plans consuming their output.
   for (const TPlanExecInfo& plan_exec_info : exec_request.plan_exec_info) {
     // set instance_id, host, per_node_scan_ranges
     ComputeFragmentExecParams(executor_config, plan_exec_info,
@@ -210,13 +213,11 @@
 
     // Set destinations, per_exch_num_senders, sender_id.
     for (const TPlanFragment& src_fragment : plan_exec_info.fragments) {
+      VLOG(3) << "Computing exec params for fragment " << src_fragment.display_name;
       if (!src_fragment.output_sink.__isset.stream_sink) continue;
       FragmentIdx dest_idx =
           schedule->GetFragmentIdx(src_fragment.output_sink.stream_sink.dest_node_id);
-      DCHECK_LT(dest_idx, plan_exec_info.fragments.size());
-      const TPlanFragment& dest_fragment = plan_exec_info.fragments[dest_idx];
-      FragmentExecParams* dest_params =
-          schedule->GetFragmentExecParams(dest_fragment.idx);
+      FragmentExecParams* dest_params = schedule->GetFragmentExecParams(dest_idx);
       FragmentExecParams* src_params = schedule->GetFragmentExecParams(src_fragment.idx);
 
       // populate src_params->destinations
@@ -254,15 +255,24 @@
 void Scheduler::ComputeFragmentExecParams(const ExecutorConfig& executor_config,
     const TPlanExecInfo& plan_exec_info, FragmentExecParams* fragment_params,
     QuerySchedule* schedule) {
-  // traverse input fragments
-  for (FragmentIdx input_fragment_idx : fragment_params->input_fragments) {
+  // Create exec params for child fragments connected by an exchange. Instance creation
+  // for this fragment depends on where the input fragment instances are scheduled.
+  for (FragmentIdx input_fragment_idx : fragment_params->exchange_input_fragments) {
     ComputeFragmentExecParams(executor_config, plan_exec_info,
         schedule->GetFragmentExecParams(input_fragment_idx), schedule);
   }
 
   const TPlanFragment& fragment = fragment_params->fragment;
-  // case 1: single instance executed at coordinator
-  if (fragment.partition.type == TPartitionType::UNPARTITIONED) {
+  if (fragment.output_sink.__isset.join_build_sink) {
+    // case 0: join build fragment, collocated with its parent fragment. Join build
+    // fragments may be unpartitioned if they are collocated with the root fragment.
+    VLOG(3) << "Computing exec params for collocated join build fragment "
+            << fragment_params->fragment.display_name;
+    CreateCollocatedJoinBuildInstances(fragment_params, schedule);
+  } else if (fragment.partition.type == TPartitionType::UNPARTITIONED) {
+    // case 1: root fragment instance executed at coordinator
+    VLOG(3) << "Computing exec params for coordinator fragment "
+            << fragment_params->fragment.display_name;
     const TBackendDescriptor& local_be_desc = executor_config.local_be_desc;
     const TNetworkAddress& coord = local_be_desc.address;
     DCHECK(local_be_desc.__isset.krpc_address);
@@ -282,20 +292,18 @@
       auto first_entry = fragment_params->scan_range_assignment.begin();
       instance_params.per_node_scan_ranges = first_entry->second;
     }
-
-    return;
-  }
-
-  if (ContainsNode(fragment.plan, TPlanNodeType::UNION_NODE)
+  } else if (ContainsNode(fragment.plan, TPlanNodeType::UNION_NODE)
       || ContainsScanNode(fragment.plan)) {
+    VLOG(3) << "Computing exec params for scan and/or union fragment.";
     // case 2: leaf fragment (i.e. no input fragments) with a single scan node.
     // case 3: union fragment, which may have scan nodes and may have input fragments.
     CreateCollocatedAndScanInstances(executor_config, fragment_params, schedule);
   } else {
+    VLOG(3) << "Computing exec params for interior fragment.";
     // case 4: interior (non-leaf) fragment without a scan or union.
     // We assign the same hosts as those of our leftmost input fragment (so that a
     // merge aggregation fragment runs on the hosts that provide the input data).
-    CreateCollocatedInstances(fragment_params, schedule);
+    CreateInputCollocatedInstances(fragment_params, schedule);
   }
 }
 
@@ -367,7 +375,7 @@
   // the input scan, for consistency with the previous behaviour of only using
   // the parallelism of the scan.
   if (has_union) {
-    for (FragmentIdx idx : fragment_params->input_fragments) {
+    for (FragmentIdx idx : fragment_params->exchange_input_fragments) {
       std::unordered_map<TNetworkAddress, int> input_fragment_instances_per_host;
       const FragmentExecParams& input_params = *schedule->GetFragmentExecParams(idx);
       for (const FInstanceExecParams& instance_params :
@@ -503,20 +511,47 @@
   return per_instance_ranges;
 }
 
-void Scheduler::CreateCollocatedInstances(
+void Scheduler::CreateInputCollocatedInstances(
     FragmentExecParams* fragment_params, QuerySchedule* schedule) {
-  DCHECK_GE(fragment_params->input_fragments.size(), 1);
-  const FragmentExecParams* input_fragment_params =
-      schedule->GetFragmentExecParams(fragment_params->input_fragments[0]);
+  DCHECK_GE(fragment_params->exchange_input_fragments.size(), 1);
+  const FragmentExecParams& input_fragment_params =
+      *schedule->GetFragmentExecParams(fragment_params->exchange_input_fragments[0]);
   int per_fragment_instance_idx = 0;
   for (const FInstanceExecParams& input_instance_params :
-      input_fragment_params->instance_exec_params) {
+      input_fragment_params.instance_exec_params) {
     fragment_params->instance_exec_params.emplace_back(schedule->GetNextInstanceId(),
         input_instance_params.host, input_instance_params.krpc_host,
         per_fragment_instance_idx++, *fragment_params);
   }
 }
 
+void Scheduler::CreateCollocatedJoinBuildInstances(
+    FragmentExecParams* fragment_params, QuerySchedule* schedule) {
+  const TPlanFragment& fragment = fragment_params->fragment;
+  DCHECK(fragment.output_sink.__isset.join_build_sink);
+  const TJoinBuildSink& sink = fragment.output_sink.join_build_sink;
+  int join_fragment_idx = schedule->GetFragmentIdx(sink.dest_node_id);
+  FragmentExecParams* join_fragment_params =
+      schedule->GetFragmentExecParams(join_fragment_idx);
+  DCHECK(!join_fragment_params->instance_exec_params.empty())
+      << "Parent fragment instances must already be created.";
+  int per_fragment_instance_idx = 0;
+  for (FInstanceExecParams& parent_exec_params :
+      join_fragment_params->instance_exec_params) {
+    TUniqueId instance_id = schedule->GetNextInstanceId();
+    fragment_params->instance_exec_params.emplace_back(instance_id,
+        parent_exec_params.host, parent_exec_params.krpc_host,
+        per_fragment_instance_idx++, *fragment_params);
+    TJoinBuildInput build_input;
+    build_input.__set_join_node_id(sink.dest_node_id);
+    build_input.__set_input_finstance_id(instance_id);
+    parent_exec_params.join_build_inputs.emplace_back(build_input);
+    VLOG(3) << "Linked join build for node id=" << sink.dest_node_id
+            << " build finstance=" << PrintId(instance_id)
+            << " dst finstance=" << PrintId(parent_exec_params.instance_id);
+  }
+}
+
 Status Scheduler::ComputeScanRangeAssignment(const ExecutorConfig& executor_config,
     PlanNodeId node_id, const TReplicaPreference::type* node_replica_preference,
     bool node_random_replica, const vector<TScanRangeLocationList>& locations,
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index a2da46f..f5f5c2e 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -395,7 +395,14 @@
   /// For each instance of fragment_params's input fragment, create a collocated
   /// instance for fragment_params's fragment.
   /// Expects that fragment_params only has a single input fragment.
-  void CreateCollocatedInstances(
+  void CreateInputCollocatedInstances(
+      FragmentExecParams* fragment_params, QuerySchedule* schedule);
+
+  /// Create instances for a fragment that has a join build sink as its root.
+  /// These instances will be collocated with the fragment instances that consume
+  /// the join build. Therefore, those instances must have already been created
+  /// by the scheduler.
+  void CreateCollocatedJoinBuildInstances(
       FragmentExecParams* fragment_params, QuerySchedule* schedule);
 
   /// Add all hosts that the scans identified by 'scan_ids' are executed on to
diff --git a/common/thrift/DataSinks.thrift b/common/thrift/DataSinks.thrift
index 19dd86b..fdf8f6c 100644
--- a/common/thrift/DataSinks.thrift
+++ b/common/thrift/DataSinks.thrift
@@ -100,7 +100,8 @@
 
 // Sink to create the build side of a JoinNode.
 struct TJoinBuildSink {
-  1: required Types.TJoinTableId join_table_id
+  // destination join node id
+  1: required Types.TPlanNodeId dest_node_id
 
   // only set for hash join build sinks
   2: required list<Exprs.TExpr> build_exprs
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index e6e1fd8..716818b 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -406,7 +406,8 @@
 
 // Result of call to ImpalaPlanService/JniFrontend.CreateQueryRequest()
 struct TQueryExecRequest {
-  // exec info for all plans; the first one materializes the query result
+  // exec info for all plans; the first one materializes the query result and subsequent
+  // ones materialize join builds that are inputs to preceding plans in the list.
   1: optional list<TPlanExecInfo> plan_exec_info
 
   // Metadata of the query result set (only for select)
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 2285d4b..55eb0cf 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -613,6 +613,15 @@
   2: required i32 filter_id
 }
 
+// Information about the input fragment instance of a join node.
+struct TJoinBuildInput {
+  // The join node id that will consume this join build.
+  1: required Types.TPlanNodeId join_node_id
+
+  // Id of the fragment instance that produces this join build.
+  2: required Types.TUniqueId input_finstance_id
+}
+
 // Execution parameters of a single fragment instance.
 struct TPlanFragmentInstanceCtx {
   // TPlanFragment.idx
@@ -647,6 +656,9 @@
 
   // List of runtime filters produced by nodes in the finstance.
   8: optional list<TRuntimeFilterSource> filters_produced
+
+  // List of input join build finstances for joins in this finstance.
+  9: optional list<TJoinBuildInput> join_build_inputs
 }
 
 
diff --git a/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java b/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
index 5610646..63e733f 100644
--- a/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
@@ -38,6 +38,9 @@
   // id of join's build-side table assigned during planning
   private final JoinTableId joinTableId_;
 
+  // Reference to the join node that consumes the build side.
+  private final JoinNode joinNode_;
+
   private final List<Expr> buildExprs_ = new ArrayList<>();
 
   /**
@@ -46,6 +49,7 @@
   public JoinBuildSink(JoinTableId joinTableId, JoinNode joinNode) {
     Preconditions.checkState(joinTableId.isValid());
     joinTableId_ = joinTableId;
+    joinNode_ = joinNode;
     Preconditions.checkNotNull(joinNode);
     Preconditions.checkState(joinNode instanceof JoinNode);
     if (!(joinNode instanceof HashJoinNode)) return;
@@ -61,7 +65,7 @@
   @Override
   protected void toThriftImpl(TDataSink tsink) {
     TJoinBuildSink tBuildSink = new TJoinBuildSink();
-    tBuildSink.setJoin_table_id(joinTableId_.asInt());
+    tBuildSink.setDest_node_id(joinNode_.getId().asInt());
     for (Expr buildExpr: buildExprs_) {
       tBuildSink.addToBuild_exprs(buildExpr.treeToThrift());
     }