// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once

#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include <optional>
#include <ostream>
#include <string>
#include <vector>

#include <glog/logging.h>

#include "kudu/common/row_changelist.h"
#include "kudu/common/rowid.h"
#include "kudu/common/timestamp.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/delta_key.h"
#include "kudu/tablet/rowset.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"

namespace kudu {

class Arena;
class ColumnBlock;
class ScanSpec;
class Schema;
class SelectionVector;
struct ColumnId;

namespace fs {
struct IOContext;
}  // namespace fs

namespace tablet {

class DeltaFileWriter;
class DeltaIterator;
class DeltaStats;
class Mutation;

// Records the deltas chosen by PreparedDeltas::SelectDeltas.
//
// An instance may describe the deltas of one delta store, or the combined
// result of merging the SelectedDeltas of several stores.
class SelectedDeltas {
 public:
  SelectedDeltas() = default;

  // Shorthand for default-constructing an instance and then calling
  // Reset(nrows) on it.
  explicit SelectedDeltas(size_t nrows);

  // Flattens the selected deltas into the simpler SelectionVector 'sel_vec'.
  void ToSelectionVector(SelectionVector* sel_vec) const;

  // Produces a human-readable description, intended for debugging.
  std::string ToString() const;

 private:
  template<class Traits>
  friend class DeltaPreparer;

  // A mutation from a delta store that satisfied the 'select' criteria.
  struct Delta {
    // ---- Key fields ----

    // Timestamp of the delta.
    Timestamp ts;

    // Kind of delta: UNDO or REDO.
    DeltaType dtype;

    // Tie-breaker for deltas that agree on both 'ts' and 'dtype'.
    //
    // Several UNDOs (or several REDOs) within one delta store may carry the
    // same timestamp. This field captures their logical order so that a total
    // ordering still exists.
    //
    // Take, for instance, this sequence of REDOs:
    // D1: @ts10 UPDATE key=1
    // D2: @ts10 DELETE key=1
    //
    // 'ts' and 'dtype' cannot tell D1 and D2 apart, so D1 must be given a
    // smaller disambiguator than D2.
    int64_t disambiguator;

    // ---- Non-key fields ----

    // Kind of change: UPDATE, DELETE, or REINSERT.
    RowChangeList::ChangeType ctype;
  };

  // The oldest and newest deltas seen for one row.
  //
  // With a single delta, 'oldest' equals 'newest' and 'same_delta' is true.
  // In every other case 'oldest' orders strictly before 'newest' according to
  // DeltaLessThanFunctor.
  //
  // Timestamp alone usually decides which delta is older, but deltas may share
  // a timestamp; extra rules then keep the ordering total.
  struct DeltaPair {
    Delta oldest;
    Delta newest;
    bool same_delta;
  };

  // Strict ordering over the Deltas belonging to a single row.
  struct DeltaLessThanFunctor {
    bool operator() (const Delta& a, const Delta& b) const {
      // Common case: the timestamps differ and decide the order by themselves.
      const bool ts_differ = a.ts != b.ts;
      if (PREDICT_TRUE(ts_differ)) {
        return a.ts < b.ts;
      }

      if (a.dtype == b.dtype) {
        // Same timestamp and same delta type. This should only arise when both
        // deltas come from the same store, and then their disambiguators are
        // expected to differ.
        const bool disambiguated = a.disambiguator != b.disambiguator;
        if (PREDICT_TRUE(disambiguated)) {
          return a.disambiguator < b.disambiguator;
        }
        LOG(FATAL) << strings::Substitute("Could not differentiate between two deltas "
            "ts=$0, type=$1, disambiguator=$2", a.ts.ToString(), a.dtype, a.disambiguator);
      }

      // Same timestamp, different delta types: UNDO < REDO, an invariant that
      // is preserved inside of a rowset.
      return a.dtype == UNDO;
    }
  };

  // Folds 'other' into this instance, one row at a time.
  void MergeFrom(const SelectedDeltas& other);

  // Examines 'new_delta' for row 'row_idx'; it may or may not end up being
  // recorded in 'rows_'.
  void ProcessDelta(rowid_t row_idx, Delta new_delta);

