// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
//
// A DiskRowSet is a horizontal slice of a Kudu tablet.
// Each DiskRowSet contains data for a disjoint set of keys.
// See src/kudu/tablet/README for a detailed description.
#pragma once

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

#include <glog/logging.h>
#include <gtest/gtest_prod.h>

#include "kudu/common/rowid.h"
#include "kudu/common/schema.h"
#include "kudu/fs/block_id.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/delta_key.h"
#include "kudu/tablet/delta_tracker.h"
#include "kudu/tablet/rowset.h"
#include "kudu/tablet/rowset_metadata.h"
#include "kudu/tablet/tablet_mem_trackers.h"
#include "kudu/tablet/tablet_metadata.h"
#include "kudu/util/bloom_filter.h"
#include "kudu/util/faststring.h"
#include "kudu/util/locks.h"
#include "kudu/util/make_shared.h"
#include "kudu/util/status.h"

namespace kudu {

class MonoTime;
class RowBlock;
class RowChangeList;
class RowwiseIterator;
class Timestamp;

namespace cfile {
class BloomFileWriter;
class CFileWriter;
}

namespace consensus {
class OpId;
}

namespace fs {
class BlockCreationTransaction;
struct IOContext;
}

namespace log {
class LogAnchorRegistry;
}

namespace tablet {

class CFileSet;
class CompactionInput;
class DeltaFileWriter;
class DeltaStats;
class HistoryGcOpts;
class MultiColumnWriter;
class Mutation;
class MvccSnapshot;
class OperationResultPB;

// Writes the base data of a single DiskRowSet. Rows are fed to a multi-column
// writer, to an optional bloom filter writer, and (for composite keys) to an
// ad-hoc key index writer that lives in its own file.
class DiskRowSetWriter {
 public:
  // TODO: document ownership of rowset_metadata
  DiskRowSetWriter(RowSetMetadata* rowset_metadata,
                   const Schema* schema,
                   BloomFilterSizing bloom_sizing);
  ~DiskRowSetWriter();

  Status Open();

  // Appends 'block' to every column writer and, when one is configured, to
  // the bloom filter. Blocks must be supplied in ascending key order.
  // 'live_row_count' is the number of live rows contained in 'block'.
  Status AppendBlock(const RowBlock& block, int live_row_count = 0);

  // Closes the CFiles together with their underlying writable blocks.
  // Returns Status::Aborted() if no rows were ever written.
  Status Finish();

  // Closes the CFiles, finalizing their underlying blocks and releasing them
  // to 'transaction'. Returns Status::Aborted() if no rows were ever written.
  Status FinishAndReleaseBlocks(fs::BlockCreationTransaction* transaction);

  // A plain DiskRowSetWriter never rolls over to a new rowset. This no-op is
  // provided only so tests templatized on the writer type can call it.
  Status RollIfNecessary() { return Status::OK(); }

  // Number of rows written. Only valid once the writer has finished; calling
  // it earlier trips a CHECK.
  rowid_t written_count() const {
    CHECK(finished_);
    return written_count_;
  }

  // Total bytes written to this DiskRowSet so far. Finish() may write
  // additional bytes, so treat the result as an estimate of the final size.
  size_t written_size() const;

  const Schema& schema() const { return *schema_; }

 private:
  // Returns the CFile writer that holds the key index: the ad-hoc writer for
  // composite keys, otherwise the writer for the key column.
  cfile::CFileWriter* key_index_writer();

  Status InitBloomFileWriter();

  // Sets up the index writer required for compound keys. That index is
  // written to its own file rather than embedded in the col_* files.
  Status InitAdHocIndexWriter();

  RowSetMetadata* rowset_metadata_;
  const Schema* const schema_;

  BloomFilterSizing bloom_sizing_;

  bool finished_;
  rowid_t written_count_;
  std::unique_ptr<MultiColumnWriter> col_writer_;
  std::unique_ptr<cfile::BloomFileWriter> bloom_writer_;
  std::unique_ptr<cfile::CFileWriter> ad_hoc_index_writer_;

