// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef IMPALA_RUNTIME_QUERY_STATE_H
#define IMPALA_RUNTIME_QUERY_STATE_H

#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>
#include <boost/scoped_ptr.hpp>

#include "common/atomic.h"
#include "common/object-pool.h"
#include "gen-cpp/ImpalaInternalService_types.h"
#include "gen-cpp/Types_types.h"
#include "gutil/threading/thread_collision_warner.h" // for DFAKE_*
#include "runtime/tmp-file-mgr.h"
#include "util/container-util.h"
#include "util/counting-barrier.h"
#include "util/uid-util.h"

namespace impala {

/// Forward declarations, so that types used here only via pointers/references do not
/// need their full definitions included by this header.
class ControlServiceProxy;
class DescriptorTbl;
class FragmentInstanceState;
class InitialReservations;
class MemTracker;
class ReportExecStatusRequestPB;
class ReservationTracker;
class RuntimeProfile;
class RuntimeState;
class ScannerMemLimiter;
class ThriftSerializer;

/// Central class for all backend execution state (example: the FragmentInstanceStates
/// of the individual fragment instances) created for a particular query.
/// This class contains or makes accessible state that is shared across fragment
/// instances; in contrast, fragment instance-specific state is collected in
/// FragmentInstanceState.
///
/// The lifetime of a QueryState is dictated by a reference count. Any thread that
/// executes on behalf of a query, and accesses any of its state, must obtain a
/// reference to the corresponding QueryState and hold it for at least the
/// duration of that access. The reference is obtained and released via
/// QueryExecMgr::Get-/ReleaseQueryState() or via QueryState::ScopedRef (the latter
/// for references limited to the scope of a single function or block).
/// As long as the reference count is greater than 0, all of a query's control
/// structures (contained either in this class or accessible through this class, such
/// as the FragmentInstanceStates) are guaranteed to be alive.
///
/// Query execution resources (non-control-structure memory, scratch files, threads, etc)
/// are also managed via a separate resource reference count, which should be released as
/// soon as the resources are not needed, so that they are freed promptly.
///
/// We maintain a state denoted by BackendExecState. The initial state is PREPARING.
/// Once all query fragment instances have finished FIS::Prepare(), the BackendExecState
/// will transition to:
/// - EXECUTING if all fragment instances succeeded in Prepare()
/// - ERROR if any fragment instances failed during or after Prepare()
/// - CANCELLED if the query is cancelled
/// From EXECUTING, the state transitions to FINISHED once all fragment instances have
/// successfully completed execution, or to ERROR / CANCELLED otherwise.
///
/// Please note that even if some fragment instances hit an error during or after
/// Prepare(), the state transition from PREPARING won't happen until all fragment
/// instances have finished Prepare(). This makes sure the query state is initialized
/// to handle either a Cancel() RPC or a PublishFilter() RPC after PREPARING state.
///
/// Once the BackendExecState enters the EXECUTING state, any error will trigger the
/// BackendExecState to go into ERROR state and the query execution is considered over
/// on this backend.
///
/// When any fragment instance execution returns with an error status, all fragment
/// instances are automatically cancelled. The query state thread (started by
/// QueryExecMgr) periodically reports the overall status, the current state of execution
/// and the profile of each fragment instance to the coordinator. The frequency of those
/// reports is controlled by the flag status_report_interval_ms. Setting it to 0 disables
/// periodic reporting altogether. Regardless of the value of that flag, a report is sent
/// at least once at the end of execution with an overall status and profile (and 'done'
/// indicator). If execution ended with an error, that error status will be part of
/// the final report (it will not be overridden by the resulting cancellation).
///
/// Thread-safe, unless noted otherwise.
///
/// TODO:
/// - set up kudu clients in Init(), remove related locking
class QueryState {
 public:
  /// Use this class to obtain a QueryState for the duration of a function/block,
  /// rather than manually via QueryExecMgr::Get-/ReleaseQueryState().
  /// Pattern:
  /// {
  ///   QueryState::ScopedRef qs(qid);
  ///   if (qs.get() == nullptr) <do something, such as return>
  ///   ...
  /// }
  class ScopedRef {
   public:
    /// Looks up the query state via QueryExecMgr::GetQueryState(). The query state is
    /// non-NULL if the query was already registered. Explicit so that a TUniqueId is
    /// never silently converted into a ScopedRef, which would acquire a reference.
    explicit ScopedRef(const TUniqueId& query_id);
    /// Releases the reference held for the lifetime of this object.
    ~ScopedRef();

