// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <string>
#include "exec/exec-node.h"
#include "exec/filter-context.h"
#include "util/runtime-profile.h"
#include "util/thread.h"
#include "gen-cpp/ImpalaInternalService_types.h"

namespace impala {

class BlockingRowBatchQueue;
class TScanRange;

/// Abstract base class of all scan nodes. Subclasses support different storage layers
/// and different threading models.
///
/// Includes ScanNode common counters:
///   BytesRead - total bytes read from disk by this scan node. Provided as a counter
///     as well as a time series that samples the counter. Only implemented for scan node
///     subclasses that expose the bytes read, e.g. HDFS and HBase.
///
///   TotalReadThroughput - BytesRead divided by the total wall clock time that this scan
///     was executing (from Open() to Close()). This gives the aggregate rate that data
///     is read from disks. If this is the only scan executing, ideally this will
///     approach the maximum bandwidth supported by the disks.
///
///   RowsRead - number of top-level rows/tuples read from the storage layer, including
///     those discarded by predicate evaluation. Used for all types of scans.
///
///   CollectionItemsRead - total number of nested collection items read by the scan.
///     Only created for scans (e.g. Parquet) that support nested types.
///
///   ScanRangesComplete - number of scan ranges completed. Initialized for scans that
///     have a concept of "scan range".
///
///   MaterializeTupleTime - wall clock time spent materializing tuples and evaluating
///     predicates.
///
/// The following counters are specific to multithreaded scan node implementations:
///
///   PeakScannerThreadConcurrency - the peak number of scanner threads executing at any
///     one time. Present only for multithreaded scan nodes.
///
///   AverageScannerThreadConcurrency - the average number of scanner threads executing
///     between Open() and the time when the scan completes. Present only for
///     multithreaded scan nodes.
///
///   NumScannerThreadsStarted - the number of scanner threads started for the duration
///     of the ScanNode. This is at most the number of scan ranges but should be much
///     less since a single scanner thread will likely process multiple scan ranges.
///     This is *not* the same as peak scanner thread concurrency because the number of
///     scanner threads can fluctuate during execution of the scan.
///
///   ScannerThreadsTotalWallClockTime - total wall clock time spent in all scanner
///     threads.
///
///   ScannerThreadsUserTime, ScannerThreadsSysTime,
///   ScannerThreadsVoluntaryContextSwitches, ScannerThreadsInvoluntaryContextSwitches -
///     these are aggregated counters across all scanner threads of this scan node. They
///     are taken from getrusage. See RuntimeProfile::ThreadCounters for details.
///
///   RowBatchesEnqueued, RowBatchBytesEnqueued - Number of row batches and bytes enqueued
///     in the scan node's output queue.
///
///   RowBatchQueueGetWaitTime - Wall clock time that the fragment execution thread spent
///     blocked waiting for row batches to be added to the scan node's output queue.
///
///   RowBatchQueuePutWaitTime - Wall clock time that the scanner threads spent blocked
///     waiting for space in the scan node's output queue when it is full.
///
///   RowBatchQueueCapacity - capacity in batches of the scan node's output queue.
///
///   RowBatchQueuePeakMemoryUsage - peak memory consumption of row batches enqueued in
///     the scan node's output queue.
///
class ScanNode : public ExecNode {
 public:
  ScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
    : ExecNode(pool, tnode, descs),
      scan_range_params_(NULL) {}

  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) WARN_UNUSED_RESULT;
  virtual Status Prepare(RuntimeState* state) WARN_UNUSED_RESULT;
  virtual Status Open(RuntimeState* state) WARN_UNUSED_RESULT;

  /// Stops all periodic counters and calls ExecNode::Close(). Subclasses of ScanNode can
  /// start periodic counters and rely on this function stopping them.
  virtual void Close(RuntimeState* state);

  /// This should be called before Prepare(), and the argument must be not destroyed until
  /// after Prepare().
  void SetScanRanges(const std::vector<TScanRangeParams>& scan_range_params) {
    scan_range_params_ = &scan_range_params;
  }

  virtual bool IsScanNode() const { return true; }

  /// Returns true iff the data cache in IoMgr is disabled by query options.
  bool IsDataCacheDisabled() const;

  RuntimeState* runtime_state() const { return runtime_state_; }
  RuntimeProfile::Counter* bytes_read_counter() const { return bytes_read_counter_; }
  RuntimeProfile::Counter* rows_read_counter() const { return rows_read_counter_; }
  RuntimeProfile::Counter* collection_items_read_counter() const {
    return collection_items_read_counter_;
  }
  RuntimeProfile::Counter* materialize_tuple_timer() const {
    return materialize_tuple_timer_;
  }