  // Encoded form of the most recently written key.
  faststring last_encoded_key_;

  DISALLOW_COPY_AND_ASSIGN(DiskRowSetWriter);
};


// Wrapper around DiskRowSetWriter which "rolls" to a new DiskRowSet after
// a certain amount of data has been written. Each output rowset is suffixed
// with ".N" where N starts at 0 and increases as new rowsets are generated.
//
// Alongside each DiskRowSet it also writes one UNDO and one REDO delta file
// (see the cur_undo_writer_ / cur_redo_writer_ members).
//
// See AppendBlock(...) for important usage information.
class RollingDiskRowSetWriter {
 public:
  // Create a new rolling writer. The given 'tablet_metadata' must stay valid
  // for the lifetime of this writer, and is used to construct the new rowsets
  // that this RollingDiskRowSetWriter creates.
  // 'target_rowset_size' is presumably the "configured threshold" that
  // RollIfNecessary() compares against -- NOTE(review): confirm in the .cpp.
  RollingDiskRowSetWriter(TabletMetadata* tablet_metadata, const Schema& schema,
                          BloomFilterSizing bloom_sizing,
                          size_t target_rowset_size);
  ~RollingDiskRowSetWriter();

  Status Open();

  // The block is written to all column writers as well as the bloom filter,
  // if configured.
  // Rows must be appended in ascending order.
  //
  // NOTE: data must be appended in a particular order: for each set of rows
  // you must append deltas using the APIs below *before* appending the block
  // of rows that they correspond to. This ensures that the output delta files
  // and data files are aligned.
  // 'live_row_count' means the number of live rows in this input block.
  Status AppendBlock(const RowBlock &block, int live_row_count = 0);

  // Appends a sequence of REDO deltas for the same row to the current redo
  // delta file. 'row_idx_in_block' is the positional index after the last
  // written block. The 'row_idx_in_drs' out parameter will be set with the row
  // index from the start of the DiskRowSet currently being written.
  Status AppendRedoDeltas(rowid_t row_idx_in_block,
                          Mutation* redo_delta_head,
                          rowid_t* row_idx_in_drs);

  // Appends a sequence of UNDO deltas for the same row to the current undo
  // delta file. 'row_idx_in_block' is the positional index after the last
  // written block. The 'row_idx_in_drs' out parameter will be set with the row
  // index from the start of the DiskRowSet currently being written.
  Status AppendUndoDeltas(rowid_t row_idx_in_block,
                          Mutation* undo_delta_head,
                          rowid_t* row_idx_in_drs);

  // Try to roll the output, if we've passed the configured threshold. This will
  // only roll if called immediately after an AppendBlock() call. The implementation
  // of AppendBlock() doesn't call it automatically, because it doesn't know if there
  // is any more data to be appended. It is safe to call this in other circumstances --
  // it will be ignored if it is not a good time to roll.
  Status RollIfNecessary();

  // Finishes writing. Once this returns an OK result, GetWrittenRowSetMetadata()
  // may be called.
  Status Finish();

  int64_t rows_written_count() const { return written_count_; }

  const Schema &schema() const { return schema_; }

  // Returns, via 'metas', the RowSetMetadata objects of the rowsets written by
  // this writer.
  // This must only be called after Finish() returns an OK result.
  void GetWrittenRowSetMetadata(RowSetMetadataVector* metas) const;

  uint64_t written_size() const { return written_size_; }

  // Number of DiskRowSets whose metadata has been recorded in
  // 'written_drs_metas_'.
  int64_t drs_written_count() const { return written_drs_metas_.size(); }

 private:
  Status RollWriter();

  // Close the current DRS and delta writers, releasing their finished blocks
  // into block_transaction_.
  Status FinishCurrentWriter();