    /// may return nullptr
    QueryState* get() const { return query_state_; }
    QueryState* operator->() const { return query_state_; }

   private:
    QueryState* query_state_;
    DISALLOW_COPY_AND_ASSIGN(ScopedRef);
  };

  /// a shared pool for all objects that have query lifetime
  ObjectPool* obj_pool() { return &obj_pool_; }

  const TQueryCtx& query_ctx() const { return query_ctx_; }
  const TUniqueId& query_id() const { return query_ctx().query_id; }
  const TQueryOptions& query_options() const {
    return query_ctx_.client_request.query_options;
  }
  MemTracker* query_mem_tracker() const { return query_mem_tracker_; }

  /// The following getters are only valid after Init().
  ScannerMemLimiter* scanner_mem_limiter() const { return scanner_mem_limiter_; }

  /// The following getters are only valid after Init() and should be called only from
  /// the backend execution (ie. not the coordinator side, since they require holding
  /// a backend resource refcount).
  ReservationTracker* buffer_reservation() const {
    DCHECK_GT(backend_resource_refcnt_.Load(), 0);
    return buffer_reservation_;
  }
  InitialReservations* initial_reservations() const {
    DCHECK_GT(backend_resource_refcnt_.Load(), 0);
    return initial_reservations_;
  }
  TmpFileMgr::FileGroup* file_group() const {
    DCHECK_GT(backend_resource_refcnt_.Load(), 0);
    return file_group_;
  }

  /// The following getters are only valid after StartFInstances().
  int64_t fragment_events_start_time() const { return fragment_events_start_time_; }

  /// The following getters are only valid after StartFInstances() and should be called
  /// only from the backend execution (ie. not the coordinator side, since they require
  /// holding a backend resource refcount).
  const DescriptorTbl& desc_tbl() const {
    DCHECK_GT(backend_resource_refcnt_.Load(), 0);
    return *desc_tbl_;
  }

  /// Sets up state required for fragment execution: memory reservations, etc. Fails if
  /// resources could not be acquired. Acquires a backend resource refcount and returns
  /// it to the caller on both success and failure. The caller must release it by
  /// calling ReleaseBackendResourceRefcount().
  ///
  /// Uses few cycles and never blocks. Not idempotent, not thread-safe.
  /// The remaining public functions must be called only after Init().
  Status Init(const ExecQueryFInstancesRequestPB* exec_rpc_params,
      const TExecPlanFragmentInfo& fragment_info) WARN_UNUSED_RESULT;

  /// Performs the runtime-intensive parts of initial setup and starts all fragment
  /// instances belonging to this query. Each instance receives its own execution
  /// thread. Not idempotent, not thread-safe. Must only be called by the query state
  /// thread. Returns true iff all fragment instance threads were started successfully.
  /// Returns false otherwise.
  bool StartFInstances();

  /// Monitors the execution of all underlying fragment instances and updates the query
  /// state accordingly. This is also responsible for sending status reports periodically
  /// to the coordinator. Not idempotent, not thread-safe. Must only be called by the
  /// query state thread.
  void MonitorFInstances();

  /// Blocks until all fragment instances have finished their Prepare phase.
  /// Returns the fragment instance state for 'instance_id' in *fi_state,
  /// or nullptr if it is not present.
  /// Returns an error if fragment preparation failed.
  Status GetFInstanceState(
      const TUniqueId& instance_id, FragmentInstanceState** fi_state);

  /// Blocks until all fragment instances have finished their Prepare phase.
  void PublishFilter(const TPublishFilterParams& params);

  /// Cancels all actively executing fragment instances. Blocks until all fragment
  /// instances have finished their Prepare phase. Idempotent.
  void Cancel();

