// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.


#include "runtime/fragment-instance-state.h"

#include <sstream>
#include <thrift/protocol/TDebugProtocol.h>
#include <boost/thread/locks.hpp>
#include <boost/thread/thread_time.hpp>
#include <boost/thread/lock_guard.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>

#include "common/names.h"
#include "codegen/llvm-codegen.h"
#include "exec/plan-root-sink.h"
#include "exec/exec-node.h"
#include "exec/hdfs-scan-node-base.h"
#include "exec/exchange-node.h"
#include "exec/scan-node.h"
#include "kudu/rpc/rpc_context.h"
#include "runtime/exec-env.h"
#include "runtime/backend-client.h"
#include "runtime/client-cache.h"
#include "runtime/krpc-data-stream-mgr.h"
#include "runtime/query-state.h"
#include "runtime/query-state.h"
#include "runtime/mem-tracker.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-filter-bank.h"
#include "runtime/runtime-state.h"
#include "runtime/thread-resource-mgr.h"
#include "scheduling/query-schedule.h"
#include "util/debug-util.h"
#include "util/container-util.h"
#include "util/periodic-counter-updater.h"
#include "gen-cpp/ImpalaInternalService_types.h"

using kudu::rpc::RpcContext;
using namespace impala;
using namespace apache::thrift;

// Name of the profile counter that Prepare() registers (TUnit::BYTES) and that
// GetStatusReport() sets from the query MemTracker's peak consumption.
const string FragmentInstanceState::PER_HOST_PEAK_MEM_COUNTER = "PerHostPeakMemUsage";
// Thread group name and thread name prefix. Neither is referenced in the code visible
// here; presumably used by whoever starts the fragment instance thread -- TODO confirm.
const string FragmentInstanceState::FINST_THREAD_GROUP_NAME = "fragment-execution";
const string FragmentInstanceState::FINST_THREAD_NAME_PREFIX = "exec-finstance";

// Names of the per-phase timers registered on 'timings_profile_' by Prepare(), Open()
// and Exec() respectively. Close() sums them (debug builds only) and compares the sum
// against the profile's total time.
static const string PREPARE_TIMER_NAME("PrepareTime");
static const string OPEN_TIMER_NAME("OpenTime");
static const string EXEC_TIMER_NAME("ExecTime");

// Records the managing QueryState and the thrift fragment / instance contexts. The
// constructor body is empty; the RuntimeState and everything hanging off it are
// created later, in Prepare().
FragmentInstanceState::FragmentInstanceState(
    QueryState* query_state, const TPlanFragmentCtx& fragment_ctx,
    const TPlanFragmentInstanceCtx& instance_ctx)
  : query_state_(query_state),
    fragment_ctx_(fragment_ctx),
    instance_ctx_(instance_ctx) {
}

// Runs the whole lifecycle of this fragment instance on the calling thread:
// Prepare(), then Open(), then ExecInternal(), and finally Close(), which is called
// unconditionally. The managing 'QueryState' is told when Prepare() is done and, at
// the end, whether execution succeeded or in which phase it failed. The status set on
// 'opened_promise_' is that of Prepare() if it failed, otherwise that of Open().
// Returns the first error encountered, or OK.
Status FragmentInstanceState::Exec() {
  bool is_prepared = false;
  Status status = Prepare();
  DCHECK(runtime_state_ != nullptr);  // Prepare() must guarantee at least that.

  if (status.ok()) {
    // Tell the managing 'QueryState' that we're done with Prepare().
    query_state_->DonePreparing();
    is_prepared = true;
    status = Open();
  }
  // Publish the outcome so far: the Prepare() error if Prepare() failed, otherwise
  // whatever Open() returned.
  discard_result(opened_promise_.Set(status));

  if (is_prepared && status.ok()) {
    // The timers must go out of scope before the instance is finalized below,
    // otherwise the counter will not be updated by the time the final profile is sent.
    SCOPED_TIMER2(profile()->total_time_counter(),
        ADD_TIMER(timings_profile_, EXEC_TIMER_NAME));
    status = ExecInternal();
  }

  // Don't transition to completion until Close() is called as some new errors may be
  // logged in RuntimeState:error_log_.
  Close();

  // The fragment instance state must be updated before the 'QueryState' is notified.
  // Otherwise, there is a race when reading the 'done' flag with GetStatusReport(),
  // which may lead to the "final" profile being sent with the 'done' flag as false.
  DCHECK_EQ(is_prepared,
      current_state_.Load() > FInstanceExecStatePB::WAITING_FOR_PREPARE);
  UpdateState(StateEvent::EXEC_END);

  if (status.ok()) {
    // Tell the managing 'QueryState' that we're done with executing.
    query_state_->DoneExecuting();
  } else if (is_prepared) {
    // Tell the managing 'QueryState' that we hit an error during execution.
    query_state_->ErrorDuringExecute(status, instance_id());
  } else {
    // Tell the managing 'QueryState' that we hit an error during Prepare().
    query_state_->ErrorDuringPrepare(status, instance_id());
  }
  return status;
}