  // Presumably the shared implementation behind AppendRedoDeltas() and
  // AppendUndoDeltas(), selected by 'Type' -- NOTE(review): confirm in the .cpp.
  template<DeltaType Type>
  Status AppendDeltas(rowid_t row_idx_in_block,
                      Mutation* delta_head,
                      rowid_t* row_idx,
                      DeltaFileWriter* writer,
                      DeltaStats* delta_stats);

  // Lifecycle state of the writer.
  enum State {
    kInitialized,
    kStarted,
    kFinished
  };
  State state_;

  TabletMetadata* tablet_metadata_;
  const Schema schema_;
  std::shared_ptr<RowSetMetadata> cur_drs_metadata_;
  const BloomFilterSizing bloom_sizing_;
  const size_t target_rowset_size_;

  std::unique_ptr<DiskRowSetWriter> cur_writer_;

  // A delta writer to store the undos for each DRS
  std::unique_ptr<DeltaFileWriter> cur_undo_writer_;
  std::unique_ptr<DeltaStats> cur_undo_delta_stats_;
  // a delta writer to store the redos for each DRS
  std::unique_ptr<DeltaFileWriter> cur_redo_writer_;
  std::unique_ptr<DeltaStats> cur_redo_delta_stats_;
  BlockId cur_undo_ds_block_id_;
  BlockId cur_redo_ds_block_id_;

  uint64_t row_idx_in_cur_drs_;

  // True when we are allowed to roll. We can only roll when the delta writers
  // and data writers are aligned (i.e. just after we've appended a new block of data).
  bool can_roll_;

  // RowSetMetadata objects for diskrowsets which have been successfully
  // written out.
  RowSetMetadataVector written_drs_metas_;

  int64_t written_count_;
  uint64_t written_size_;

  // Syncs and commits all writes of outstanding blocks when the rolling
  // writer is destroyed.
  std::unique_ptr<fs::BlockCreationTransaction> block_transaction_;

  DISALLOW_COPY_AND_ASSIGN(RollingDiskRowSetWriter);
};

// A rowset's disk-space-occupying components are as follows:
// - cfile set
//   - base data
//   - bloom file
//   - ad hoc index
// - delta files
//   - UNDO deltas
//   - REDO deltas
// This struct is a container for the sizes of these components.
//
// Every field defaults to zero, so a default-constructed instance never holds
// indeterminate values. The struct remains an aggregate, so brace
// initialization such as `DiskRowSetSpace{a, b, c, d, e}` still works.
struct DiskRowSetSpace {
  uint64_t base_data_size = 0;
  uint64_t bloom_size = 0;
  uint64_t ad_hoc_index_size = 0;
  uint64_t redo_deltas_size = 0;
  uint64_t undo_deltas_size = 0;

