// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef IMPALA_RUNTIME_SORTER_H_
#define IMPALA_RUNTIME_SORTER_H_

#include <deque>

#include "runtime/bufferpool/buffer-pool.h"
#include "util/tuple-row-compare.h"

namespace impala {

class SortedRunMerger;
class RuntimeProfile;
class RowBatch;

/// Sorter contains the external sort implementation. Its purpose is to sort arbitrarily
/// large input data sets with a fixed memory budget by spilling data to disk if
/// necessary.
//
/// The client API for Sorter is as follows:
/// AddBatch() is used to add input rows to be sorted. Multiple tuples in an input row are
/// materialized into a row with a single tuple (the sort tuple) using the materialization
/// exprs in sort_tuple_exprs_. The sort tuples are sorted according to the sort
/// parameters and output by the sorter. AddBatch() can be called multiple times.
//
/// Callers that don't want to spill can use AddBatchNoSpill() instead, which only adds
/// rows up to the memory limit and then returns the number of rows that were added.
/// For this use case, 'enable_spill' should be set to false so that the sorter can reduce
/// the number of buffers requested from the block mgr since there won't be merges.
//
/// InputDone() is called to indicate the end of input. If multiple sorted runs were
/// created, it triggers intermediate merge steps (if necessary) and creates the final
/// merger that returns results via GetNext().
//
/// GetNext() is used to retrieve sorted rows. It can be called multiple times.
/// AddBatch()/AddBatchNoSpill(), InputDone() and GetNext() must be called in that order.
//
/// Batches of input rows are collected into a sequence of pinned BufferPool pages
/// called a run. The maximum size of a run is determined by the number of pages that
/// can be pinned by the Sorter. After the run is full, it is sorted in memory, unpinned
/// and the next run is constructed. The variable-length column data (e.g. string slots)
/// in the materialized sort tuples are stored in a separate sequence of pages from the
/// tuples themselves.  When the pages containing tuples in a run are unpinned, the
/// var-len slot pointers are converted to offsets from the start of the first var-len
/// data page. When a page is read back, these offsets are converted back to pointers.
/// The in-memory sorter sorts the fixed-length tuples in-place. The output rows have the
/// same schema as the materialized sort tuples.
//
/// After the input is consumed, the sorter is left with one or more sorted runs. If
/// there are multiple runs, the runs are merged using SortedRunMerger. At least one
/// page per run (two if there are var-length slots) must be pinned in memory during
/// a merge, so multiple merges may be necessary if the number of runs is too large.
/// First a series of intermediate merges are performed, until the number of runs is
/// small enough to do a single final merge that returns batches of sorted rows to the
/// caller of GetNext().
///
/// If there is a single sorted run (i.e. no merge required), only tuple rows are
/// copied into the output batch supplied by GetNext(), and the data itself is left in
/// pinned pages held by the sorter.
///
/// When merges are performed, one input batch is created to hold tuple rows for each
/// input run, and one batch is created to hold deep copied rows (i.e. ptrs + data) from
/// the output of the merge.
//
/// Note that Init() must be called right after the constructor.
//
/// During a merge, one row batch is created for each input run, and one batch is created
/// for the output of the merge (if is not the final merge). It is assumed that the memory
/// for these batches have already been accounted for in the memory budget for the sort.
/// That is, the memory for these batches does not come out of the buffer pool.
//
/// TODO: Not necessary to actually copy var-len data - instead take ownership of the
/// var-length data in the input batch. Copying can be deferred until a run is unpinned.
/// TODO: When the first run is constructed, create a sequence of pointers to materialized
/// tuples. If the input fits in memory, the pointers can be sorted instead of sorting the
/// tuples in place.
class Sorter {
 public:
  /// 'sort_tuple_exprs' are the slot exprs used to materialize the tuples to be
  /// sorted. 'ordering_exprs', 'is_asc_order' and 'nulls_first' are parameters
  /// for the comparator for the sort tuples.
  /// 'node_label' is the label of the exec node using the sorter for error reporting.
  /// 'enable_spilling' should be set to false to reduce the number of requested buffers
  /// if the caller will use AddBatchNoSpill().
  ///
  /// The Sorter assumes that it has exclusive use of the client's
  /// reservations for sorting, and may increase the size of the client's reservation.
  /// The caller is responsible for ensuring that the minimum reservation (returned from
  /// ComputeMinReservation()) is available.
  Sorter(const std::vector<ScalarExpr*>& ordering_exprs,
      const std::vector<bool>& is_asc_order, const std::vector<bool>& nulls_first,
      const std::vector<ScalarExpr*>& sort_tuple_exprs, RowDescriptor* output_row_desc,
      MemTracker* mem_tracker, BufferPool::ClientHandle* client, int64_t page_len,
      RuntimeProfile* profile, RuntimeState* state, const std::string& node_label,
      bool enable_spilling);
  ~Sorter();