void FragmentInstanceState::Cancel() {
  DCHECK(runtime_state_ != nullptr);
  runtime_state_->Cancel();
  PlanRootSink* root_sink = GetRootSink();
  if (root_sink != nullptr) root_sink->Cancel(runtime_state_);
  ExecEnv::GetInstance()->stream_mgr()->Cancel(runtime_state_->fragment_instance_id());
}

// First lifecycle phase. Creates the RuntimeState, the lifecycle timers / event
// timeline and the sampled profile counters, builds the plan and exec trees (setting
// exchange sender counts and scan ranges from 'instance_ctx_'), prepares the exec tree
// and the data sink, and allocates the output row batch.
// Postcondition: 'runtime_state_' is non-null even when an error is returned. On an
// early error, members assigned after the failing step stay unset, which is why
// Close() null-checks 'sink_' and 'exec_tree_'.
// Returns the first error encountered, or OK.
Status FragmentInstanceState::Prepare() {
  DCHECK_EQ(current_state_.Load(), FInstanceExecStatePB::WAITING_FOR_EXEC);
  VLOG(2) << "fragment_instance_ctx:\n" << ThriftDebugString(instance_ctx_);

  // Do not call RETURN_IF_ERROR or explicitly return before this line,
  // runtime_state_ != nullptr is a postcondition of this function.
  runtime_state_ = obj_pool()->Add(new RuntimeState(
      query_state_, fragment_ctx_, instance_ctx_, ExecEnv::GetInstance()));

  // total_time_counter() is in the runtime_state_ so start it up now.
  SCOPED_TIMER(profile()->total_time_counter());
  timings_profile_ =
      RuntimeProfile::Create(obj_pool(), "Fragment Instance Lifecycle Timings");
  profile()->AddChild(timings_profile_);
  SCOPED_TIMER(ADD_TIMER(timings_profile_, PREPARE_TIMER_NAME));

  // Events that are tracked in a separate timeline for each fragment instance, relative
  // to the startup of the query state.
  event_sequence_ =
      profile()->AddEventSequence("Fragment Instance Lifecycle Event Timeline");
  event_sequence_->Start(query_state_->fragment_events_start_time());
  UpdateState(StateEvent::PREPARE_START);

  // Reserve one main thread from the pool
  runtime_state_->resource_pool()->AcquireThreadToken();

  // Exercise debug actions at the first point where errors are possible in Prepare().
  RETURN_IF_ERROR(DebugAction(query_state_->query_options(), "FIS_IN_PREPARE"));

  RETURN_IF_ERROR(runtime_state_->InitFilterBank(
      fragment_ctx_.fragment.runtime_filters_reservation_bytes));

  // Sampled counters. ReleaseThreadToken() later stops 'avg_thread_tokens_' and
  // 'thread_usage_sampled_counter_'.
  avg_thread_tokens_ = profile()->AddSamplingCounter("AverageThreadTokens",
      bind<int64_t>(mem_fn(&ThreadResourcePool::num_threads),
          runtime_state_->resource_pool()));
  mem_usage_sampled_counter_ = profile()->AddSamplingTimeSeriesCounter("MemoryUsage",
      TUnit::BYTES,
      bind<int64_t>(mem_fn(&MemTracker::consumption),
          runtime_state_->instance_mem_tracker()));
  thread_usage_sampled_counter_ = profile()->AddSamplingTimeSeriesCounter("ThreadUsage",
      TUnit::UNIT,
      bind<int64_t>(mem_fn(&ThreadResourcePool::num_threads),
          runtime_state_->resource_pool()));

  PlanNode* plan_tree = nullptr;
  RETURN_IF_ERROR(
      PlanNode::CreateTree(runtime_state_, fragment_ctx_.fragment.plan, &plan_tree));
  plan_tree_ = plan_tree;
  // set up plan
  RETURN_IF_ERROR(ExecNode::CreateTree(
      runtime_state_, *plan_tree_, query_state_->desc_tbl(), &exec_tree_));
  runtime_state_->set_fragment_root_id(exec_tree_->id());
  if (instance_ctx_.__isset.debug_options) {
    ExecNode::SetDebugOptions(instance_ctx_.debug_options, exec_tree_);
  }

  // set #senders of exchange nodes before calling Prepare()
  vector<ExecNode*> exch_nodes;
  exec_tree_->CollectNodes(TPlanNodeType::EXCHANGE_NODE, &exch_nodes);
  for (ExecNode* exch_node : exch_nodes) {
    DCHECK_EQ(exch_node->type(), TPlanNodeType::EXCHANGE_NODE);
    // An exchange node without an entry in 'per_exch_num_senders' gets the default of
    // 0, which trips the DCHECK below.
    int num_senders =
        FindWithDefault(instance_ctx_.per_exch_num_senders, exch_node->id(), 0);
    DCHECK_GT(num_senders, 0);
    static_cast<ExchangeNode*>(exch_node)->set_num_senders(num_senders);
  }

  // set scan ranges
  vector<ExecNode*> scan_nodes;
  // Used as the default for scan nodes that have no entry in 'per_node_scan_ranges'.
  vector<TScanRangeParams> no_scan_ranges;
  exec_tree_->CollectScanNodes(&scan_nodes);
  for (ExecNode* scan_node: scan_nodes) {
    const vector<TScanRangeParams>& scan_ranges = FindWithDefault(
        instance_ctx_.per_node_scan_ranges, scan_node->id(), no_scan_ranges);
    static_cast<ScanNode*>(scan_node)->SetScanRanges(scan_ranges);
  }

  RuntimeProfile::Counter* prepare_timer =
      ADD_CHILD_TIMER(timings_profile_, "ExecTreePrepareTime", PREPARE_TIMER_NAME);
  {
    SCOPED_TIMER(prepare_timer);
    RETURN_IF_ERROR(exec_tree_->Prepare(runtime_state_));
  }
  PrintVolumeIds();

  // prepare sink_
  DCHECK(fragment_ctx_.fragment.__isset.output_sink);
  const TDataSink& thrift_sink = fragment_ctx_.fragment.output_sink;
  RETURN_IF_ERROR(DataSinkConfig::CreateConfig(
      thrift_sink, plan_tree_->row_descriptor_, runtime_state_, &sink_config_));
  sink_ = sink_config_->CreateSink(fragment_ctx_, instance_ctx_, runtime_state_);
  RETURN_IF_ERROR(sink_->Prepare(runtime_state_, runtime_state_->instance_mem_tracker()));
  RuntimeProfile* sink_profile = sink_->profile();
  if (sink_profile != nullptr) profile()->AddChild(sink_profile);

  PlanRootSink* root_sink = GetRootSink();
  if (root_sink != nullptr) {
    // Release the thread token on the root fragment instance. This fragment spends most
    // of the time waiting and doing very little work. Holding on to the token causes
    // underutilization of the machine. If there are 12 queries on this node, that's 12
    // tokens reserved for no reason.
    // NOTE(review): Close() never releases the token for PLAN_ROOT_SINK fragments, so
    // if Prepare() returns an error before this point on such a fragment, the token
    // acquired above does not appear to be released here -- confirm whether that is
    // handled when the resource pool is torn down.
    ReleaseThreadToken();
  }

  // set up profile counters
  profile()->AddChild(exec_tree_->runtime_profile());
  rows_produced_counter_ =
      ADD_COUNTER(profile(), "RowsProduced", TUnit::UNIT);
  per_host_mem_usage_ =
      ADD_COUNTER(profile(), PER_HOST_PEAK_MEM_COUNTER, TUnit::BYTES);

  // The derived counter callback returns int64_t, so the double ratio is handed back
  // as its raw 64-bit pattern. NOTE(review): the reinterpret_cast store is type
  // punning; memcpy would be the strictly conforming way to do this.
  profile()->AddDerivedCounter("ExchangeScanRatio", TUnit::DOUBLE_VALUE, [this](){
      int64_t counter_val = 0;
      *reinterpret_cast<double*>(&counter_val) =
          runtime_state_->ComputeExchangeScanRatio();
      return counter_val;
      });

  // The batch that ExecInternal() reuses for every GetNext()/Send() round trip.
  row_batch_.reset(
      new RowBatch(exec_tree_->row_desc(), runtime_state_->batch_size(),
        runtime_state_->instance_mem_tracker()));
  VLOG(2) << "plan_root=\n" << exec_tree_->DebugString();
  return Status::OK();
}

