// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.


#ifndef IMPALA_RUNTIME_FRAGMENT_INSTANCE_STATE_H
#define IMPALA_RUNTIME_FRAGMENT_INSTANCE_STATE_H

#include <string>
#include <boost/scoped_ptr.hpp>
#include <boost/thread/mutex.hpp>

#include "common/atomic.h"
#include "common/status.h"
#include "common/thread-debug-info.h"
#include "util/promise.h"

#include "gen-cpp/control_service.pb.h"
#include "gen-cpp/data_stream_service.pb.h"
#include "gen-cpp/ImpalaInternalService_types.h"
#include "gutil/threading/thread_collision_warner.h" // for DFAKE_*
#include "runtime/row-batch.h"
#include "util/condition-variable.h"
#include "util/promise.h"
#include "util/runtime-profile.h"

namespace kudu {
namespace rpc {
class RpcContext;
} // namespace rpc
} // namespace kudu

namespace impala {

class TPlanFragmentCtx;
class TPlanFragmentInstanceCtx;
class TBloomFilter;
class TUniqueId;
class TNetworkAddress;
class TQueryCtx;
class QueryState;
class RuntimeProfile;
class ExecNode;
class PlanNode;
class PlanRootSink;
class Thread;
class DataSink;
class DataSinkConfig;
class RuntimeState;

/// FragmentInstanceState handles all aspects of the execution of a single plan fragment
/// instance, including setup and finalization, both in the success and error case.
/// Close() happens automatically at the end of Exec() and frees all memory allocated
/// for this fragment instance and closes all data streams.
///
/// The FIS makes an aggregated profile for the entire fragment available, which
/// includes profile information for the plan itself as well as the output sink. It also
/// contains a timeline of events of the fragment instance.
///
/// This class is thread-safe.
/// All non-getter public functions other than Exec() block until the Prepare phase
/// finishes.
/// No member variables, other than the ones passed to the c'tor, are valid before
/// the Prepare phase finishes.
///
/// TODO:
/// - absorb RuntimeState?
class FragmentInstanceState {
 public:
  FragmentInstanceState(QueryState* query_state, const TPlanFragmentCtx& fragment_ctx,
      const TPlanFragmentInstanceCtx& instance_ctx);

  /// Main loop of fragment instance execution. Blocks until execution finishes and
  /// automatically releases resources. Returns execution status.
  /// Must only be called once.
  Status Exec() WARN_UNUSED_RESULT;

  /// Cancels execution. Idempotent.
  void Cancel();

  /// Blocks until the Prepare phase of Exec() is finished and the exec tree is
  /// opened, and returns that status. If the preparation phase encountered an error,
  /// GetOpenStatus() will return that error without blocking.
  Status WaitForOpen();

  /// Publishes filter with ID 'filter_id' to this fragment instance's filter bank.
  void PublishFilter(const PublishFilterParamsPB& params, kudu::rpc::RpcContext* context);

  /// Called periodically by query state thread to get the current status of this fragment
  /// instance. The fragment instance's status is stored in 'instance_status' and its
  /// Thrift runtime profile is stored in 'thrift_profile'.
  void GetStatusReport(FragmentInstanceExecStatusPB* instance_status,
      TRuntimeProfileTree* thrift_profile);

  /// After each call to GetStatusReport(), the query state thread should call one of the
  /// following to indicate if the report rpc was successful. Note that in the case of
  /// ReportFailed(), the report may have been received by the coordinator even though the
  /// rpc appeared to fail.
  void ReportSuccessful(const FragmentInstanceExecStatusPB& instance_status);
  void ReportFailed(const FragmentInstanceExecStatusPB& instance_status);

  /// Returns fragment instance's sink if this is the root fragment instance. Valid after
  /// the Prepare phase. May be nullptr.
  PlanRootSink* GetRootSink() const;

  /// Returns a string description of 'state'.
  static const string& ExecStateToString(FInstanceExecStatePB state);

  /// Name of the counter that is tracking per query, per host peak mem usage.
  /// TODO: this doesn't look like it belongs here
  static const std::string PER_HOST_PEAK_MEM_COUNTER;