  /// names of ScanNode common counters
  static const std::string BYTES_READ_COUNTER;
  static const std::string ROWS_READ_COUNTER;
  static const std::string COLLECTION_ITEMS_READ_COUNTER;
  static const std::string TOTAL_HDFS_READ_TIMER;
  static const std::string TOTAL_HDFS_OPEN_FILE_TIMER;
  static const std::string TOTAL_HBASE_READ_TIMER;
  static const std::string TOTAL_THROUGHPUT_COUNTER;
  static const std::string PER_READ_THREAD_THROUGHPUT_COUNTER;
  static const std::string NUM_DISKS_ACCESSED_COUNTER;
  static const std::string MATERIALIZE_TUPLE_TIMER;
  static const std::string SCAN_RANGES_COMPLETE_COUNTER;
  static const std::string SCANNER_THREAD_COUNTERS_PREFIX;
  static const std::string SCANNER_THREAD_TOTAL_WALLCLOCK_TIME;
  static const std::string AVERAGE_SCANNER_THREAD_CONCURRENCY;
  static const std::string PEAK_SCANNER_THREAD_CONCURRENCY;
  static const std::string AVERAGE_HDFS_READ_THREAD_CONCURRENCY;
  static const std::string NUM_SCANNER_THREADS_STARTED;

  const std::vector<ScalarExpr*>& filter_exprs() const { return filter_exprs_; }

  const std::vector<FilterContext>& filter_ctxs() const { return filter_ctxs_; }

 protected:
  RuntimeState* runtime_state_ = nullptr;

  /// The scan ranges this scan node is responsible for. Not owned.
  const std::vector<TScanRangeParams>* scan_range_params_;

  /// Total bytes read from the scanner. Initialised in subclasses that track
  /// bytes read, including HDFS and HBase by calling AddBytesReadCounters().
  RuntimeProfile::Counter* bytes_read_counter_ = nullptr;

  /// Time series of 'bytes_read_counter_', initialized at the same time.
  RuntimeProfile::TimeSeriesCounter* bytes_read_timeseries_counter_ = nullptr;

  /// Wall based aggregate read throughput [bytes/sec]. Depends on 'bytes_read_counter_'
  /// and initialized at the same time.
  RuntimeProfile::Counter* total_throughput_counter_ = nullptr;

  /// # top-level rows/tuples read from the scanner, including those discarded by
  /// EvalConjuncts(). Used for all types of scans.
  RuntimeProfile::Counter* rows_read_counter_ = nullptr;

  /// # items the scanner read into CollectionValues. For example, for schema
  /// array<struct<B: INT, array<C: INT>> and tuple
  /// [(2, [(3)]), (4, [])] this counter will be 3: (2, [(3)]), (3) and (4, [])
  /// Initialized by subclasses that support scanning nested types.
  RuntimeProfile::Counter* collection_items_read_counter_ = nullptr;

  /// Total time writing tuple slots. Used for all types of scans.
  RuntimeProfile::Counter* materialize_tuple_timer_ = nullptr;

  /// Total number of scan ranges completed. Initialised in subclasses that have a
  /// concept of "scan range", including HDFS and Kudu.
  RuntimeProfile::Counter* scan_ranges_complete_counter_ = nullptr;

  /// Expressions to evaluate the input rows for filtering against runtime filters.
  std::vector<ScalarExpr*> filter_exprs_;

  /// List of contexts for expected runtime filters for this scan node. These contexts are
  /// cloned by individual scanners to be used in multi-threaded contexts, passed through
  /// the per-scanner ScannerContext. Correspond to exprs in 'filter_exprs_'.
  std::vector<FilterContext> filter_ctxs_;

  /// Initializes 'bytes_read_counter_', 'bytes_read_timeseries_counter_' and
  /// 'total_throughput_counter_'
  void AddBytesReadCounters();

  /// Waits for runtime filters to arrive, checking every 20ms. Max wait time is specified
  /// by the 'runtime_filter_wait_time_ms' flag, which is overridden by the query option
  /// of the same name. Returns true if all filters arrived within the time limit (as
  /// measured from the time of RuntimeFilterBank::RegisterFilter()), false otherwise.
  bool WaitForRuntimeFilters();

  /// Additional state only used by multi-threaded scan node implementations.
  /// The lifecycle is as follows:
  /// 1. Prepare() is called.
  /// 2. Open() is called.
  /// 3. Other methods can be called.
  /// 4. Shutdown() is called to prevent new batches being added to the queue.
  /// 5. Close() is called to release all resources.
  class ScannerThreadState {
   public:
    /// Called from *ScanNode::Prepare() to initialize counters and MemTracker.
    /// 'estimated_per_thread_mem' is the estimated memory consumption of each scanner
    /// thread and must be positive. Prepare() registers the scan with the query-global
    /// ScannerMemLimit and accounts for the first thread's memory consumption.
    void Prepare(ScanNode* parent, int64_t estimated_per_thread_mem);

    /// Called from *ScanNode::Open() to create the row batch queue and start periodic
    /// counters running. 'max_row_batches_override' determines size of the row batch
    /// queue if >= 0. Otherwise the size is automatically determined.
    void Open(ScanNode* parent, int64_t max_row_batches_override);