// Fills 'instance_status' and 'thrift_profile' with a status report for this instance:
// sequence number, instance id, done flag, current state, the serialized profile, DML
// stats (final report only), any stateful reports retained by ReportFailed(), and at
// most one new stateful report carrying unreported errors and aux error info.
// Must not be called once the final report has been sent successfully.
void FragmentInstanceState::GetStatusReport(FragmentInstanceExecStatusPB* instance_status,
    TRuntimeProfileTree* thrift_profile) {
  DFAKE_SCOPED_LOCK(report_status_lock_);
  DCHECK(!final_report_sent_);
  // Refresh the peak per host mem usage counter before the profile is serialized.
  if (per_host_mem_usage_ != nullptr) {
    per_host_mem_usage_->Set(runtime_state()->query_mem_tracker()->peak_consumption());
  }
  if (!final_report_generated_) {
    instance_status->set_report_seq_no(AdvanceReportSeqNo());
  } else {
    // Execution had already finished when the previous report was generated, so this
    // report will have identical contents; don't advance the sequence number.
    instance_status->set_report_seq_no(report_seq_no_);
  }
  TUniqueIdToUniqueIdPB(instance_id(), instance_status->mutable_fragment_instance_id());
  const bool is_final = IsDone();
  instance_status->set_done(is_final);
  instance_status->set_current_state(current_state());
  DCHECK(profile() != nullptr);
  profile()->ToThrift(thrift_profile);
  if (is_final) {
    // The DML stats only go out with the final report.
    runtime_state()->dml_exec_state()->ToProto(
        instance_status->mutable_dml_exec_status());
    final_report_generated_ = true;
  }
  if (!prev_stateful_reports_.empty()) {
    // Resend the stateful reports from earlier reports that failed.
    *instance_status->mutable_stateful_report() =
        {prev_stateful_reports_.begin(), prev_stateful_reports_.end()};
  }

  // The new stateful report is created lazily, at most once, and stamped with the
  // current sequence number.
  StatefulStatusPB* stateful_report = nullptr;
  auto new_stateful_report = [&]() {
    if (stateful_report == nullptr) {
      stateful_report = instance_status->add_stateful_report();
      stateful_report->set_report_seq_no(report_seq_no_);
    }
    return stateful_report;
  };
  if (runtime_state()->HasErrors()) {
    // Add any new errors.
    runtime_state()->GetUnreportedErrors(new_stateful_report()->mutable_error_log());
  }
  if (runtime_state()->HasAuxErrorInfo()) {
    // If set in the RuntimeState, set the AuxErrorInfoPB field.
    runtime_state()->GetUnreportedAuxErrorInfo(
        new_stateful_report()->mutable_aux_error_info());
  }
}

