// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once

#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>
#include <vector>

#include <glog/logging.h>

#include "kudu/cfile/block_encodings.h"
#include "kudu/cfile/block_handle.h"
#include "kudu/cfile/block_pointer.h"
#include "kudu/cfile/cfile.pb.h"
#include "kudu/common/iterator_stats.h"
#include "kudu/common/rowid.h"
#include "kudu/fs/block_id.h"
#include "kudu/fs/block_manager.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/util/compression/compression.pb.h"
#include "kudu/util/faststring.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/object_pool.h"
#include "kudu/util/once.h"
#include "kudu/util/rle-encoding.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"

namespace kudu {

class ColumnMaterializationContext;
class CompressionCodec;
class EncodedKey;
class SelectionVector;
class TypeInfo;

namespace fs {
struct IOContext;
}  // namespace fs

template <typename T> class ArrayView;

namespace cfile {

class BinaryPlainBlockDecoder;
class CFileIterator;
class IndexTreeIterator;
class TypeEncodingInfo;
struct ReaderOptions;

class CFileReader {
 public:
  // Fully open a cfile using a previously opened block.
  //
  // After this call, the reader is safe for use.
  static Status Open(std::unique_ptr<fs::ReadableBlock> block,
                     ReaderOptions options,
                     std::unique_ptr<CFileReader>* reader);

  // Lazily open a cfile using a previously opened block. A lazy open does
  // not incur additional I/O, nor does it validate the contents of the
  // cfile.
  //
  // Init() must be called before most methods; the exceptions are documented
  // below.
  static Status OpenNoInit(std::unique_ptr<fs::ReadableBlock> block,
                           ReaderOptions options,
                           std::unique_ptr<CFileReader>* reader);

  // Fully opens a previously lazily opened cfile, parsing and validating
  // its contents.
  //
  // May be called multiple times; subsequent calls will no-op.
  Status Init(const fs::IOContext* io_context);

  enum CacheControl {
    CACHE_BLOCK,
    DONT_CACHE_BLOCK
  };

  // Can be called before Init().
  Status NewIterator(std::unique_ptr<CFileIterator>* iter,
                     CacheControl cache_control,
                     const fs::IOContext* io_context);

  // Reads the data block pointed to by `ptr`. Will pull the data block from
  // the block cache if it exists, and reads from the filesystem block
  // otherwise.
  Status ReadBlock(const fs::IOContext* io_context,
                   const BlockPointer& ptr,
                   CacheControl cache_control,
                   scoped_refptr<BlockHandle>* ret) const;

  // Return the number of rows in this cfile.
  // This is assumed to be reasonably fast (i.e does not scan
  // the data)
  Status CountRows(rowid_t* count) const;

  // Retrieve the given metadata entry into 'val'.
  // Returns true if the entry was found, otherwise returns false.
  //
  // Note that this implementation is currently O(n), so should not be used
  // in a hot path.
  bool GetMetadataEntry(const std::string& key, std::string* val) const;

  // Can be called before Init().
  uint64_t file_size() const {
    return file_size_;
  }

  // Can be called before Init().
  const BlockId& block_id() const {
    return block_->id();
  }

  const TypeInfo* type_info() const {
    DCHECK(init_once_.init_succeeded());
    return type_info_;
  }

  const TypeEncodingInfo* type_encoding_info() const {
    DCHECK(init_once_.init_succeeded());
    return type_encoding_info_;
  }

  bool is_nullable() const {
    return footer().is_type_nullable();
  }

  const CFileHeaderPB& header() const {
    DCHECK(init_once_.init_succeeded());
    return *DCHECK_NOTNULL(header_.get());
  }

  const CFileFooterPB& footer() const {
    DCHECK(init_once_.init_succeeded());
    return *DCHECK_NOTNULL(footer_.get());
  }

  bool is_compressed() const {
    return footer().compression() != NO_COMPRESSION;
  }

  // Advanced access to the cfile. This is used by the
  // delta reader code. TODO: think about reorganizing this:
  // delta files can probably be done more cleanly.

  // Return true if there is a position-based index on this file.
  bool has_posidx() const { return footer().has_posidx_info(); }
  BlockPointer posidx_root() const {
    DCHECK(has_posidx());
    return BlockPointer(footer().posidx_info().root_block());
  }

  // Return true if there is a value-based index on this file.
  bool has_validx() const { return footer().has_validx_info(); }
  BlockPointer validx_root() const {
    DCHECK(has_validx());
    return BlockPointer(footer().validx_info().root_block());
  }

