IMPALA-9125: generalize finding a DataSink from another fragment

This is a minor refactor that makes the existing mechanism
for finding the PlanRootSink more general:
* Status-returning GetFInstanceState() API to reduce boilerplate
  in the caller.
* Avoid storing PlanRootSink as a member - this doesn't scale
  well to multiple sink subclasses. Instead, use an accessor
  method (see the caller-side sketch below).
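
For illustration, the caller-side pattern now looks roughly like the
following sketch (simplified from the coordinator.cc hunk below; the
real caller propagates errors via UpdateExecState() rather than
RETURN_IF_ERROR):

    FragmentInstanceState* coord_instance;
    // Blocks until all instances finish their Prepare phase; returns an
    // error if preparation failed, instead of forcing the caller to
    // re-derive the failure status from a nullptr result.
    RETURN_IF_ERROR(query_state->GetFInstanceState(query_id, &coord_instance));
    DCHECK(coord_instance != nullptr);
    // The accessor derives the answer from the sink type and returns
    // nullptr if this instance's sink is not a PlanRootSink.
    PlanRootSink* coord_sink = coord_instance->GetRootSink();
    DCHECK(coord_sink != nullptr);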

Change-Id: I3c91cadf9ecfde9cc3235bd71412ba10ee37bc4e
Reviewed-on: http://gerrit.cloudera.org:8080/14710
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 6b5ba9a..c98203a 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -146,18 +146,13 @@
   // set coord_instance_ and coord_sink_
   if (schedule_.GetCoordFragment() != nullptr) {
     // this blocks until all fragment instances have finished their Prepare phase
-    coord_instance_ = query_state_->GetFInstanceState(query_id());
-    if (coord_instance_ == nullptr) {
-      // at this point, the query is done with the Prepare phase, and we expect
-      // to have a coordinator instance, but coord_instance_ == nullptr,
-      // which means we failed before or during Prepare().
-      Status query_status = query_state_->WaitForPrepare();
-      DCHECK(!query_status.ok());
-      return UpdateExecState(query_status, nullptr, FLAGS_hostname);
-    }
+    Status query_status = query_state_->GetFInstanceState(query_id(), &coord_instance_);
+    if (!query_status.ok()) return UpdateExecState(query_status, nullptr, FLAGS_hostname);
+    // We expected this query to have a coordinator instance.
+    DCHECK(coord_instance_ != nullptr);
     // When GetFInstanceState() returns the coordinator instance, the Prepare phase is
     // done and the FragmentInstanceState's root sink will be set up.
-    coord_sink_ = coord_instance_->root_sink();
+    coord_sink_ = coord_instance_->GetRootSink();
     DCHECK(coord_sink_ != nullptr);
   }
   return Status::OK();
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index ec4cb25..320e477 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -125,7 +125,8 @@
 void FragmentInstanceState::Cancel() {
   DCHECK(runtime_state_ != nullptr);
   runtime_state_->Cancel();
-  if (root_sink_ != nullptr) root_sink_->Cancel(runtime_state_);
+  PlanRootSink* root_sink = GetRootSink();
+  if (root_sink != nullptr) root_sink->Cancel(runtime_state_);
   ExecEnv::GetInstance()->stream_mgr()->Cancel(runtime_state_->fragment_instance_id());
 }
 
@@ -219,8 +220,8 @@
   RuntimeProfile* sink_profile = sink_->profile();
   if (sink_profile != nullptr) profile()->AddChild(sink_profile);
 
-  if (fragment_ctx_.fragment.output_sink.type == TDataSinkType::PLAN_ROOT_SINK) {
-    root_sink_ = reinterpret_cast<PlanRootSink*>(sink_);
+  PlanRootSink* root_sink = GetRootSink();
+  if (root_sink != nullptr) {
     // Release the thread token on the root fragment instance. This fragment spends most
     // of the time waiting and doing very little work. Holding on to the token causes
     // underutilization of the machine. If there are 12 queries on this node, that's 12
@@ -544,6 +545,12 @@
   return finstance_state_labels[state];
 }
 
+PlanRootSink* FragmentInstanceState::GetRootSink() const {
+  return fragment_ctx_.fragment.output_sink.type == TDataSinkType::PLAN_ROOT_SINK ?
+      static_cast<PlanRootSink*>(sink_) :
+      nullptr;
+}
+
 const TQueryCtx& FragmentInstanceState::query_ctx() const {
   return query_state_->query_ctx();
 }
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
index 33cf656..bedf31f 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -112,7 +112,7 @@
 
   /// Returns fragment instance's sink if this is the root fragment instance. Valid after
   /// the Prepare phase. May be nullptr.
-  PlanRootSink* root_sink() { return root_sink_; }
+  PlanRootSink* GetRootSink() const;
 
   /// Returns a string description of 'state'.
   static const string& ExecStateToString(FInstanceExecStatePB state);
@@ -219,10 +219,6 @@
   /// obj_pool().
   DataSink* sink_ = nullptr;
 
-  /// Set if this fragment instance is the root of the entire plan, so that a consumer can
-  /// pull results by calling root_sink_->GetNext(). Same object as sink_.
-  PlanRootSink* root_sink_ = nullptr;
-
   /// should live in obj_pool(), but managed separately so we can delete it in Close()
   boost::scoped_ptr<RowBatch> row_batch_;
 
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 77e6b10..5b60be0 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -290,11 +290,13 @@
   }
 }
 
-FragmentInstanceState* QueryState::GetFInstanceState(const TUniqueId& instance_id) {
+Status QueryState::GetFInstanceState(
+    const TUniqueId& instance_id, FragmentInstanceState** fi_state) {
   VLOG_FILE << "GetFInstanceState(): instance_id=" << PrintId(instance_id);
-  if (!WaitForPrepare().ok()) return nullptr;
+  RETURN_IF_ERROR(WaitForPrepare());
   auto it = fis_map_.find(instance_id);
-  return it != fis_map_.end() ? it->second : nullptr;
+  *fi_state = it != fis_map_.end() ? it->second : nullptr;
+  return Status::OK();
 }
 
 void QueryState::ConstructReport(bool instances_started,
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index bbb0fb7..39101e1 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -196,7 +196,11 @@
   void MonitorFInstances();
 
   /// Blocks until all fragment instances have finished their Prepare phase.
-  FragmentInstanceState* GetFInstanceState(const TUniqueId& instance_id);
+  /// Returns the fragment instance state for 'instance_id' in *fi_state,
+  /// or nullptr if it is not present.
+  /// Returns an error if fragment preparation failed.
+  Status GetFInstanceState(
+      const TUniqueId& instance_id, FragmentInstanceState** fi_state);
 
   /// Blocks until all fragment instances have finished their Prepare phase.
   void PublishFilter(const PublishFilterParamsPB& params, kudu::rpc::RpcContext* context);