// Called after 'instance_exec_status' was delivered. Drops the stateful reports that
// were being retained for resending and, if this was the report with the 'done' flag
// set, records that the final report has been sent.
void FragmentInstanceState::ReportSuccessful(
    const FragmentInstanceExecStatusPB& instance_exec_status) {
  prev_stateful_reports_.clear();
  if (!instance_exec_status.done()) return;
  final_report_sent_ = true;
}

// Called when delivering 'instance_exec_status' failed. If GetStatusReport() generated
// a new stateful report for it, that report is retained in 'prev_stateful_reports_'
// so that the next GetStatusReport() includes it again.
void FragmentInstanceState::ReportFailed(
    const FragmentInstanceExecStatusPB& instance_exec_status) {
  const int num_reports = instance_exec_status.stateful_report_size();
  // Nothing new to retain: either the report carried no stateful reports at all, or
  // it carried exactly the ones that are already being retained.
  if (num_reports <= 0 || prev_stateful_reports_.size() == num_reports) return;

  // There can be at most 1 new stateful report generated since the last call to
  // ReportSuccessful()/Failed(). It is the last one in the list and has a seq no that
  // matches the overall report's seq no.
  DCHECK_EQ(prev_stateful_reports_.size() + 1, num_reports);
  const StatefulStatusPB& stateful_report =
      instance_exec_status.stateful_report()[num_reports - 1];
  DCHECK_EQ(stateful_report.report_seq_no(), instance_exec_status.report_seq_no());
  prev_stateful_reports_.push_back(stateful_report);
}