  /// Initial set-up of the sorter for execution.
  /// The evaluators for 'sort_tuple_exprs_' will be created and stored in 'obj_pool'.
  Status Prepare(ObjectPool* obj_pool) WARN_UNUSED_RESULT;

  /// Do codegen for the Sorter. Called after Prepare() if codegen is desired. Returns OK
  /// if successful or a Status describing the reason why Codegen failed otherwise.
  Status Codegen(RuntimeState* state);

  /// Opens the sorter for adding rows and initializes the evaluators for materializing
  /// the tuples. Must be called after Prepare() or Reset() and before calling AddBatch().
  Status Open() WARN_UNUSED_RESULT;

  /// Adds the entire batch of input rows to the sorter. If the current unsorted run fills
  /// up, it is sorted and a new unsorted run is created. Cannot be called if
  /// 'enable_spill' is false.
  Status AddBatch(RowBatch* batch) WARN_UNUSED_RESULT;

  /// Adds input rows to the current unsorted run, starting from 'start_index' up to the
  /// memory limit. Returns the number of rows added in 'num_processed'.
  Status AddBatchNoSpill(
      RowBatch* batch, int start_index, int* num_processed) WARN_UNUSED_RESULT;

  /// Called to indicate there is no more input. Triggers the creation of merger(s) if
  /// necessary.
  Status InputDone() WARN_UNUSED_RESULT;

  /// Get the next batch of sorted output rows from the sorter.
  Status GetNext(RowBatch* batch, bool* eos) WARN_UNUSED_RESULT;

  /// Resets all internal state like ExecNode::Reset().
  /// Init() must have been called, AddBatch()/GetNext()/InputDone()
  /// may or may not have been called.
  void Reset();

  /// Close the Sorter and free resources.
  void Close(RuntimeState* state);

  /// Compute the minimum amount of buffer memory in bytes required to execute a
  /// sort with the current sorter. Must be kept in sync with
  /// SortNode.computeNodeResourceProfile() in fe.
  int64_t ComputeMinReservation() const;

  /// Return true if the sorter has any spilled runs.
  bool HasSpilledRuns() const;

 private:
  class Page;
  class Run;
  class TupleIterator;
  class TupleSorter;

  /// Create a SortedRunMerger from sorted runs in 'sorted_runs_' and assign it to
  /// 'merger_'. 'num_runs' indicates how many runs should be covered by the current
  /// merging attempt. Returns error if memory allocation fails during in
  /// Run::PrepareRead(). The runs to be merged are removed from 'sorted_runs_'. The
  /// Sorter sets the 'deep_copy_input' flag to true for the merger, since the pages
  /// containing input run data will be deleted as input runs are read.
  Status CreateMerger(int num_runs) WARN_UNUSED_RESULT;

  /// Repeatedly replaces multiple smaller runs in sorted_runs_ with a single larger
  /// merged run until there are few enough runs to be merged with a single merger.
  /// Returns when 'merger_' is set up to merge the final runs. If the number of sorted
  /// runs is too large, merge sets of smaller runs into large runs until a final merge
  /// can be performed. An intermediate row batch containing deep copied rows is used for
  /// the output of each intermediate merge.
  Status MergeIntermediateRuns() WARN_UNUSED_RESULT;

  /// Execute a single step of the intermediate merge, pulling rows from 'merger_'
  /// and adding them to 'merged_run'.
  Status ExecuteIntermediateMerge(Sorter::Run* merged_run) WARN_UNUSED_RESULT;

  /// Called once there no more rows to be added to 'unsorted_run_'. Sorts
  /// 'unsorted_run_' and appends it to the list of sorted runs.
  Status SortCurrentInputRun() WARN_UNUSED_RESULT;

  /// Helper that cleans up all runs in the sorter.
  void CleanupAllRuns();

  /// Based on the amount of unused buffers the Sorter has through the BufferPool this
  /// function calculates the maximum number of runs that can be taken care of during the
  /// next merge intermediate merge. Takes into account that a separate run is needed for
  /// the output.
  int MaxRunsInNextMerge() const;