  // Empties 'rows_' and readies it to track 'nrows' rows.
  void Reset(size_t nrows);

  // Every tracked delta pair, indexed by row ordinal.
  //
  // A std::nullopt entry means the row has no deltas.
  std::vector<std::optional<DeltaPair>> rows_;
};

// Abstract interface for the components that track deltas/updates.
// Its implementations are DeltaMemStore and DeltaFileReader.
class DeltaStore {
 public:
  // Carries out whatever work must follow construction of the DeltaStore;
  // this may involve additional I/O.
  virtual Status Init(const fs::IOContext* io_context) = 0;

  // Reports whether this delta store has been initialized.
  virtual bool Initted() const = 0;

  // Builds a DeltaIterator for the projection carried in 'opts'.
  //
  // That projection corresponds to the scan currently in progress; every
  // RowBlock handed to the resulting DeltaIterator must use the same schema.
  //
  // The snapshot in 'opts' is the MVCC state deciding which ops count as
  // committed (and are therefore applied by the iterator).
  //
  // On success returns Status::OK and stores the new DeltaIterator in
  // 'iterator'. Returns Status::NotFound if the mutations within this delta
  // store cannot include the snapshot.
  virtual Status NewDeltaIterator(const RowIteratorOptions& opts,
                                  std::unique_ptr<DeltaIterator>* iterator) const = 0;

  // Sets '*deleted' to true if the most recent update to row 'row_idx' is a
  // deletion.
  virtual Status CheckRowDeleted(rowid_t row_idx, const fs::IOContext* io_context,
                                 bool *deleted) const = 0;

  // Estimated size of the store, in bytes.
  virtual uint64_t EstimateSize() const = 0;

  // String form of this store.
  virtual std::string ToString() const = 0;

  // TODO(jdcryans): remove this once we don't need to have delta_stats for
  // both DMS and DFR. Currently DeltaTracker#GetColumnsIdxWithUpdates() needs
  // to filter out DMS from the redo list but it can't without RTTI.
  virtual const DeltaStats& delta_stats() const = 0;

  // Tells callers whether 'delta_stats()' is usable: either the stats have
  // been read from disk, or the store was initialized with them cached.
  // The default implementation simply reports Initted().
  virtual bool has_delta_stats() const {
    return Initted();
  }

  virtual ~DeltaStore() = default;
};

// A list of delta stores held through shared ownership.
using SharedDeltaStoreVector = std::vector<std::shared_ptr<DeltaStore>>;

// Iterator over deltas (see DeltaIterator, declared further below).
//
// For each rowset, this iterator is constructed alongside the base data
// iterator, and is used to apply any updates which have not yet been compacted
// into the base data (i.e. those edits in the DeltaMemStore or in delta files).
//
// Typically this is used as follows (pseudo-code):
//
//   Open iterator, seek to particular point in file
//   RowBlock rowblock;
//   foreach RowBlock in base data {
//     clear row block
//     CHECK_OK(iter->PrepareBatch(rowblock.size(), DeltaIterator::PREPARE_FOR_APPLY));
//     ... read column 0 from base data into row block ...
//     CHECK_OK(iter->ApplyUpdates(0, rowblock.column(0)));
//     ... check predicates for column ...
//     ... read another column from base data ...
//     CHECK_OK(iter->ApplyUpdates(1, rowblock.column(1)));
//     ...
//   }

// Pairs a delta's key with the data of the corresponding update.
struct DeltaKeyAndUpdate {
  // Key identifying the delta.
  DeltaKey key;
  // Data of the update; presumably the encoded change list for 'key' --
  // TODO confirm against the implementation.
  Slice cell;