// Second lifecycle phase. When RuntimeState::ShouldCodegen() is true, runs codegen for
// the exec tree, the sink and the scalar expressions and finalizes the module; then
// opens the exec tree followed by the sink. Returns the first error encountered; the
// caller (Exec()) stores the returned status in 'opened_promise_'.
// Precondition: the state is WAITING_FOR_PREPARE and 'opened_promise_' is not yet set.
Status FragmentInstanceState::Open() {
  DCHECK(!opened_promise_.IsSet());
  DCHECK_EQ(current_state_.Load(), FInstanceExecStatePB::WAITING_FOR_PREPARE);
  // Both timers cover the whole function, including codegen and the sink's Open().
  SCOPED_TIMER2(profile()->total_time_counter(),
      ADD_TIMER(timings_profile_, OPEN_TIMER_NAME));
  SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());

  if (runtime_state_->ShouldCodegen()) {
    UpdateState(StateEvent::CODEGEN_START);
    RETURN_IF_ERROR(runtime_state_->CreateCodegen());
    {
      // Scoped so that the IR generation timers stop before FinalizeModule() runs.
      SCOPED_TIMER2(runtime_state_->codegen()->ir_generation_timer(),
          runtime_state_->codegen()->runtime_profile()->total_time_counter());
      SCOPED_THREAD_COUNTER_MEASUREMENT(
          runtime_state_->codegen()->llvm_thread_counters());
      // No status is consumed from these two calls; within this block only
      // CodegenScalarExprs() can make Open() fail.
      exec_tree_->Codegen(runtime_state_);
      sink_->Codegen(runtime_state_->codegen());

      // It shouldn't be fatal to fail codegen. However, until IMPALA-4233 is fixed,
      // ScalarFnCall has no fall back to interpretation when codegen fails so propagates
      // the error status for now.
      RETURN_IF_ERROR(runtime_state_->CodegenScalarExprs());
    }

    LlvmCodeGen* codegen = runtime_state_->codegen();
    DCHECK(codegen != nullptr);
    RETURN_IF_ERROR(codegen->FinalizeModule());
  }

  {
    // UpdateState() accepts OPEN_START from either WAITING_FOR_PREPARE (codegen was
    // skipped) or WAITING_FOR_CODEGEN.
    UpdateState(StateEvent::OPEN_START);
    // Inject failure if debug actions are enabled.
    RETURN_IF_ERROR(DebugAction(query_state_->query_options(), "FIS_IN_OPEN"));

    // Scoped so that "ExecTreeOpenTime" excludes the sink's Open() below.
    SCOPED_TIMER(ADD_CHILD_TIMER(timings_profile_, "ExecTreeOpenTime", OPEN_TIMER_NAME));
    RETURN_IF_ERROR(exec_tree_->Open(runtime_state_));
  }
  return sink_->Open(runtime_state_);
}