  // Helper method to compute the size of the diskrowset's underlying cfile set:
  // base data + bloom file + ad hoc index. Delta files are not included.
  // Declared const because it only reads fields, so it is usable on const
  // instances.
  uint64_t CFileSetOnDiskSize() const {
    return base_data_size + bloom_size + ad_hoc_index_size;
  }
};

////////////////////////////////////////////////////////////
// DiskRowSet
////////////////////////////////////////////////////////////

class MajorDeltaCompaction;

// A RowSet stored on disk. Its base data is held in a CFileSet ('base_data_'),
// and its delta data is managed by a DeltaTracker ('delta_tracker_').
// Instances are created through the static Open() factory; the constructor is
// protected.
class DiskRowSet :
    public RowSet,
    public enable_make_shared<DiskRowSet> {
 public:
  // Names of the metadata entries under which the min/max keys are stored --
  // NOTE(review): values are defined in the .cpp; confirm usage there.
  static const char *kMinKeyMetaEntryName;
  static const char *kMaxKeyMetaEntryName;

  // Open a rowset from disk.
  // If successful, sets *rowset to the newly opened rowset.
  static Status Open(const std::shared_ptr<RowSetMetadata>& rowset_metadata,
                     log::LogAnchorRegistry* log_anchor_registry,
                     const TabletMemTrackers& mem_trackers,
                     const fs::IOContext* io_context,
                     std::shared_ptr<DiskRowSet> *rowset);

  ////////////////////////////////////////////////////////////
  // "Management" functions
  ////////////////////////////////////////////////////////////

  // Flush all accumulated delta data to disk.
  Status FlushDeltas(const fs::IOContext* io_context) override;

  // Perform delta store minor compaction.
  // This compacts the delta files down to a single one.
  // If there is already only a single delta file, this does nothing.
  Status MinorCompactDeltaStores(const fs::IOContext* io_context) override;

  ////////////////////////////////////////////////////////////
  // RowSet implementation
  ////////////////////////////////////////////////////////////

  ////////////////////
  // Updates
  ////////////////////

  // Applies 'update' to the row identified by 'probe', at 'timestamp'.
  // 'probe' carries the key portion of the row -- i.e a contiguous
  // encoding of the key columns.
  Status MutateRow(Timestamp timestamp,
                   const RowSetKeyProbe &probe,
                   const RowChangeList &update,
                   const consensus::OpId& op_id,
                   const fs::IOContext* io_context,
                   ProbeStats* stats,
                   OperationResultPB* result) override;

  // Sets '*present' according to whether the row identified by 'probe' is
  // present in this rowset.
  Status CheckRowPresent(const RowSetKeyProbe &probe, const fs::IOContext* io_context,
                         bool *present, ProbeStats* stats) const override;

  ////////////////////
  // Read functions.
  ////////////////////
  virtual Status NewRowIterator(const RowIteratorOptions& opts,
                                std::unique_ptr<RowwiseIterator>* out) const override;

  virtual Status NewCompactionInput(const Schema* projection,
                                    const MvccSnapshot &snap,
                                    const fs::IOContext* io_context,
                                    std::unique_ptr<CompactionInput>* out) const override;

  // Gets the number of rows in this rowset, checking 'num_rows_' first. If not
  // yet set, consults the base data and stores the result in 'num_rows_'.
  Status CountRows(const fs::IOContext* io_context, rowid_t *count) const final override;

  // Count the number of live rows in this DRS.
  virtual Status CountLiveRows(uint64_t* count) const override;

  // See RowSet::GetBounds(...)
  virtual Status GetBounds(std::string* min_encoded_key,
                           std::string* max_encoded_key) const override;

  // Fills 'drss' with the sizes of this rowset's on-disk components
  // (see DiskRowSetSpace).
  void GetDiskRowSetSpaceUsage(DiskRowSetSpace* drss) const;

  uint64_t OnDiskSize() const override;

  uint64_t OnDiskBaseDataSize() const override;

  uint64_t OnDiskBaseDataColumnSize(const ColumnId& col_id) const override;

  uint64_t OnDiskBaseDataSizeWithRedos() const override;

  size_t DeltaMemStoreSize() const override;

  bool DeltaMemStoreInfo(size_t* size_bytes, MonoTime* creation_time) const override;

  bool DeltaMemStoreEmpty() const override;

  int64_t MinUnflushedLogIndex() const override;

  size_t CountDeltaStores() const;

  double DeltaStoresCompactionPerfImprovementScore(DeltaCompactionType type) const override;

  Status EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp ancient_history_mark,
                                                     int64_t* bytes) override;

  Status IsDeletedAndFullyAncient(Timestamp ancient_history_mark,
                                  bool* deleted_and_ancient) override;

  Status InitUndoDeltas(Timestamp ancient_history_mark,
                        MonoTime deadline,
                        const fs::IOContext* io_context,
                        int64_t* delta_blocks_initialized,
                        int64_t* bytes_in_ancient_undos) override;

  Status DeleteAncientUndoDeltas(Timestamp ancient_history_mark, const fs::IOContext* io_context,
                                 int64_t* blocks_deleted, int64_t* bytes_deleted) override;