  // Stringifies this DeltaKeyAndUpdate, according to 'schema'.
  //
  // If 'pad_key' is true, pads the delta row ids and txn ids in the output so
  // that comparing two stringified representations gives the same result as
  // comparing the DeltaKeys themselves. That is, if 'pad_key' is true, then
  // DeltaKey a < DeltaKey b => Stringify(a) < Stringify(b).
  std::string Stringify(DeltaType type, const Schema& schema, bool pad_key = false) const;
};

// Representation of deltas that have been "prepared" by an iterator. That is,
// they have been consistently read (at a snapshot) from their backing store
// into an in-memory format suitable for efficient retrieval.
//
// This is a polymorphic interface, so it declares a virtual destructor:
// implementations can then be destroyed safely through a PreparedDeltas
// pointer.
class PreparedDeltas {
 public:
  // Applies the snapshotted updates to one of the columns.
  //
  // 'dst' must be the same length as was previously passed to PrepareBatch().
  //
  // Updates belonging to unselected rows in 'filter' will be skipped. This is
  // intended as an optimization; if the caller knows with certainty that some
  // rows are irrelevant (e.g. they've been deleted), we can avoid some copying.
  //
  // Deltas must have been prepared with the flag PREPARE_FOR_APPLY.
  virtual Status ApplyUpdates(size_t col_to_apply, ColumnBlock* dst,
                              const SelectionVector& filter) = 0;

  // Applies any deletes to the given selection vector.
  //
  // Rows which have been deleted in the associated MVCC snapshot are set to 0
  // in the selection vector so that they don't show up in the output.
  //
  // Deltas must have been prepared with the flag PREPARE_FOR_APPLY.
  virtual Status ApplyDeletes(SelectionVector* sel_vec) = 0;

  // Modifies the given SelectedDeltas to include rows with relevant deltas from
  // the current prepared batch.
  //
  // Deltas must have been prepared with the flag PREPARE_FOR_SELECT.
  virtual Status SelectDeltas(SelectedDeltas* deltas) = 0;

  // Collects the mutations associated with each row in the current prepared batch.
  //
  // Each entry in the vector will be treated as a singly linked list of Mutation
  // objects. If there are no mutations for that row, the entry will be unmodified.
  // If there are mutations, they will be prepended at the head of the linked list
  // (i.e. the resulting list will be in descending timestamp order).
  //
  // The Mutation objects will be allocated out of the provided Arena, which must
  // be non-NULL.
  //
  // Deltas must have been prepared with the flag PREPARE_FOR_COLLECT.
  virtual Status CollectMutations(std::vector<Mutation*>* dst, Arena* arena) = 0;

  // Iterates through all deltas, adding deltas for columns not specified in
  // 'col_ids' to 'out'.
  //
  // Unlike CollectMutations, the iterator's MVCC snapshots are ignored; all
  // deltas are considered relevant.
  //
  // The delta objects will be allocated out of the provided Arena, which must
  // be non-NULL.
  //
  // Deltas must have been prepared with the flag PREPARE_FOR_COLLECT.
  virtual Status FilterColumnIdsAndCollectDeltas(const std::vector<ColumnId>& col_ids,
                                                 std::vector<DeltaKeyAndUpdate>* out,
                                                 Arena* arena) = 0;

  // Returns true if there might exist deltas to be applied. It is safe to
  // conservatively return true, but this would force a skip over decoder-level
  // evaluation.
  //
  // Deltas must have been prepared with the flag PREPARE_FOR_APPLY.
  virtual bool MayHaveDeltas() const = 0;

  // Virtual so that deleting an implementation through a PreparedDeltas
  // pointer runs the derived destructor rather than invoking undefined
  // behavior.
  virtual ~PreparedDeltas() = default;
};

// A PreparedDeltas that can additionally be positioned within the delta data
// and asked to prepare batches of deltas.
class DeltaIterator : public PreparedDeltas {
 public:
  // Flags describing what a prepared batch will be used for. They are combined
  // into the 'prepare_flags' bitfield taken by PrepareBatch().
  enum {
    // No block is prepared. Any attempt to call a PreparedDeltas function
    // will fail.
    PREPARE_NONE = 0,

    // Prepare the batch for applying. Every delta in the batch is decoded.
    // Operations that affect row data (i.e. UPDATEs and REINSERTs) are
    // coalesced into a column-major structure suitable for ApplyUpdates, and
    // operations that affect row lifecycle (i.e. DELETEs and REINSERTs) are
    // coalesced into a row-major structure suitable for ApplyDeletes.
    //
    // On success, ApplyUpdates and ApplyDeletes become callable.
    PREPARE_FOR_APPLY = 1 << 0,