// Third lifecycle phase: the main execution loop. Repeatedly resets 'row_batch_',
// fills it from the exec tree via GetNext() and hands it to the sink via Send(), until
// the exec tree reports end-of-stream; then flushes the sink. The loop body always
// runs at least once, so a batch is sent even if the exec tree is immediately
// complete. Drives the state machine from WAITING_FOR_FIRST_BATCH through
// LAST_BATCH_SENT.
// Precondition: Open() succeeded, i.e. the state is WAITING_FOR_OPEN.
// Returns the first error from the debug action, the exec tree or the sink, or OK.
Status FragmentInstanceState::ExecInternal() {
  DCHECK_EQ(current_state_.Load(), FInstanceExecStatePB::WAITING_FOR_OPEN);
  // Inject failure if debug actions are enabled.
  RETURN_IF_ERROR(DebugAction(query_state_->query_options(), "FIS_IN_EXEC_INTERNAL"));

  RuntimeProfile::Counter* plan_exec_timer =
      ADD_CHILD_TIMER(timings_profile_, "ExecTreeExecTime", EXEC_TIMER_NAME);
  SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
  bool exec_tree_complete = false;
  UpdateState(StateEvent::WAITING_FOR_FIRST_BATCH);
  do {
    row_batch_->Reset();
    {
      // Only the time spent producing the batch counts towards "ExecTreeExecTime";
      // the time spent in the sink's Send() is excluded.
      SCOPED_TIMER(plan_exec_timer);
      RETURN_IF_ERROR(
          exec_tree_->GetNext(runtime_state_, row_batch_.get(), &exec_tree_complete));
    }
    UpdateState(StateEvent::BATCH_PRODUCED);
    if (VLOG_ROW_IS_ON) row_batch_->VLogRows("FragmentInstanceState::ExecInternal()");
    COUNTER_ADD(rows_produced_counter_, row_batch_->num_rows());
    RETURN_IF_ERROR(sink_->Send(runtime_state_, row_batch_.get()));
    UpdateState(StateEvent::BATCH_SENT);
  } while (!exec_tree_complete);

  UpdateState(StateEvent::LAST_BATCH_SENT);
  // Flush the sink as a final step.
  RETURN_IF_ERROR(sink_->FlushFinal(runtime_state()));
  return Status::OK();
}

void FragmentInstanceState::Close() {
  DCHECK(runtime_state_ != nullptr);

  // Required to wake up any threads that might be blocked waiting for filters, e.g.
  // scanner threads.
  // TODO: we might be able to remove this with mt_dop, since we only need to worry
  // have the fragment thread (i.e. the current thread).
  Cancel();

  // If we haven't already released this thread token in Prepare(), release
  // it before calling Close().
  if (fragment_ctx_.fragment.output_sink.type != TDataSinkType::PLAN_ROOT_SINK) {
    ReleaseThreadToken();
  }

  // guard against partially-finished Prepare()
  if (sink_ != nullptr) sink_->Close(runtime_state_);

  // Stop updating profile counters in background.
  profile()->StopPeriodicCounters();

  // We need to delete row_batch_ here otherwise we can't delete the instance_mem_tracker_
  // in runtime_state_->ReleaseResources().
  // TODO: do not delete mem trackers in Close()/ReleaseResources(), they are part of
  // the control structures we need to preserve until the underlying QueryState
  // disappears.
  row_batch_.reset();
  if (exec_tree_ != nullptr) exec_tree_->Close(runtime_state_);
  runtime_state_->ReleaseResources();

  // Sanity timer checks
#ifndef NDEBUG
  if (profile() != nullptr && timings_profile_ != nullptr) {
    int64_t total_time = profile()->total_time_counter()->value();
    int64_t other_time = 0;
    for (auto& name: {PREPARE_TIMER_NAME, OPEN_TIMER_NAME, EXEC_TIMER_NAME}) {
      RuntimeProfile::Counter* counter = timings_profile_->GetCounter(name);
      if (counter != nullptr) other_time += counter->value();
    }
    // TODO: IMPALA-4631: Occasionally we see other_time = total_time + ε where ε is 1,
    // 2, or 3. It appears to be a bug with clocks on some virtualized systems. Add 3
    // to total_time to avoid DCHECKing in that case.
    DCHECK_LE(other_time, total_time + 3);
  }
#endif
}