  /// Increment the resource refcount. Must be decremented before the query state
  /// reference is released. A refcount should be held by a fragment or other entity
  /// for as long as it is consuming query backend execution resources (e.g. memory).
  void AcquireBackendResourceRefcount();

  /// Decrement the execution resource refcount and release resources if it goes to zero.
  /// All resource refcounts must be released before query state references are released.
  /// Should be called by the owner of the refcount after it is done consuming query
  /// execution resources.
  void ReleaseBackendResourceRefcount();

  /// Checks whether spilling is enabled for this query. Must be called before the first
  /// call to BufferPool::Unpin() for the query. Returns OK if spilling is enabled. If
  /// spilling is not enabled, logs a MEM_LIMIT_EXCEEDED error from
  /// mem_tracker->MemLimitExceeded() to 'runtime_state'.
  Status StartSpilling(RuntimeState* runtime_state, MemTracker* mem_tracker);

  ~QueryState();

  /// Return overall status of Prepare() phases of fragment instances. A failure
  /// in any instance's Prepare() will cause this function to return an error status.
  /// Blocks until all fragment instances have finished their Prepare() phase.
  Status WaitForPrepare();

  /// Called by a FragmentInstanceState thread to notify that it's done preparing.
  void DonePreparing() { discard_result(instances_prepared_barrier_->Notify()); }

  /// Called by a FragmentInstanceState thread to notify that it's done executing.
  void DoneExecuting() { discard_result(instances_finished_barrier_->Notify()); }

  /// Called by a fragment instance thread to notify that it hit an error during Prepare()
  /// Updates the query status and the failed instance ID if it's not set already.
  /// Also notifies anyone waiting on WaitForPrepare() if this is called by the last
  /// fragment instance to complete Prepare().
  void ErrorDuringPrepare(const Status& status, const TUniqueId& finst_id);

  /// Called by a fragment instance thread to notify that it hit an error during Execute()
  /// Updates the query status and records the failed instance ID if they're not set
  /// already. Also notifies anyone waiting on WaitForFinishOrTimeout().
  void ErrorDuringExecute(const Status& status, const TUniqueId& finst_id);

  /// The default BATCH_SIZE.
  static const int DEFAULT_BATCH_SIZE = 1024;

 private:
  friend class QueryExecMgr;

  /// test execution
  friend class RuntimeState;
  friend class TestEnv;

  /// Blocks until all fragment instances have finished executing or until one of them
  /// hits an error, or until 'timeout_ms' milliseconds have elapsed. Returns 'true' if
  /// all fragment instances finished or one of them hit an error. Returns 'false' on
  /// timeout.
  bool WaitForFinishOrTimeout(int32_t timeout_ms);

  /// Blocks until all fragment instances have finished executing or until one of them
  /// hits an error.
  void WaitForFinish();

  /// States that a query goes through during its lifecycle.
  enum class BackendExecState {
    /// PREPARING: The initial state on receiving an ExecQueryFInstances() RPC from the
    /// coordinator. Implies that the fragment instances are being started.
    PREPARING,
    /// EXECUTING: All fragment instances managed by this QueryState have successfully
    /// completed Prepare(). Implies that the query is executing.
    EXECUTING,
    /// FINISHED: All fragment instances managed by this QueryState have successfully
    /// completed executing.
    FINISHED,
    /// CANCELLED: This query received a CancelQueryFInstances() RPC or was directed by
    /// the coordinator to cancel itself from a response to a ReportExecStatus() RPC.
    /// Does not imply that all the fragment instances have realized cancellation however.
    CANCELLED,
    /// ERROR: received an error from a fragment instance.
    ERROR
  };

  /// Pseudo-lock to verify only query state thread is updating 'backend_exec_state_'.
  DFAKE_MUTEX(backend_exec_state_lock_);

  /// Current state of this query in this executor.
  /// Thread-safety: Only updated by the query state thread.
  BackendExecState backend_exec_state_ = BackendExecState::PREPARING;

  /// Protects 'overall_status_' and 'failed_finstance_id_'.
  SpinLock status_lock_;