  QueryState* query_state() { return query_state_; }
  RuntimeState* runtime_state() { return runtime_state_; }
  RuntimeProfile* profile() const;
  const TQueryCtx& query_ctx() const;
  const TPlanFragmentCtx& fragment_ctx() const { return fragment_ctx_; }
  const TPlanFragmentInstanceCtx& instance_ctx() const { return instance_ctx_; }
  const TUniqueId& query_id() const { return query_ctx().query_id; }
  const TUniqueId& instance_id() const { return instance_ctx_.fragment_instance_id; }
  FInstanceExecStatePB current_state() const { return current_state_.Load(); }
  bool final_report_sent() const { return final_report_sent_; }
  const TNetworkAddress& coord_address() const { return query_ctx().coord_address; }
  bool IsDone() const { return current_state_.Load() == FInstanceExecStatePB::FINISHED; }
  ObjectPool* obj_pool();

  /// Returns true if the current thread is a thread executing the whole or part of
  /// a fragment instance.
  static bool IsFragmentExecThread() {
    const static size_t name_len =
        strlen(FragmentInstanceState::FINST_THREAD_NAME_PREFIX.c_str());
    const char* name = GetThreadDebugInfo()->GetThreadName();
    return name != nullptr &&
        (strncmp(name, FINST_THREAD_NAME_PREFIX.c_str(), name_len) == 0 ||
         strncmp(name, "join-build-thread", 17) == 0);
  }

  static const std::string FINST_THREAD_GROUP_NAME;
  static const std::string FINST_THREAD_NAME_PREFIX;

 private:
  QueryState* query_state_;
  const TPlanFragmentCtx& fragment_ctx_;
  const TPlanFragmentInstanceCtx& instance_ctx_;

  /// All following member variables that are initialized to nullptr are set
  /// in Prepare().
  ExecNode* exec_tree_ = nullptr; // lives in obj_pool()
  RuntimeState* runtime_state_ = nullptr;  // lives in obj_pool()
  /// Lives in obj_pool(). Not mutated after being initialized.
  const PlanNode* plan_tree_ = nullptr;
  const DataSinkConfig* sink_config_ = nullptr;

  /// A 'fake mutex' to detect any race condition in accessing 'report_seq_no_' below.
  /// There should be only one thread doing status report at the same time.
  DFAKE_MUTEX(report_status_lock_);

  /// Monotonically increasing sequence number used in status report to prevent
  /// duplicated or out-of-order reports.
  int64_t report_seq_no_ = 0;

  /// True iff the final report has already been sent. Read exclusively by the query
  /// state thread only. Written in GetStatusReport() by the query state thread.
  bool final_report_sent_ = false;

  /// The non-idempotent parts of any reports that were generated but may not have been
  /// received by the coordinator.
  std::vector<StatefulStatusPB> prev_stateful_reports_;

  /// True if a report has been generated where 'done' is true, after which the sequence
  /// number should not be bumped for future reports.
  bool final_report_generated_ = false;

  /// Profile for timings for each stage of the plan fragment instance's lifecycle.
  /// Lives in obj_pool().
  RuntimeProfile* timings_profile_ = nullptr;

  /// Event sequence tracking the completion of various stages of this fragment instance.
  /// Updated in UpdateState().
  RuntimeProfile::EventSequence* event_sequence_ = nullptr;

  /// Events that change the current state of this instance's execution, which is kept in
  /// 'current_state_'. Events are issued throughout the execution by calling
  /// UpdateState(), which implements a state machine. See the implementation of
  /// UpdateState() for valid state transitions.
  enum class StateEvent {
    /// Indicates the start of execution.
    PREPARE_START,
    /// Indicates that codegen will get called. Omitted if not doing codegen.
    CODEGEN_START,
    /// Indicates the call to Open().
    OPEN_START,
    /// Indicates waiting for the first batch to arrive.
    WAITING_FOR_FIRST_BATCH,
    /// Indicates that a new batch was produced by this instance.
    BATCH_PRODUCED,
    /// Indicates that a batch has been sent.
    BATCH_SENT,
    /// Indicates that no new batches will be received.
    LAST_BATCH_SENT,
    /// Indicates the end of this instance's execution.
    EXEC_END
  };