  /// Calculates the number of runs the 'merger_' should grab for merging in the current
  /// round of merging. Returns at most MaxRunsInNextMerge(), so the Sorter will have
  /// enough reservation to merge this number of runs.
  int GetNumOfRunsForMerge() const;

  /// If the number of available buffers is not enough to grab all the runs to merge in
  /// one round then this functions starts allocating additional free buffers one by one
  /// until it reaches the maximum limit the Sorter can have or until we have enough free
  /// buffers for all the runs. This is possible if other operators have released memory
  /// since the Sorter has started working on it's initial runs.
  void TryToIncreaseMemAllocationForMerge();

  /// Label of the ExecNode that owns the sorter, used for error reporting.
  const std::string node_label_;

  /// Runtime state instance used to check for cancellation. Not owned.
  RuntimeState* const state_;

  /// MemPool for allocating data structures used by expression evaluators in the sorter.
  MemPool expr_perm_pool_;

  /// MemPool for allocations that hold results of expression evaluation in the sorter.
  /// Cleared periodically during sorting to prevent memory accumulating.
  MemPool expr_results_pool_;

  /// In memory sorter and less-than comparator.
  TupleRowComparator compare_less_than_;
  boost::scoped_ptr<TupleSorter> in_mem_tuple_sorter_;

  /// Client used to allocate pages from the buffer pool. Not owned.
  BufferPool::ClientHandle* const buffer_pool_client_;

  /// The length of page to use.
  const int64_t page_len_;

  /// True if the tuples to be sorted have var-length slots.
  bool has_var_len_slots_;

  /// Expressions used to materialize the sort tuple. One expr per slot in the tuple.
  const std::vector<ScalarExpr*>& sort_tuple_exprs_;
  std::vector<ScalarExprEvaluator*> sort_tuple_expr_evals_;

  /// Mem tracker for batches created during merge. Not owned by Sorter.
  MemTracker* mem_tracker_;

  /// Descriptor for the sort tuple. Input rows are materialized into 1 tuple before
  /// sorting. Not owned by the Sorter.
  RowDescriptor* output_row_desc_;

  /// True if this sorter can spill. Used to determine the number of buffers to reserve.
  bool enable_spilling_;

  /////////////////////////////////////////
  /// BEGIN: Members that must be Reset()

  /// The current unsorted run that is being collected. Is sorted and added to
  /// sorted_runs_ after it is full (i.e. number of pages allocated == max available
  /// buffers) or after the input is complete. Owned and placed in obj_pool_.
  /// When it is added to sorted_runs_, it is set to NULL.
  Run* unsorted_run_;

  /// List of sorted runs that have been produced but not merged. unsorted_run_ is added
  /// to this list after an in-memory sort. Sorted runs produced by intermediate merges
  /// are also added to this list during the merge. Runs are added to the object pool.
  std::deque<Run*> sorted_runs_;

  /// Merger object (intermediate or final) currently used to produce sorted runs.
  /// Only one merge is performed at a time. Will never be used if the input fits in
  /// memory.
  boost::scoped_ptr<SortedRunMerger> merger_;

  /// Spilled runs that are currently processed by the merge_.
  /// These runs can be deleted when we are done with the current merge.
  std::deque<Run*> merging_runs_;

  /// Output run for the merge. Stored in Sorter() so that it can be cleaned up
  /// in Sorter::Close() in case of errors.
  Run* merge_output_run_;

  /// Pool of owned Run objects. Maintains Runs objects across non-freeing Reset() calls.
  ObjectPool run_pool_;

  /// END: Members that must be Reset()
  /////////////////////////////////////////

  /// Runtime profile and counters for this sorter instance.
  RuntimeProfile* profile_;

  /// Pool of objects (e.g. exprs) that are not freed during Reset() calls.
  ObjectPool obj_pool_;

  /// Number of initial runs created.
  RuntimeProfile::Counter* initial_runs_counter_;

  /// Number of runs that were unpinned and may have spilled to disk, including initial
  /// and intermediate runs.
  RuntimeProfile::Counter* spilled_runs_counter_;

  /// Number of merges of sorted runs.
  RuntimeProfile::Counter* num_merges_counter_;

  /// Time spent sorting initial runs in memory.
  RuntimeProfile::Counter* in_mem_sort_timer_;

  /// Total size of the initial runs in bytes.
  RuntimeProfile::Counter* sorted_data_size_;

  /// Min, max, and avg size of runs in number of tuples.
  RuntimeProfile::SummaryStatsCounter* run_sizes_;
};

} // namespace impala

#endif