    // Prepare the batch for collecting. Deltas stay encoded and keep the
    // order in which they were loaded from the backing store.
    //
    // On success, CollectMutations and FilterColumnIdsAndCollectDeltas become
    // callable.
    PREPARE_FOR_COLLECT = 1 << 1,

    // Prepare the batch for selecting. Every delta in the batch is decoded,
    // and a structure describing which rows had deltas is populated.
    //
    // On success, SelectDeltas becomes callable.
    PREPARE_FOR_SELECT = 1 << 2
  };

  // One-time initialization of the iterator; must happen before any other
  // call.
  virtual Status Init(ScanSpec *spec) = 0;

  // Moves to ordinal position 'idx' in the delta data. Any prepared block is
  // cancelled. Must be called at least once before PrepareBatch().
  virtual Status SeekToOrdinal(rowid_t idx) = 0;

  // Gets ready to apply deltas to a block of rows by taking a consistent
  // snapshot of all updates to the next 'nrows' rows; later calls to a
  // PreparedDeltas method therefore cannot observe "tearing"/non-atomicity.
  //
  // 'prepare_flags' is a bitfield of the PREPARE_* values above, stating which
  // operation(s) the batch is for; implementations may prepare differently
  // depending on it. PREPARE_NONE is not a valid argument; it exists for
  // internal use only.
  //
  // Every call advances the iterator by the full length of the previously
  // prepared block.
  virtual Status PrepareBatch(size_t nrows, int prepare_flags) = 0;

  // Whether any rows remain in this iterator.
  virtual bool HasNext() const = 0;

  // String form of the iterator, for debug printouts.
  virtual std::string ToString() const = 0;

  // The number of deltas selected so far. For an iterator that spans several
  // DeltaIterators this helps define a total ordering of deltas across all
  // sub-iterators, e.g. by fixing where each sub-iterator's DeltaPreparer
  // starts its disambiguator.
  //
  // See SelectedDeltas::Delta for more details.
  virtual int64_t deltas_selected() const = 0;

  // Overrides the number of deltas selected by the preparer so far; this
  // defines the starting point of the disambiguator between deltas for
  // iterators that span several DeltaIterators. Callers should only pass
  // monotonically increasing values.
  //
  // See SelectedDeltas::Delta for more details.
  virtual void set_deltas_selected(int64_t deltas_selected) = 0;

  virtual ~DeltaIterator() = default;
};

// DeltaPreparer traits suited for a DMSIterator.
//
// NOTE(review): the meaning of each constant below is inferred from its name
// and from the DeltaPreparer declarations in this header; confirm against the
// DeltaPreparer implementation.
struct DMSPreparerTraits {
  // The type of deltas handled: always REDO for this traits class.
  static constexpr DeltaType kType = REDO;
  // Presumably whether REINSERT deltas are permitted.
  static constexpr bool kAllowReinserts = false;
  // Presumably whether FilterColumnIdsAndCollectDeltas() may be used.
  static constexpr bool kAllowFilterColumnIdsAndCollectDeltas = false;
  // Presumably whether decoders are initialized with extra safety checks
  // (see DeltaPreparer::InitDecoderIfNecessary).
  static constexpr bool kInitializeDecodersWithSafetyChecks = false;
};

// DeltaPreparer traits suited for a DeltaFileIterator.
//
// This is a class template parameterized on the delta type; the
// DeltaFileIterator is expected to dictate the DeltaType.
//
// NOTE(review): the meaning of each boolean below is inferred from its name
// and from the DeltaPreparer declarations in this header; confirm against the
// DeltaPreparer implementation.
template<DeltaType Type>
struct DeltaFilePreparerTraits {
  // The type of deltas handled, as supplied by the template argument.
  static constexpr DeltaType kType = Type;
  // Presumably whether REINSERT deltas are permitted.
  static constexpr bool kAllowReinserts = true;
  // Presumably whether FilterColumnIdsAndCollectDeltas() may be used.
  static constexpr bool kAllowFilterColumnIdsAndCollectDeltas = true;
  // Presumably whether decoders are initialized with extra safety checks
  // (see DeltaPreparer::InitDecoderIfNecessary).
  static constexpr bool kInitializeDecodersWithSafetyChecks = true;
};

// Owns everything to do with "delta preparation": turning encoded deltas into
// an in-memory form that can be served efficiently during iteration.
//
// A DeltaPreparer is meant to be composed inside a DeltaIterator. The iterator
// loads encoded deltas from a backing store and hands them to the
// DeltaPreparer for transformation; later it calls back into the DeltaPreparer
// to serve those deltas.
template<class Traits>
class DeltaPreparer : public PreparedDeltas {
 public:
  explicit DeltaPreparer(RowIteratorOptions opts);