void FragmentInstanceState::UpdateState(const StateEvent event)
{
  FInstanceExecStatePB current_state = current_state_.Load();
  FInstanceExecStatePB next_state = current_state;
  switch (event) {
    case StateEvent::PREPARE_START:
      DCHECK_EQ(current_state, FInstanceExecStatePB::WAITING_FOR_EXEC);
      next_state = FInstanceExecStatePB::WAITING_FOR_PREPARE;
      break;

    case StateEvent::CODEGEN_START:
      DCHECK_EQ(current_state, FInstanceExecStatePB::WAITING_FOR_PREPARE);
      event_sequence_->MarkEvent("Prepare Finished");
      next_state = FInstanceExecStatePB::WAITING_FOR_CODEGEN;
      break;

    case StateEvent::OPEN_START:
      if (current_state == FInstanceExecStatePB::WAITING_FOR_PREPARE) {
        event_sequence_->MarkEvent("Prepare Finished");
      } else {
        DCHECK_EQ(current_state, FInstanceExecStatePB::WAITING_FOR_CODEGEN);
      }
      next_state = FInstanceExecStatePB::WAITING_FOR_OPEN;
      break;

    case StateEvent::WAITING_FOR_FIRST_BATCH:
      DCHECK_EQ(current_state, FInstanceExecStatePB::WAITING_FOR_OPEN);
      event_sequence_->MarkEvent("Open Finished");
      next_state = FInstanceExecStatePB::WAITING_FOR_FIRST_BATCH;
      break;

    case StateEvent::BATCH_PRODUCED:
      if (UNLIKELY(current_state == FInstanceExecStatePB::WAITING_FOR_FIRST_BATCH)) {
        event_sequence_->MarkEvent("First Batch Produced");
        next_state = FInstanceExecStatePB::FIRST_BATCH_PRODUCED;
      } else {
        DCHECK_EQ(current_state, FInstanceExecStatePB::PRODUCING_DATA);
      }
      break;

    case StateEvent::BATCH_SENT:
      if (UNLIKELY(current_state == FInstanceExecStatePB::FIRST_BATCH_PRODUCED)) {
        event_sequence_->MarkEvent("First Batch Sent");
        next_state = FInstanceExecStatePB::PRODUCING_DATA;
      } else {
        DCHECK_EQ(current_state, FInstanceExecStatePB::PRODUCING_DATA);
      }
      break;

    case StateEvent::LAST_BATCH_SENT:
      DCHECK_EQ(current_state, FInstanceExecStatePB::PRODUCING_DATA);
      next_state = FInstanceExecStatePB::LAST_BATCH_SENT;
      break;

    case StateEvent::EXEC_END:
      // Allow abort in all states to make error handling easier.
      event_sequence_->MarkEvent("ExecInternal Finished");
      next_state = FInstanceExecStatePB::FINISHED;
      break;

    default:
      DCHECK(false) << "Unexpected Event: " << static_cast<int>(event);
      break;
  }
  // This method is the only one updating 'current_state_' and is not meant to be thread
  // safe.
  if (next_state != current_state) current_state_.Store(next_state);
}

// Returns the thread token acquired in Prepare() to the resource pool and stops the
// two sampled thread counters. Either counter may still be null when this is reached
// via Close() after Prepare() failed before registering them.
void FragmentInstanceState::ReleaseThreadToken() {
  DCHECK(runtime_state_ != nullptr);
  DCHECK(runtime_state_->resource_pool() != nullptr);
  runtime_state_->resource_pool()->ReleaseThreadToken(true);

  if (avg_thread_tokens_ != nullptr)
    PeriodicCounterUpdater::StopSamplingCounter(avg_thread_tokens_);

  if (thread_usage_sampled_counter_ != nullptr)
    PeriodicCounterUpdater::StopTimeSeriesCounter(thread_usage_sampled_counter_);
}

// Returns the status that Exec() stored in 'opened_promise_': the Prepare() error if
// Prepare() failed, otherwise the result of Open(). Presumably blocks until the
// promise has been set -- TODO confirm against the Promise implementation.
Status FragmentInstanceState::WaitForOpen() {
  return opened_promise_.Get();
}

