IMPALA-9125: generalize finding DataSink from other fragment
This is a minor refactor that makes the existing mechanism
for finding PlanRootSink more general:
* Make QueryState::GetFInstanceState() return a Status to reduce
boilerplate in the caller.
* Avoid storing PlanRootSink as a member - this doesn't scale
nicely to multiple DataSink subclasses. Instead use an accessor
method, FragmentInstanceState::GetRootSink().
Change-Id: I3c91cadf9ecfde9cc3235bd71412ba10ee37bc4e
Reviewed-on: http://gerrit.cloudera.org:8080/14710
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 6b5ba9a..c98203a 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -146,18 +146,13 @@
// set coord_instance_ and coord_sink_
if (schedule_.GetCoordFragment() != nullptr) {
// this blocks until all fragment instances have finished their Prepare phase
- coord_instance_ = query_state_->GetFInstanceState(query_id());
- if (coord_instance_ == nullptr) {
- // at this point, the query is done with the Prepare phase, and we expect
- // to have a coordinator instance, but coord_instance_ == nullptr,
- // which means we failed before or during Prepare().
- Status query_status = query_state_->WaitForPrepare();
- DCHECK(!query_status.ok());
- return UpdateExecState(query_status, nullptr, FLAGS_hostname);
- }
+ Status query_status = query_state_->GetFInstanceState(query_id(), &coord_instance_);
+ if (!query_status.ok()) return UpdateExecState(query_status, nullptr, FLAGS_hostname);
+ // We expect this query to have a coordinator instance.
+ DCHECK(coord_instance_ != nullptr);
// When GetFInstanceState() returns the coordinator instance, the Prepare phase is
// done and the FragmentInstanceState's root sink will be set up.
- coord_sink_ = coord_instance_->root_sink();
+ coord_sink_ = coord_instance_->GetRootSink();
DCHECK(coord_sink_ != nullptr);
}
return Status::OK();
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index ec4cb25..320e477 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -125,7 +125,8 @@
void FragmentInstanceState::Cancel() {
DCHECK(runtime_state_ != nullptr);
runtime_state_->Cancel();
- if (root_sink_ != nullptr) root_sink_->Cancel(runtime_state_);
+ PlanRootSink* root_sink = GetRootSink();
+ if (root_sink != nullptr) root_sink->Cancel(runtime_state_);
ExecEnv::GetInstance()->stream_mgr()->Cancel(runtime_state_->fragment_instance_id());
}
@@ -219,8 +220,8 @@
RuntimeProfile* sink_profile = sink_->profile();
if (sink_profile != nullptr) profile()->AddChild(sink_profile);
- if (fragment_ctx_.fragment.output_sink.type == TDataSinkType::PLAN_ROOT_SINK) {
- root_sink_ = reinterpret_cast<PlanRootSink*>(sink_);
+ PlanRootSink* root_sink = GetRootSink();
+ if (root_sink != nullptr) {
// Release the thread token on the root fragment instance. This fragment spends most
// of the time waiting and doing very little work. Holding on to the token causes
// underutilization of the machine. If there are 12 queries on this node, that's 12
@@ -544,6 +545,12 @@
return finstance_state_labels[state];
}
+PlanRootSink* FragmentInstanceState::GetRootSink() const {
+ return fragment_ctx_.fragment.output_sink.type == TDataSinkType::PLAN_ROOT_SINK ?
+ static_cast<PlanRootSink*>(sink_) :
+ nullptr;
+}
+
const TQueryCtx& FragmentInstanceState::query_ctx() const {
return query_state_->query_ctx();
}
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
index 33cf656..bedf31f 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -112,7 +112,7 @@
/// Returns fragment instance's sink if this is the root fragment instance. Valid after
/// the Prepare phase. May be nullptr.
- PlanRootSink* root_sink() { return root_sink_; }
+ PlanRootSink* GetRootSink() const;
/// Returns a string description of 'state'.
static const string& ExecStateToString(FInstanceExecStatePB state);
@@ -219,10 +219,6 @@
/// obj_pool().
DataSink* sink_ = nullptr;
- /// Set if this fragment instance is the root of the entire plan, so that a consumer can
- /// pull results by calling root_sink_->GetNext(). Same object as sink_.
- PlanRootSink* root_sink_ = nullptr;
-
/// should live in obj_pool(), but managed separately so we can delete it in Close()
boost::scoped_ptr<RowBatch> row_batch_;
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 77e6b10..5b60be0 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -290,11 +290,13 @@
}
}
-FragmentInstanceState* QueryState::GetFInstanceState(const TUniqueId& instance_id) {
+Status QueryState::GetFInstanceState(
+ const TUniqueId& instance_id, FragmentInstanceState** fi_state) {
VLOG_FILE << "GetFInstanceState(): instance_id=" << PrintId(instance_id);
- if (!WaitForPrepare().ok()) return nullptr;
+ RETURN_IF_ERROR(WaitForPrepare());
auto it = fis_map_.find(instance_id);
- return it != fis_map_.end() ? it->second : nullptr;
+ *fi_state = it != fis_map_.end() ? it->second : nullptr;
+ return Status::OK();
}
void QueryState::ConstructReport(bool instances_started,
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index bbb0fb7..39101e1 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -196,7 +196,11 @@
void MonitorFInstances();
/// Blocks until all fragment instances have finished their Prepare phase.
- FragmentInstanceState* GetFInstanceState(const TUniqueId& instance_id);
+ /// On success, sets *fi_state to the fragment instance state for 'instance_id',
+ /// or to nullptr if no such instance is present, and returns OK.
+ /// Returns an error, without setting *fi_state, if fragment preparation failed.
+ Status GetFInstanceState(
+ const TUniqueId& instance_id, FragmentInstanceState** fi_state);
/// Blocks until all fragment instances have finished their Prepare phase.
void PublishFilter(const PublishFilterParamsPB& params, kudu::rpc::RpcContext* context);