  // Brings internal state in line with a seek done by a DeltaIterator.
  //
  // To be called once DeltaIterator::SeekToOrdinal has completed.
  void Seek(rowid_t row_idx);

  // Brings internal state in line with a DeltaIterator starting to prepare a
  // batch of deltas.
  //
  // To be called at the start of DeltaIterator::PrepareBatch.
  void Start(size_t nrows, int prepare_flags);

  // Brings internal state in line with a DeltaIterator having finished
  // preparing a batch of deltas.
  //
  // To be called at the end of DeltaIterator::PrepareBatch.
  void Finish(size_t nrows);

  // Prepares the delta identified by 'key'; 'val' points to its encoded
  // changes.
  //
  // Once this succeeds, the memory behind 'val' may be destroyed. The output
  // parameter 'finished_row' is set when it can be determined that all later
  // deltas for 'key.row_idx()' are irrelevant under the snapshot given at
  // preparer construction time; the caller may then skip ahead to the deltas
  // of the next row.
  //
  // To be called from DeltaIterator::PrepareBatch whenever a new delta becomes
  // available.
  Status AddDelta(const DeltaKey& key, Slice val, bool* finished_row);

  // PreparedDeltas interface.
  Status ApplyUpdates(size_t col_to_apply, ColumnBlock* dst,
                      const SelectionVector& filter) override;

  Status ApplyDeletes(SelectionVector* sel_vec) override;

  Status SelectDeltas(SelectedDeltas* deltas) override;

  Status CollectMutations(std::vector<Mutation*>* dst, Arena* arena) override;

  Status FilterColumnIdsAndCollectDeltas(const std::vector<ColumnId>& col_ids,
                                         std::vector<DeltaKeyAndUpdate>* out,
                                         Arena* arena) override;

  bool MayHaveDeltas() const override;

  // Simple accessors.
  rowid_t cur_prepared_idx() const {
    return cur_prepared_idx_;
  }
  std::optional<rowid_t> last_added_idx() const {
    return last_added_idx_;
  }
  const RowIteratorOptions& opts() const {
    return opts_;
  }

  // Number of deltas selected so far by this preparer.
  int64_t deltas_selected() const { return deltas_selected_; }

  // Replaces the number of deltas selected so far. The new value must not be
  // smaller than the current one (checked in debug builds).
  void set_deltas_selected(int64_t deltas_selected) {
    // NOTE: the iterator's 'deltas_selected' may legitimately be unchanged if
    // nothing was selected since the previous call, which is why this is
    // DCHECK_GE rather than DCHECK_GT.
    DCHECK_GE(deltas_selected, deltas_selected_);
    deltas_selected_ = deltas_selected;
  }

 private:
  // Initializes 'decoder' according to the preparer's traits, unless it has
  // already been initialized.
  static Status InitDecoderIfNecessary(RowChangeListDecoder* decoder);

  // Determines whether processing of a row's deltas is complete. If it is,
  // tries to turn the row's latest deletion state into a saved deletion or
  // reinsertion. Postponing this until the row is finished avoids creating
  // needless deletions and reinsertions for rows that are deleted and
  // reinserted over and over.
  //
  // 'cur_row_idx' may be unset when no new row index exists, for example when
  // invoked after a whole batch of deltas is done (i.e. from Finish()).
  void MaybeProcessPreviousRowChange(std::optional<rowid_t> cur_row_idx);