  // Can be called before Init().
  std::string ToString() const { return block_->id().ToString(); }

 private:
  DISALLOW_COPY_AND_ASSIGN(CFileReader);

  static Status VerifyChecksum(ArrayView<const Slice> data,
                               const Slice& checksum);

  CFileReader(ReaderOptions options,
              uint64_t file_size,
              std::unique_ptr<fs::ReadableBlock> block);

  // Callback used in 'init_once_' to initialize this cfile.
  Status InitOnce(const fs::IOContext* io_context);

  Status ReadAndParseHeader();
  Status ReadAndParseFooter();

  // Return true if the file has checksum on the header, footer, and data blocks.
  bool has_checksum() const;

  // Return true if has_checksum() returns true and the checksum verification
  // is requested.
  bool do_verify_checksum() const;

  // Returns the memory usage of the object including the object itself.
  size_t memory_footprint() const;

  // Handles a corruption error. Functions that may return due to a CFile
  // corruption should call this method before returning.
  void HandleCorruption(const fs::IOContext* io_context) const;

  const std::unique_ptr<fs::ReadableBlock> block_;
  const uint64_t file_size_;

  uint8_t cfile_version_;

  std::unique_ptr<CFileHeaderPB> header_;
  std::unique_ptr<CFileFooterPB> footer_;
  const CompressionCodec* codec_;
  const TypeInfo* type_info_;
  const TypeEncodingInfo* type_encoding_info_;

  bool do_verify_checksum_;

  KuduOnceLambda init_once_;

  ScopedTrackedConsumption mem_consumption_;
};

// Interface for the column iterators consumed by the CFileSet.
// Its implementations are the CFileIterator, the DefaultColumnValueIterator
// and the ColumnValueTypeAdaptorIterator.
// An iterator of this kind fills in the data requested by the projection.
class ColumnIterator {
 public:
  virtual ~ColumnIterator() = default;

  // Positions the iterator at the given ordinal entry of the file, where
  // entry 0 is the first entry that was written.
  // A seek point past the end of the file yields a NotFound Status.
  // TODO: do we ever want to be able to seek to the end of the file?
  virtual Status SeekToOrdinal(rowid_t ord_idx) = 0;

  // Whether this reader is currently seeked.
  // While the iterator is not seeked, calling anything other than a seek
  // function (GetCurrentOrdinal() included) is an error.
  virtual bool seeked() const = 0;

  // The ordinal index the iterator currently points to.
  //
  // Before PrepareBatch() is called this is the position reached by the
  // last seek. Neither PrepareBatch() nor Scan() changes the value returned
  // here. FinishBatch() moves the ordinal forward to the position of the
  // next block to be prepared.
  virtual rowid_t GetCurrentOrdinal() const = 0;

  // Prepares to read up to *n rows into the given column block.
  // On return, *n holds the number of rows actually prepared, which never
  // exceeds the requested value.
  //
  // Assumes dst->size() >= *n on input.
  //
  // When the underlying file still holds at least dst->size() values, the
  // result is always *n == dst->size(); in other words, a "short read"
  // never happens.
  virtual Status PrepareBatch(size_t* n) = 0;

  // Copies values into the prepared column block.
  // Indirected values (e.g. strings) are copied into the arena of the
  // ctx's block.
  // The position in the underlying file is _not_ advanced, so repeated
  // calls to Scan() re-read the same values.
  // The column predicate is evaluated as well when decoder eval is both
  // supported and allowed.
  virtual Status Scan(ColumnMaterializationContext* ctx) = 0;

  // Completes the current batch and advances the iterators, so that the
  // next PrepareBatch() call starts where the previous batch left off.
  virtual Status FinishBatch() = 0;

  // I/O statistics of this iterator.
  virtual const IteratorStats& io_statistics() const = 0;
};

// ColumnIterator that fills the ColumnBlock with the specified value.
// It is used by the CFileSet to handle the case of a column present
// in the projection schema but not in the base data.
//
// Example:
//    DefaultColumnValueIterator iter(typeinfo, value);
//    iter.Scan(&ctx);  // 'ctx' is a ColumnMaterializationContext
class DefaultColumnValueIterator final : public ColumnIterator {
 public:
  // Creates an iterator positioned at ordinal 0 with an empty batch.
  //
  // 'typeinfo' and 'value' are stored as raw pointers; nothing is copied.
  // NOTE(review): presumably both must outlive the iterator -- confirm
  // against the callers.
  DefaultColumnValueIterator(const TypeInfo* typeinfo, const void* value)
      : typeinfo_(typeinfo),
        value_(value),
        // Start from a defined batch size so that no member function can
        // observe an indeterminate value before a batch has been prepared.
        batch_(0),
        ordinal_(0) {
  }