    /// Called when no more batches need to be enqueued or dequeued. Shuts down the
    /// queue. Thread-safe.
    void Shutdown();

    /// Waits for all scanner threads to finish and cleans up the queue. Called from
    /// *ScanNode::Close(). No other methods can be called after this. Not thread-safe.
    void Close(ScanNode* parent);

    /// Add a new scanner thread to the thread group. Not thread-safe: only one thread
    /// should call AddThread() at a time.
    void AddThread(std::unique_ptr<Thread> thread);

    /// Get the number of active scanner threads. Thread-safe.
    int32_t GetNumActive() const { return num_active_.Load(); }

    /// Get the number of started scanner threads. Thread-safe.
    int32_t GetNumStarted() const { return num_threads_started_->value(); }

    /// Called from a scanner thread that is exiting to decrement the number of active
    /// scanner threads. Returns true if this was the last thread to exit. Thread-safe.
    bool DecrementNumActive();

    /// Adds a materialized row batch for the scan node.  This is called from scanner
    /// threads. This function will block if the row batch queue is full. Thread-safe.
    void EnqueueBatch(std::unique_ptr<RowBatch> row_batch);

    /// Adds a materialized row batch for the scan node. This is called from scanner
    /// threads. This function will block for up to timeout_micros if the row batch
    /// queue is full. Return true and takes ownership of '*row_batch' if the batch
    /// was successfully enqueued. Returns false if the timeout expired or the queue
    /// was shut down and the batch could not be enqueued and does not take ownership
    /// of '*row_batch'. Thread-safe.
    bool EnqueueBatchWithTimeout(std::unique_ptr<RowBatch>* row_batch,
        int64_t timeout_micros);

    BlockingRowBatchQueue* batch_queue() { return batch_queue_.get(); }
    RuntimeProfile::ThreadCounters* thread_counters() const { return thread_counters_; }
    int max_num_scanner_threads() const { return max_num_scanner_threads_; }
    int64_t estimated_per_thread_mem() const { return estimated_per_thread_mem_; }
    RuntimeProfile::Counter* scanner_thread_mem_unavailable_counter() const {
      return scanner_thread_mem_unavailable_counter_;
    }

   private:
    /// Thread group for all scanner threads.
    ThreadGroup scanner_threads_;

    /// Maximum number of scanner threads. Set to 'NUM_SCANNER_THREADS' if that query
    /// option is set. Otherwise, it's set to the number of cpu cores. Scanner threads
    /// are generally cpu bound so there is no benefit in spinning up more threads than
    /// the number of cores. Set in Open().
    int max_num_scanner_threads_ = 0;

    /// Estimated amount of memory that each additional scanner thread will consume. Used
    /// to decide whether enough memory is available to create a new scanner thread. Set
    /// to 0 when the memory for the first thread claimed in Prepare() is released.
    int64_t estimated_per_thread_mem_ = 0;

    // MemTracker for queued row batches. Initialized in Prepare(). Owned by RuntimeState.
    MemTracker* row_batches_mem_tracker_ = nullptr;

    /// Outgoing row batches queue. Row batches are produced asynchronously by the scanner
    /// threads and consumed by the main fragment thread that calls GetNext() on the scan
    /// node.
    boost::scoped_ptr<BlockingRowBatchQueue> batch_queue_;

    /// The number of scanner threads currently running.
    AtomicInt32 num_active_{0};

    /// Aggregated scanner thread CPU time counters.
    RuntimeProfile::ThreadCounters* thread_counters_ = nullptr;

    /// Average number of executing scanner threads
    /// This should be created in Open and stopped when all the scanner threads are done.
    RuntimeProfile::Counter* average_concurrency_ = nullptr;

    /// Peak number of executing scanner threads.
    RuntimeProfile::HighWaterMarkCounter* peak_concurrency_ = nullptr;

    /// Cumulative number of scanner threads created during the scan. Some may be created
    /// and then destroyed, so this can exceed the peak number of threads.
    RuntimeProfile::Counter* num_threads_started_ = nullptr;

    /// The number of row batches enqueued into the row batch queue.
    RuntimeProfile::Counter* row_batches_enqueued_ = nullptr;

    /// The total bytes of row batches enqueued into the row batch queue.
    RuntimeProfile::Counter* row_batch_bytes_enqueued_ = nullptr;

    /// The wait time for fetching a row batch from the row batch queue.
    RuntimeProfile::Counter* row_batches_get_timer_ = nullptr;

    /// The wait time for enqueuing a row batch into the row batch queue.
    RuntimeProfile::Counter* row_batches_put_timer_ = nullptr;

    /// Peak memory consumption of the materialized batch queue. Updated in Close().
    RuntimeProfile::Counter* row_batches_peak_mem_consumption_ = nullptr;

    /// Number of times scanner threads were not created because of memory not available.
    RuntimeProfile::Counter* scanner_thread_mem_unavailable_counter_ = nullptr;
  };
};
}