  // Advances the deletion state of the row currently being processed
  // according to 'op'.
  void UpdateDeletionState(RowChangeList::ChangeType op);

  // The options the DeltaPreparer's iterator was constructed with.
  const RowIteratorOptions opts_;

  // Row index at which the most recent batch preparation ended.
  rowid_t cur_prepared_idx_;

  // What 'cur_prepared_idx_' was for the previous batch.
  rowid_t prev_prepared_idx_;

  // Index of the row most recently added via AddDelta(), if there is one.
  std::optional<rowid_t> last_added_idx_;

  // Whether there are any prepared blocks, and if so what they were prepared
  // for (tested below against the DeltaIterator PREPARE_* flags).
  int prepared_flags_;

  // State when prepared_flags_ & PREPARE_FOR_APPLY
  // ------------------------------------------------------------

  // One pending update to a single cell of a column.
  // NOTE(review): presumably 'new_val_ptr' refers to the new cell value, with
  // 'new_val_buf' as inline storage for it -- confirm in the implementation.
  struct ColumnUpdate {
    rowid_t row_id;
    void* new_val_ptr;
    uint8_t new_val_buf[16];
  };
  using UpdatesForColumn = std::deque<ColumnUpdate>;
  std::vector<UpdatesForColumn> updates_by_col_;

  // Rows whose last relevant mutation was a DELETE ('deleted_') or a REINSERT
  // ('reinserted_').
  //
  // The two lists are disjoint; a row that was both deleted and reinserted
  // appears in neither.
  std::vector<rowid_t> deleted_;
  std::vector<rowid_t> reinserted_;

  // Deletion state of the row most recently processed by AddDelta().
  //
  // While a row's DELETEs and REINSERTs are being processed, the state
  // alternates between the values below.
  enum RowDeletionState {
    UNKNOWN,
    DELETED,
    REINSERTED
  };
  RowDeletionState deletion_state_;

  // State when prepared_flags_ & PREPARE_FOR_COLLECT
  // ------------------------------------------------------------

  // A delta kept as its key plus its value slice.
  struct PreparedDelta {
    DeltaKey key;
    Slice val;
  };
  std::vector<PreparedDelta> prepared_deltas_;

  // State when prepared_flags_ & PREPARE_FOR_SELECT
  // ------------------------------------------------------------
  SelectedDeltas selected_;

  // How many deltas this DeltaPreparer has selected so far. Feeds the
  // construction of disambiguators (see SelectedDeltas::Delta).
  int64_t deltas_selected_;

  // Used in PREPARE_FOR_APPLY mode.
  //
  // Set to true at every point where deleted_, reinserted_, or updates_by_col_
  // is modified.
  bool may_have_deltas_;

  DISALLOW_COPY_AND_ASSIGN(DeltaPreparer);
};

// Sentinel for the 'nrows' argument of DebugDumpDeltaIterator() and
// WriteDeltaIteratorToFile(): requests that all rows be processed.
enum {
  ITERATE_OVER_ALL_ROWS = 0
};

// Dumps contents of 'iter' to 'out', line-by-line.  Used to unit test
// minor delta compaction.
//
// 'type' and 'schema' are presumably used to stringify each delta (compare
// DeltaKeyAndUpdate::Stringify) -- confirm against the implementation.
//
// If 'nrows' is ITERATE_OVER_ALL_ROWS, all rows will be dumped.
//
// Returns a Status describing whether the dump succeeded.
Status DebugDumpDeltaIterator(DeltaType type,
                              DeltaIterator* iter,
                              const Schema& schema,
                              size_t nrows,
                              std::vector<std::string>* out);

// Writes the contents of 'iter' to 'out', block by block.  Used by
// minor delta compaction.
//
// The template parameter 'Type' is the type of the deltas being written.
//
// If 'nrows' is ITERATE_OVER_ALL_ROWS, all rows will be written.
//
// Returns a Status describing whether the write succeeded.
template<DeltaType Type>
Status WriteDeltaIteratorToFile(DeltaIterator* iter,
                                size_t nrows,
                                DeltaFileWriter* out);

} // namespace tablet
} // namespace kudu