  Status SeekToOrdinal(rowid_t ord_idx) override;

  // This iterator always reports itself as seeked.
  bool seeked() const override { return true; }

  // Returns the current ordinal, which starts at 0.
  rowid_t GetCurrentOrdinal() const override { return ordinal_; }

  Status PrepareBatch(size_t* n) override;
  Status Scan(ColumnMaterializationContext* ctx) override;
  Status FinishBatch() override;

  const IteratorStats& io_statistics() const override { return io_stats_; }

 private:
  // Type information and value supplied at construction.
  const TypeInfo* typeinfo_;
  const void* value_;

  // NOTE(review): presumably the size of the currently prepared batch --
  // confirm against the out-of-line definitions.
  size_t batch_;
  // Value reported by GetCurrentOrdinal().
  rowid_t ordinal_;
  IteratorStats io_stats_;
};


// ColumnIterator implementation that reads through a CFileReader.
class CFileIterator final : public ColumnIterator {
 public:
  CFileIterator(CFileReader* reader,
                CFileReader::CacheControl cache_control,
                const fs::IOContext* io_context);
  ~CFileIterator();

  // Positions the iterator at the first entry in the file. Works for
  // ordinal-indexed as well as value-indexed files.
  Status SeekToFirst();

  // Positions the iterator at the given ordinal entry of the file, where
  // entry 0 is the first entry that was written.
  // A seek point past the end of the file yields a NotFound Status.
  // TODO: do we ever want to be able to seek to the end of the file?
  Status SeekToOrdinal(rowid_t ord_idx) override;

  // Seeks the index to the given row_key, or to the index entry just
  // before it. Then, if the index is sparse, seeks the data block to the
  // value that matches, or to the value immediately after it.
  //
  // *exact_match is set to tell whether the seek landed on exactly the
  // requested key.
  //
  // Returns a NotSupported status if this iterator was constructed without
  // a value index.
  Status SeekAtOrAfter(const EncodedKey& encoded_key,
                       bool* exact_match);

  // Whether this reader is currently seeked.
  // While the iterator is not seeked, calling anything other than a seek
  // function (GetCurrentOrdinal() included) is an error.
  bool seeked() const override {
    return seeked_ != nullptr;
  }

  // The ordinal index the iterator currently points to.
  //
  // Before PrepareBatch() is called this is the position reached by the
  // last seek. Neither PrepareBatch() nor Scan() changes the value returned
  // here. FinishBatch() moves the ordinal forward to the position of the
  // next block to be prepared.
  rowid_t GetCurrentOrdinal() const override;

  // Prepares to read up to *n rows into the given column block.
  // On return, *n holds the number of rows actually prepared, which never
  // exceeds the requested value.
  //
  // Assumes dst->size() >= *n on input.
  //
  // When the underlying file still holds at least dst->size() values, the
  // result is always *n == dst->size(); in other words, a "short read"
  // never happens.
  Status PrepareBatch(size_t* n) override;

  // Copies values into the prepared column block.
  // Indirected values (e.g. strings) are copied into the dst block's arena.
  // The position in the underlying file is _not_ advanced, so repeated
  // calls to Scan() re-read the same values.
  Status Scan(ColumnMaterializationContext* ctx) override;

  // Completes the current batch and advances the iterators, so that the
  // next PrepareBatch() call starts where the previous batch left off.
  Status FinishBatch() override;

  // True if the next PrepareBatch() call will return at least one row.
  bool HasNext() const;

  // Convenience wrapper that prepares a batch, scans it and finishes it.
  Status CopyNextValues(size_t* n, ColumnMaterializationContext* ctx);

  // I/O statistics of this iterator.
  const IteratorStats& io_statistics() const override { return io_stats_; }

  // For a dictionary-coded column, the decoder of the cfile's dictionary
  // block. Called by the BinaryDictBlockDecoder.
  BinaryPlainBlockDecoder* GetDictDecoder() {
    return dict_decoder_.get();
  }