  /// The overall status of this QueryState.
  /// A backend can have an error from a specific fragment instance, or it can have a
  /// general error that is independent of any individual fragment. If reporting a
  /// single error, this status is always set to the error being reported. If reporting
  /// multiple errors, the status is set by the following rules:
  /// 1. A general error takes precedence over any fragment instance error.
  /// 2. Any fragment instance error takes precedence over any cancelled status.
  /// 3. If multiple fragments have errors, the first fragment to hit an error is given
  ///    preference.
  /// Status::OK if all the fragment instances managed by this QS are also Status::OK;
  /// Protected by 'status_lock_'.
  Status overall_status_;

  /// ID of first fragment instance to hit an error.
  /// Protected by 'status_lock_'.
  TUniqueId failed_finstance_id_;

  /// set in c'tor
  const TQueryCtx query_ctx_;

  /// the top-level MemTracker for this query (owned by obj_pool_), created in c'tor
  MemTracker* query_mem_tracker_ = nullptr;

  /// The RPC proxy used when reporting status of fragment instances to coordinator.
  /// Set in Init().
  std::unique_ptr<ControlServiceProxy> proxy_;

  /// Set in Init(). TODO: find a way not to have to copy this
  ExecQueryFInstancesRequestPB exec_rpc_params_;
  TExecPlanFragmentInfo fragment_info_;

  /// Buffer reservation for this query (owned by obj_pool_). Set in Init().
  ReservationTracker* buffer_reservation_ = nullptr;

  /// Pool of buffer reservations used to distribute initial reservations to operators
  /// in the query. Contains a ReservationTracker that is a child of
  /// 'buffer_reservation_'. Owned by 'obj_pool_'. Set in Init().
  InitialReservations* initial_reservations_ = nullptr;

  /// Tracks expected memory consumption of all multithreaded scans for this query on
  /// this daemon. Owned by 'obj_pool_'. Set in Init().
  ScannerMemLimiter* scanner_mem_limiter_ = nullptr;

  /// Number of active fragment instances for this query that may consume resources for
  /// query backend execution (i.e. threads, memory) on the Impala daemon.  Query-wide
  /// backend execution resources for this query are released once this goes to zero.
  AtomicInt32 backend_resource_refcnt_;

  /// Temporary files for this query (owned by obj_pool_). Non-null if spilling is
  /// enabled. Set in Init() (via InitBufferPoolState()).
  TmpFileMgr::FileGroup* file_group_ = nullptr;

  /// created in StartFInstances(), owned by obj_pool_
  DescriptorTbl* desc_tbl_ = nullptr;

  /// Barrier for the completion of the Prepare() phases of all fragment instances. This
  /// just blocks until ALL fragment instances have finished preparing, regardless of
  /// whether they hit an error or not.
  std::unique_ptr<CountingBarrier> instances_prepared_barrier_;

  /// Barrier for the completion of all the fragment instances.
  /// If the 'Status' is not OK due to an error during fragment instance execution, this
  /// barrier is unblocked immediately. 'overall_status_' is set once this is unblocked
  /// and so is 'failed_finstance_id_' if an error is hit.
  std::unique_ptr<CountingBarrier> instances_finished_barrier_;

  /// map from instance id to its state (owned by obj_pool_), populated in
  /// StartFInstances(); Not valid to read from until 'instances_prepared_barrier_'
  /// is set (i.e. readers should always call WaitForPrepare()).
  std::unordered_map<TUniqueId, FragmentInstanceState*> fis_map_;

  /// map from fragment index to its instances (owned by obj_pool_), populated in
  /// StartFInstances(). Only written by the query state thread (i.e. the thread
  /// which executes StartFInstances()). Not valid to read from until
  /// 'instances_prepared_barrier_' is set (i.e. accessor should always call
  /// WaitForPrepare()).
  std::unordered_map<int, std::vector<FragmentInstanceState*>> fragment_map_;

  ObjectPool obj_pool_;
  AtomicInt32 refcnt_;

  /// set to 1 when any fragment instance fails or when Cancel() is called; used to
  /// initiate cancellation exactly once
  AtomicInt32 is_cancelled_;

  /// True if and only if ReleaseBackendResources() has been called.
  bool released_backend_resources_ = false;