  /// The current state of this fragment instance's execution. Only updated by the
  /// fragment instance thread in UpdateState() and read by the profile reporting threads.
  AtomicEnum<FInstanceExecStatePB> current_state_{FInstanceExecStatePB::WAITING_FOR_EXEC};

  /// Output sink for rows sent to this fragment. Created in Prepare(), lives in
  /// obj_pool().
  DataSink* sink_ = nullptr;

  /// should live in obj_pool(), but managed separately so we can delete it in Close()
  boost::scoped_ptr<RowBatch> row_batch_;

  /// Set when Prepare() returns.
  Promise<Status> prepared_promise_;

  /// Set when OpenInternal() returns.
  Promise<Status> opened_promise_;

  /// Returns the monotonically increasing sequence number.
  /// Called by query state thread only.
  int64_t AdvanceReportSeqNo() {
    DCHECK(!final_report_generated_);
    return ++report_seq_no_;
  }

  /// A counter for the per query, per host peak mem usage. Note that this is not the
  /// max of the peak memory of all fragments running on a host since it needs to take
  /// into account when they are running concurrently. All fragments for a single query
  /// on a single host will have the same value for this counter.
  RuntimeProfile::Counter* per_host_mem_usage_ = nullptr;

  /// Number of rows returned by this fragment
  /// TODO: by this instance?
  RuntimeProfile::Counter* rows_produced_counter_ = nullptr;

  /// Average number of thread tokens for the duration of the fragment instance execution.
  /// Instances that do a lot of cpu work (non-coordinator fragment) will have at
  /// least 1 token.  Instances that contain a hdfs scan node will have 1+ tokens
  /// depending on system load.  Other nodes (e.g. hash join node) can also reserve
  /// additional tokens.
  /// This is a measure of how much CPU resources this instance used during the course
  /// of the execution.
  /// TODO-MT: remove
  RuntimeProfile::Counter* avg_thread_tokens_ = nullptr;

  /// Sampled memory usage at even time intervals.
  RuntimeProfile::TimeSeriesCounter* mem_usage_sampled_counter_ = nullptr;

  /// Sampled thread usage (tokens) at even time intervals.
  RuntimeProfile::TimeSeriesCounter* thread_usage_sampled_counter_ = nullptr;

  /// Prepare for execution. runtime_state() will not be valid until Prepare() is called.
  /// runtime_state() will always be valid after Prepare() returns.
  /// If request.query_options.mem_limit > 0, it is used as an
  /// approximate limit on the number of bytes this query can consume at runtime.  The
  /// query will be aborted (MEM_LIMIT_EXCEEDED) if it goes over that limit.
  ///
  /// A failure in Prepare() will result in partially-initialized state.
  Status Prepare() WARN_UNUSED_RESULT;

  /// Executes Open() logic and returns resulting status.
  Status Open() WARN_UNUSED_RESULT;

  /// Pulls row batches from exec_tree_ and pushes them to sink_ in a loop. Returns
  /// OK if the input was exhausted and sent to the sink successfully, an error otherwise.
  /// If ExecInternal() returns without an error condition, all rows will have been sent
  /// to the sink and the sink will have been flushed.
  Status ExecInternal() WARN_UNUSED_RESULT;

  /// Closes the underlying fragment instance and frees up all resources allocated in
  /// Prepare() and Open(). Can handle partially-finished Prepare().
  void Close();

  /// Handle the execution event 'event'. This implements a state machine and will update
  /// the current execution state of this fragment instance. Also marks an event in
  /// 'event_sequence_' for some states. Must not be called by multiple threads
  /// concurrently.
  void UpdateState(const StateEvent event);

  /// Releases the thread token for this fragment executor. Can handle
  /// partially-finished Prepare().
  void ReleaseThreadToken();

  /// Print stats about scan ranges for each volumeId in params to info log.
  void PrintVolumeIds();
};

}

#endif