// Logs the filter id and forwards 'params' and 'context' to the filter bank of this
// instance's RuntimeState.
// NOTE(review): 'runtime_state_' and its filter bank are dereferenced unchecked, so the
// caller presumably ensures Prepare() got past InitFilterBank() -- confirm.
void FragmentInstanceState::PublishFilter(
    const PublishFilterParamsPB& params, RpcContext* context) {
  VLOG_FILE << "PublishFilter(): instance_id=" << PrintId(instance_id())
            << " filter_id=" << params.filter_id();
  runtime_state_->filter_bank()->PublishGlobalFilter(params, context);
}

// Returns the human-readable label for 'state'. The returned reference points into a
// function-local static table, so it stays valid for the lifetime of the process.
// 'state' must be a valid FInstanceExecStatePB value (DCHECKed).
const string& FragmentInstanceState::ExecStateToString(FInstanceExecStatePB state) {
  // Labels to send to the debug webpages to display the current state to the user.
  // The table is indexed by the numeric value of the state, so the entries must stay
  // in enum order; the trailing comment names the state each label belongs to. Every
  // state needs its own distinct label so that it can be told apart when displayed.
  static const string finstance_state_labels[] = {
      "Waiting for Exec",         // WAITING_FOR_EXEC
      "Waiting for Prepare",      // WAITING_FOR_PREPARE
      "Waiting for Codegen",      // WAITING_FOR_CODEGEN
      "Waiting for Open",         // WAITING_FOR_OPEN
      "Waiting for First Batch",  // WAITING_FOR_FIRST_BATCH
      "First batch produced",     // FIRST_BATCH_PRODUCED
      "Producing Data",           // PRODUCING_DATA
      "Last batch sent",          // LAST_BATCH_SENT
      "Finished"                  // FINISHED
  };
  /// Make sure we have a label for every possible state.
  static_assert(sizeof(finstance_state_labels) / sizeof(string) ==
      FInstanceExecStatePB::FINISHED + 1, "");

  DCHECK_LT(state, sizeof(finstance_state_labels) / sizeof(string))
      << "Unknown instance state";
  return finstance_state_labels[state];
}

// Returns 'sink_' downcast to PlanRootSink if the fragment's output sink type is
// PLAN_ROOT_SINK, and nullptr for any other sink type. The result is also null while
// 'sink_' itself is still null.
PlanRootSink* FragmentInstanceState::GetRootSink() const {
  if (fragment_ctx_.fragment.output_sink.type != TDataSinkType::PLAN_ROOT_SINK) {
    return nullptr;
  }
  return static_cast<PlanRootSink*>(sink_);
}

// Convenience accessor: returns the query context held by the managing QueryState.
const TQueryCtx& FragmentInstanceState::query_ctx() const {
  return query_state_->query_ctx();
}

// Convenience accessor: returns the QueryState's object pool. Prepare() adds the
// RuntimeState and the timings profile to this pool.
ObjectPool* FragmentInstanceState::obj_pool() {
  return query_state_->obj_pool();
}

// Returns the runtime profile owned by 'runtime_state_'. Dereferences
// 'runtime_state_' unchecked, so it must not be called before Prepare() has created
// the RuntimeState.
RuntimeProfile* FragmentInstanceState::profile() const {
  return runtime_state_->runtime_profile();
}

void FragmentInstanceState::PrintVolumeIds() {
  if (instance_ctx_.per_node_scan_ranges.empty()) return;

  HdfsScanNodeBase::PerVolumeStats per_volume_stats;
  for (const PerNodeScanRanges::value_type& entry: instance_ctx_.per_node_scan_ranges) {
    HdfsScanNodeBase::UpdateHdfsSplitStats(entry.second, &per_volume_stats);
  }

  stringstream str;
  HdfsScanNodeBase::PrintHdfsSplitStats(per_volume_stats, &str);
  profile()->AddInfoString(HdfsScanNodeBase::HDFS_SPLIT_STATS_DESC, str.str());
  VLOG_FILE
      << "Hdfs split stats (<volume id>:<# splits>/<split lengths>) for query="
      << PrintId(query_id()) << ":\n" << str.str();
}