  /// Whether the query has spilled. 0 if the query has not spilled. Atomically set to 1
  /// when the query first starts to spill. Required to correctly maintain the
  /// "num-queries-spilled" metric.
  AtomicInt32 query_spilled_;

  /// Records the point in time when fragment instances are started up. Set in
  /// StartFInstances().
  int64_t fragment_events_start_time_ = 0;

  /// Tracks host resource usage of this backend. Owned by 'obj_pool_', created in c'tor.
  RuntimeProfile* const host_profile_;

  /// The number of failed intermediate reports since the last successfully sent report.
  int64_t num_failed_reports_ = 0;

  /// If a status report fails, set to the current time using MonotonicMillis(). Reset to
  /// 0 on a successful report. Used to track how long we've been trying unsuccessfully to
  /// send a status report so that we can cancel after a configurable timeout.
  int64_t failed_report_time_ms_ = 0;

  /// Create QueryState w/ a refcnt of 0 and a memory limit of 'mem_limit' bytes applied
  /// to the query mem tracker. The query is associated with the resource pool set in
  /// 'query_ctx.request_pool' or from 'request_pool', if the former is not set (needed
  /// for tests).
  QueryState(const TQueryCtx& query_ctx, int64_t mem_limit,
      const std::string& request_pool = "");

  /// Execute the fragment instance and decrement the refcnt when done.
  void ExecFInstance(FragmentInstanceState* fis);

  /// Called from Init() to set up buffer reservations and the file group.
  Status InitBufferPoolState() WARN_UNUSED_RESULT;

  /// Releases resources used for query backend execution. Guaranteed to be called only
  /// once. Must be called before destroying the QueryState. Not idempotent and not
  /// thread-safe.
  void ReleaseBackendResources();

  /// Helper for ReportExecStatus() to construct a status report to be sent to the
  /// coordinator. The execution statuses (e.g. 'done' indicator) of all fragment
  /// instances belonging to this query state are stored in 'report'. The Thrift
  /// serialized runtime profiles of fragment instances are stored in 'profiles_forest'.
  void ConstructReport(bool instances_started, ReportExecStatusRequestPB* report,
      TRuntimeProfileForest* profiles_forest);

  /// Gather statuses and profiles of all fragment instances belonging to this query state
  /// and send it to the coordinator via ReportExecStatus() RPC. Returns true if the
  /// report rpc was successful or if it was unsuccessful and we've reached the maximum
  /// number of allowed failures and cancelled.
  bool ReportExecStatus();

  /// Returns the amount of time in ms to wait before sending the next status report,
  /// calculated as a function of the status report interval with backoff based on the
  /// number of consecutive failed reports.
  int64_t GetReportWaitTimeMs() const;

  /// Returns true if the overall backend status is already set with an error, i.e. a
  /// non-OK status that is not a cancellation.
  /// NOTE(review): this reads 'overall_status_', which is documented as protected by
  /// 'status_lock_'; callers presumably must hold that lock -- confirm.
  bool HasErrorStatus() const {
    return !overall_status_.ok() && !overall_status_.IsCancelled();
  }

  /// Returns true if the query has reached a terminal state.
  bool IsTerminalState() const {
    return backend_exec_state_ == BackendExecState::FINISHED
        || backend_exec_state_ == BackendExecState::CANCELLED
        || backend_exec_state_ == BackendExecState::ERROR;
  }

  /// Updates the BackendExecState based on 'overall_status_'. Should only be called when
  /// the current state is a non-terminal state. The transition can either be to the next
  /// legal state or ERROR if 'overall_status_' is an error. Called by the query state
  /// thread only. It acquires the 'status_lock_' to synchronize with the fragment
  /// instance threads' updates to 'overall_status_'.
  ///
  /// Upon reaching a terminal state, it will call ReportExecStatus() to send the final
  /// report to the coordinator and not expect to be called afterwards.
  void UpdateBackendExecState();

  /// A string representation of 'state'.
  const char* BackendExecStateToString(const BackendExecState& state);

  /// QueryState is shared via reference counting and owns non-copyable members
  /// (e.g. the barriers and the RPC proxy); make its non-copyability explicit.
  DISALLOW_COPY_AND_ASSIGN(QueryState);
};
}

#endif