  // Major compacts all the delta files for all the columns.
  Status MajorCompactDeltaStores(const fs::IOContext* io_context, HistoryGcOpts history_gc_opts);

  // Returns the lock governing this rowset's inclusion in a compact/flush
  // (see 'compact_flush_lock_').
  std::mutex *compact_flush_lock() override {
    return &compact_flush_lock_;
  }

  // Whether set_has_been_compacted() has been called on this rowset.
  bool has_been_compacted() const override {
    return has_been_compacted_.load();
  }

  // Marks this rowset as compacted (see 'has_been_compacted_').
  void set_has_been_compacted() override {
    has_been_compacted_.store(true);
  }

  // Returns the delta tracker; DCHECKs that it is non-null.
  DeltaTracker *delta_tracker() {
    return DCHECK_NOTNULL(delta_tracker_.get());
  }

  std::shared_ptr<RowSetMetadata> metadata() override {
    return rowset_metadata_;
  }

  std::string ToString() const override {
    return rowset_metadata_->ToString();
  }

  // Log prefix of the form "T <tablet id> P <fs uuid>: <ToString()>: ".
  std::string LogPrefix() const {
    return strings::Substitute("T $0 P $1: $2: ",
        rowset_metadata_->tablet_metadata()->tablet_id(),
        rowset_metadata_->fs_manager()->uuid(),
        ToString());
  }

  virtual Status DebugDump(std::vector<std::string> *lines) override;

 protected:
  // Use the static Open() factory instead of constructing directly.
  DiskRowSet(std::shared_ptr<RowSetMetadata> rowset_metadata,
             log::LogAnchorRegistry* log_anchor_registry,
             TabletMemTrackers mem_trackers);

 private:
  FRIEND_TEST(TabletHistoryGcTest, TestMajorDeltaCompactionOnSubsetOfColumns);
  FRIEND_TEST(TestCompaction, TestOneToOne);
  FRIEND_TEST(TestRowSet, TestRowSetUpdate);
  FRIEND_TEST(TestRowSet, TestDMSFlush);

  friend class CompactionInput;
  friend class Tablet;

  // Instance-level half of the static Open() -- NOTE(review): presumably
  // loads the base data and delta tracker; confirm in the .cpp.
  Status Open(const fs::IOContext* io_context);

  // Create a new major delta compaction object to compact the specified columns.
  Status NewMajorDeltaCompaction(const std::vector<ColumnId>& col_ids,
                                 HistoryGcOpts history_gc_opts,
                                 const fs::IOContext* io_context,
                                 std::unique_ptr<MajorDeltaCompaction>* out) const;

  // Major compacts all the delta files for the specified columns.
  Status MajorCompactDeltaStoresWithColumnIds(const std::vector<ColumnId>& col_ids,
                                              const fs::IOContext* io_context,
                                              HistoryGcOpts history_gc_opts);

  std::shared_ptr<RowSetMetadata> rowset_metadata_;

  bool open_;

  log::LogAnchorRegistry* log_anchor_registry_;

  TabletMemTrackers mem_trackers_;

  // NOTE(review): presumably guards access to 'base_data_' and
  // 'delta_tracker_' below; confirm against the .cpp.
  mutable rw_spinlock component_lock_;
  // Base data for this rowset.
  std::shared_ptr<CFileSet> base_data_;
  std::unique_ptr<DeltaTracker> delta_tracker_;

  // Number of rows in the rowset. This may be unset (-1) if the rows in the
  // underlying cfile set have not been counted yet.
  mutable std::atomic<int64_t> num_rows_;

  // Lock governing this rowset's inclusion in a compact/flush. If locked,
  // no other compactor will attempt to include this rowset.
  std::mutex compact_flush_lock_;

  // Flag indicating whether the rowset has been removed from a rowset tree,
  // and thus should not be scheduled for further compactions.
  std::atomic<bool> has_been_compacted_;

  DISALLOW_COPY_AND_ASSIGN(DiskRowSet);
};

} // namespace tablet
} // namespace kudu