  // For a dictionary-coded column with a predicate on it, the set of
  // codewords that pass the predicate. One vocabulary is shared by all the
  // BinaryDictBlockDecoders of a single cfile, so the reader has to give
  // every decoder access to the single set of predicate-satisfying
  // codewords.
  SelectionVector* GetCodeWordsMatchingPredicate() {
    return codewords_matching_pred_.get();
  }

 private:
  DISALLOW_COPY_AND_ASSIGN(CFileIterator);

  // A data block together with its decoder and the seek state kept for it.
  struct PreparedBlock {
    BlockPointer dblk_ptr_;
    scoped_refptr<BlockHandle> dblk_handle_;
    std::unique_ptr<BlockDecoder> dblk_;

    // The rowid of the first row in this block, as reported by the decoder.
    rowid_t first_row_idx() const { return dblk_->GetFirstRowId(); }

    // Seeked position, expressed relative to the start of the block.
    // When a null bitmap is present, dblk_->GetCurrentIndex() does not line
    // up with the row number, because null values are not written to the
    // data block. See CFileIterator::SeekToPositionInBlock().
    uint32_t idx_in_block_;

    // On first read the block is seeked to the proper position and
    // rewind_idx_ records that offset within the block. needs_rewind_
    // starts out false and turns true once any values have been read from
    // the block: dblk_ then points at a later position in the block and
    // has to be rewound if Scan() is called a second time.
    // rewind_idx_ is relative to the first entry in the block (i.e. it is
    // not a rowid).
    bool needs_rewind_;
    uint32_t rewind_idx_;

    // Total row count of the block (nulls + not nulls).
    uint32_t num_rows_in_block_;

    // Null bitmap and the (RLE) decoder for it.
    Slice rle_bitmap;
    RleDecoder<bool> rle_decoder_;

    // The rowid of the last row in this block.
    rowid_t last_row_idx() const {
      const rowid_t first_row = first_row_idx();
      return first_row + num_rows_in_block_ - 1;
    }

    std::string ToString() const;
  };

  // Seeks the given PreparedBlock to the given index within it.
  void SeekToPositionInBlock(PreparedBlock* pb, uint32_t idx_in_block);

  // Reads the data block that 'idx_iter' currently points to into the
  // given PreparedBlock structure.
  //
  // The iterator is not advanced.
  Status ReadCurrentDataBlock(const IndexTreeIterator& idx_iter,
                              PreparedBlock* prep_block);

  // Reads the data block that 'idx_iter' currently points to and appends
  // it to the end of prepared_blocks_.
  Status QueueCurrentDataBlock(const IndexTreeIterator& idx_iter);

  // Fully initializes the underlying cfile reader if that is still needed,
  // and clears any seek-related state.
  Status PrepareForNewSeek();

  CFileReader* reader_;

  std::unique_ptr<IndexTreeIterator> posidx_iter_;
  std::unique_ptr<IndexTreeIterator> validx_iter_;

  // Decoder for the dictionary block.
  std::unique_ptr<BinaryPlainBlockDecoder> dict_decoder_;
  scoped_refptr<BlockHandle> dict_block_handle_;

  // The codewords of a dictionary that match the predicate.
  std::unique_ptr<SelectionVector> codewords_matching_pred_;

  // The index iterator currently in use: one of posidx_iter_.get() or
  // validx_iter_.get(), or NULL when not seeked.
  IndexTreeIterator* seeked_;

  // Data blocks holding data relevant to the currently Prepared batch of
  // rows.
  // The pointers are allocated from the prepared_block_pool_ below.
  std::vector<PreparedBlock*> prepared_blocks_;

  ObjectPool<PreparedBlock> prepared_block_pool_;
  using pblock_pool_scoped_ptr = ObjectPool<PreparedBlock>::scoped_ptr;

  // True if PrepareBatch() has been called more recently than FinishBatch().
  bool prepared_;

  // Whether this iterator asks the cfile to cache the blocks it requests.
  const CFileReader::CacheControl cache_control_;

  // While prepared_ is true: RowID of the current prepared batch.
  // Otherwise: RowID of the next batch that will be prepared.
  rowid_t last_prepare_idx_;

  // While prepared_ is true: number of rows in the current batch.
  // Otherwise: 0.
  uint32_t last_prepare_count_;

  IteratorStats io_stats_;

  const fs::IOContext* io_context_;

  // A temporary buffer for encoding.
  faststring tmp_buf_;
};

} // namespace cfile
} // namespace kudu
