// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once

#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include <string>
#include <vector>

#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <gtest/gtest_prod.h>

#include "kudu/consensus/log.pb.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/ref_counted_replicate.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/util/atomic.h"
#include "kudu/util/env.h"
#include "kudu/util/faststring.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"

// Declares the 'log_force_fsync_all' gflag so that FLAGS_log_force_fsync_all
// is visible to every file including this header. The flag is used by other
// classes, so it is now part of the API.
DECLARE_bool(log_force_fsync_all);

namespace kudu {

class CompressionCodec;

namespace log {

// Each log entry is prefixed by a header. See DecodeEntryHeader()
// implementation for details.
//
// NOTE(review): presumably the size, in bytes, of the current ("V2") entry
// header format; ReadableLogSegment::entry_header_size() notes that older
// Kudu versions used a different header format -- confirm in the .cc file.
extern const size_t kEntryHeaderSizeV2;

class ReadableLogSegment;

// A list of log entries, each one owned by the list through a unique_ptr.
using LogEntries = std::vector<std::unique_ptr<LogEntryPB>>;

// Options for the State Machine/Write Ahead Log.
//
// Plain aggregate of tuning knobs; all fields are public.
struct LogOptions {

  // The size of a log segment, in megabytes (default 64 MB).
  // The log rolls over to a new segment upon reaching this size.
  size_t segment_size_mb;

  // Whether to call fsync on every call to Append().
  bool force_fsync_all;

  // Whether to fallocate segments before writing to them.
  bool preallocate_segments;

  // Whether the allocation should happen asynchronously.
  bool async_preallocate_segments;

  // Defined out of line.
  // NOTE(review): presumably initializes each field from its corresponding
  // command-line flag (e.g. log_segment_size_mb) -- confirm in the .cc file.
  LogOptions();
};

// An ordered run of readable segments, sorted by ascending sequence number.
using SegmentSequence = std::vector<scoped_refptr<ReadableLogSegment>>;

// Fine-grained outcome of decoding a log entry header. Lets callers tell the
// individual failure modes apart and handle each one appropriately.
enum class EntryHeaderStatus {
  // No error was detected.
  OK = 0,

  // The entry consisted solely of zero bytes, which most likely means the
  // read landed in pre-allocated space.
  ALL_ZEROS = 1,

  // The entry's checksum did not match the expected value.
  CRC_MISMATCH = 2,

  // Any other failure (e.g. an IO error while reading).
  OTHER_ERROR = 3,
};

// LogEntryReader provides iterator-style access to read the entries
// from an open log segment.
//
// Instances are neither copyable nor assignable (DISALLOW_COPY_AND_ASSIGN).
class LogEntryReader {
 public:

  // Construct a LogEntryReader to read from the provided segment.
  // 'seg' is held as a raw pointer and must outlive the LogEntryReader.
  explicit LogEntryReader(const ReadableLogSegment* seg);

  ~LogEntryReader();

  // Read the next entry from the log, replacing the contents of 'entry'.
  //
  // When there are no more entries to read, returns Status::EndOfFile().
  Status ReadNextEntry(std::unique_ptr<LogEntryPB>* entry);

  // Return the offset of the next entry to be read from the file.
  int64_t offset() const {
    return offset_;
  }

  // Return the offset at which this reader will stop reading.
  int64_t read_up_to_offset() const {
    return read_up_to_;
  }

 private:
  // Grants ReadableLogSegment access to this class's private members.
  friend class ReadableLogSegment;

  // Handle an error reading an entry.
  //
  // 's' is the error that was encountered; 'status_detail' is the
  // finer-grained EntryHeaderStatus accompanying it.
  // NOTE(review): how each EntryHeaderStatus value maps to the returned Status
  // is decided in the implementation -- not visible from this header.
  Status HandleReadError(const Status& s, EntryHeaderStatus status_detail) const;

  // Format a nice error message to report on a corruption in a log file.
  // NOTE(review): presumably uses 'recent_entries_' to add context to the
  // message -- confirm in the implementation.
  Status MakeCorruptionStatus(const Status& status) const;

  // The segment being read. Not owned; see the constructor's lifetime note.
  const ReadableLogSegment* seg_;

  // The last several entries which were successfully read.
  struct RecentEntry {
    // Offset associated with the entry in the segment file.
    int64_t offset;
    // The type of the log entry.
    LogEntryTypePB type;
    // The OpId associated with the entry.
    consensus::OpId op_id;
  };
  std::deque<RecentEntry> recent_entries_;
  // NOTE(review): presumably the maximum number of elements retained in
  // 'recent_entries_' -- confirm in the implementation.
  static const int kNumRecentEntries = 4;

  // Entries which have been read from the file and not yet returned to
  // the caller.
  std::deque<std::unique_ptr<LogEntryPB>> pending_entries_;

  // The total number of log entry batches read from the file.
  int64_t num_batches_read_;

  // The total number of LogEntryPBs read from the file.
  int64_t num_entries_read_;

  // The offset of the next entry to be read. Exposed through offset().
  int64_t offset_;

  // The offset at which this reader will stop reading entries. Exposed
  // through read_up_to_offset().
  int64_t read_up_to_;

  // Temporary buffer used for deserialization.
  faststring tmp_buf_;

  DISALLOW_COPY_AND_ASSIGN(LogEntryReader);
};

// A segment of the log can either be a ReadableLogSegment (for replay and
// consensus catch-up) or a WritableLogSegment (where the Log actually stores
// state). LogSegments have a maximum size defined in LogOptions (set from the
// log_segment_size_mb flag, which defaults to 64). Upon reaching this size
// segments are rolled over and the Log continues in a new segment.

// A readable log segment for recovery and follower catch-up.
//
// Const methods are thread-safe; non-const methods are not. Care must be taken
// not to invoke non-thread-safe methods after the segment has been incorporated
// into the LogReader, as there may be concurrent threads accessing the reader.
//
// Instances are reference-counted (RefCountedThreadSafe) and the destructor is
// private, so segments are expected to be held through scoped_refptr.
class ReadableLogSegment : public RefCountedThreadSafe<ReadableLogSegment> {
 public:
  // Factory method to construct a ReadableLogSegment from a file on the FS.
  //
  // On success, the new segment is returned through 'segment'.
  static Status Open(Env* env,
                     const std::string& path,
                     scoped_refptr<ReadableLogSegment>* segment);

  // Build a readable segment to read entries from the provided path.
  // One of the Init() overloads below must still be called to initialize it.
  ReadableLogSegment(std::string path, std::shared_ptr<RWFile> file);

  // Initialize the ReadableLogSegment.
  // This initializer provides methods for avoiding disk IO when creating a
  // ReadableLogSegment for the current WritableLogSegment, i.e. for reading
  // the log entries in the same segment that is currently being written to.
  Status Init(const LogSegmentHeaderPB& header,
              int64_t first_entry_offset);

  // Initialize the ReadableLogSegment.
  // This initializer provides methods for avoiding disk IO when creating a
  // ReadableLogSegment from a WritableLogSegment (i.e. for log rolling).
  Status Init(const LogSegmentHeaderPB& header,
              const LogSegmentFooterPB& footer,
              int64_t first_entry_offset);

  // Initialize the ReadableLogSegment.
  // This initializer will parse the log segment header and footer.
  // Note: This returns Status and may fail.
  Status Init();

  // Reads all entries of the provided segment & adds them to the 'entries'
  // vector. The 'entries' vector owns the read entries.
  //
  // If the log is corrupted (i.e. the returned 'Status' is 'Corruption') all
  // the log entries read up to the corrupted one are returned in the 'entries'
  // vector.
  Status ReadEntries(LogEntries* entries) const;

  // Rebuilds this segment's footer by scanning its entries.
  // This is an expensive operation as it reads and parses the whole segment
  // so it should be only used in the case of a crash, where the footer is
  // missing because we didn't have the time to write it out.
  //
  // This function is not thread-safe and should only be called when there's no
  // danger of another thread accessing the segment.
  Status RebuildFooterByScanning();

  // Returns the value of 'is_initialized_'.
  // NOTE(review): presumably set once one of the Init() overloads has
  // succeeded -- confirm in the implementation.
  bool IsInitialized() const {
    return is_initialized_;
  }

  // Returns the path this segment was constructed with ('path_').
  // NOTE(review): the constructor documents this as the path entries are read
  // from, i.e. presumably the segment file itself rather than the parent
  // directory of the log segments -- confirm.
  const std::string& path() const {
    return path_;
  }

  // Returns this log segment's header.
  //
  // Debug builds assert (DCHECK) that the header protobuf is initialized.
  const LogSegmentHeaderPB& header() const {
    DCHECK(header_.IsInitialized());
    return header_;
  }

  // Indicates whether this segment has a footer.
  //
  // Segments that were properly closed, e.g. because they were rolled over,
  // will have properly written footers. On the other hand if there was a
  // crash and the segment was not closed properly the footer will be missing.
  // In this case calling ReadEntries() will rebuild the footer.
  // NOTE(review): ReadEntries() is declared const, while
  // RebuildFooterByScanning() is the method documented as rebuilding the
  // footer -- confirm which of the two actually applies here.
  bool HasFooter() const {
    return footer_.IsInitialized();
  }

  // Returns this log segment's footer.
  //
  // If HasFooter() returns false this cannot be called: the CHECK below
  // enforces that precondition, while the initialization DCHECK only fires in
  // debug builds.
  const LogSegmentFooterPB& footer() const {
    DCHECK(IsInitialized());
    CHECK(HasFooter());
    return footer_;
  }

  // Returns a copy of the shared_ptr to the underlying file, i.e. the caller
  // shares ownership of the file handle.
  std::shared_ptr<RWFile> file() const {
    return file_;
  }

  // Returns the current value of the atomic 'file_size_'.
  int64_t file_size() const {
    return file_size_.Load();
  }

  // Returns the offset of the first entry in the log.
  int64_t first_entry_offset() const {
    return first_entry_offset_;
  }

  // Returns the full size of the file, if the segment is closed and has
  // a footer, or the offset where the last written, non corrupt entry
  // ends.
  int64_t readable_up_to() const;

  // Return the expected length of entry headers in this log segment.
  // Versions of Kudu older than 1.3 used a different log entry header format.
  size_t entry_header_size() const;

 private:
  // RefCountedThreadSafe needs access to the private destructor.
  friend class RefCountedThreadSafe<ReadableLogSegment>;
  friend class LogEntryReader;
  friend class LogReader;
  FRIEND_TEST(LogTest, TestWriteAndReadToAndFromInProgressSegment);

  // In-memory form of the header that prefixes each entry batch.
  struct EntryHeader {
    // The length of the batch data (uncompressed)
    uint32_t msg_length;

    // The compressed length of the entry. If compression is disabled,
    // equal to msg_length.
    uint32_t msg_length_compressed;

    // The CRC32C of the batch data.
    // If compression is enabled, this is the checksum of the compressed data.
    uint32_t msg_crc;

    // The CRC32C of this EntryHeader.
    uint32_t header_crc;
  };

  // Private: instances are destroyed via RefCountedThreadSafe (a friend).
  ~ReadableLogSegment() {}

  // Helper functions called by Init().

  // NOTE(review): presumably determines the size of the underlying file and
  // stores it in 'file_size_' -- confirm in the implementation.
  Status ReadFileSize();

  // NOTE(review): presumably sets 'codec_' based on the segment header --
  // confirm in the implementation.
  Status InitCompressionCodec();

  // Read the log file magic and header protobuf into 'header_'. Sets 'first_entry_offset_'
  // to indicate the start of the actual log data.
  //
  // Returns Uninitialized() if the file appears to be preallocated but never
  // written.
  Status ReadHeader();

  // Read the magic and header length from the top of the file, returning
  // the header length in 'len'.
  //
  // Returns Uninitialized() if the file appears to be preallocated but never
  // written.
  Status ReadHeaderMagicAndHeaderLength(uint32_t *len) const;

  // Parse the magic and the PB-header length prefix from 'data'.
  // In the case that 'data' is all '\0' bytes, indicating a preallocated
  // but never-written segment, returns Status::Uninitialized().
  Status ParseHeaderMagicAndHeaderLength(const Slice &data, uint32_t *parsed_len) const;

  // NOTE(review): the three functions below are undocumented in this header;
  // by name they appear to be the footer counterparts of ReadHeader(),
  // ReadHeaderMagicAndHeaderLength() and ParseHeaderMagicAndHeaderLength() --
  // confirm in the implementation.
  Status ReadFooter();

  Status ReadFooterMagicAndFooterLength(uint32_t *len) const;

  static Status ParseFooterMagicAndFooterLength(const Slice &data, uint32_t *parsed_len);

  // Starting at 'offset', read the rest of the log file, looking for any
  // valid log entry headers. If any are found, sets *has_valid_entries to true.
  //
  // Returns a bad Status only in the case that some IO error occurred reading the
  // file.
  Status ScanForValidEntryHeaders(int64_t offset, bool* has_valid_entries) const;

  // Read an entry header and its associated batch at the given offset.
  // If successful, updates '*offset' to point to the next batch
  // in the file.
  //
  // If unsuccessful, '*offset' is not updated, and *status_detail will be updated
  // to indicate the cause of the error.
  Status ReadEntryHeaderAndBatch(int64_t* offset, faststring* tmp_buf,
                                 std::unique_ptr<LogEntryBatchPB>* batch,
                                 EntryHeaderStatus* status_detail) const;

  // Reads a log entry header from the segment.
  //
  // Also increments the passed '*offset' by the length of the entry on
  // successful read.
  Status ReadEntryHeader(int64_t *offset, EntryHeader* header,
                         EntryHeaderStatus* status_detail) const;

  // Decode a log entry header from the given slice. The header length is
  // determined by 'entry_header_size()'.
  // Returns an EntryHeaderStatus describing the outcome (the return type is
  // an enum, not a bool).
  //
  // NOTE: this is performance-critical since it is used by ScanForValidEntryHeaders
  // and thus returns an enum instead of Status.
  EntryHeaderStatus DecodeEntryHeader(const Slice& data, EntryHeader* header) const;

  // Reads a log entry batch from the provided readable segment, which gets decoded
  // into 'entry_batch' and increments 'offset' by the batch's length.
  Status ReadEntryBatch(int64_t* offset,
                        const EntryHeader& header,
                        faststring* tmp_buf,
                        std::unique_ptr<LogEntryBatchPB>* entry_batch) const;

  // Updates the offset up to which this segment may be read. Per the comment
  // on 'file_size_' below, this may also increase 'file_size_'.
  void UpdateReadableToOffset(int64_t readable_to_offset);

  // The path this segment was constructed with. Returned by path().
  const std::string path_;

  // The size of the file.
  // This is set by Init(). In the case of a log being written to,
  // this may be increased by UpdateReadableToOffset()
  AtomicInt<int64_t> file_size_;

  // The offset up to which we can read the file.
  // For already written segments this is fixed and equal to the file size
  // but for the segments currently written to this is the offset up to which
  // we can read without the fear of reading garbage/zeros.
  // This is atomic because the Log thread might be updating the segment's readable
  // offset while an async reader is reading the segment's entries.
  AtomicInt<int64_t> readable_to_offset_;

  // File handle for a log segment (used on replay).
  //
  // Despite being read-write, we only ever use its read methods.
  const std::shared_ptr<RWFile> file_;

  // Compression codec used to decompress entries in this file.
  // NOTE(review): held as a raw pointer, so presumably not owned by the
  // segment -- confirm.
  const CompressionCodec* codec_;

  // Returned by IsInitialized().
  bool is_initialized_;

  // The segment header. Returned by header().
  LogSegmentHeaderPB header_;

  // The segment footer. HasFooter() reports whether it is initialized.
  LogSegmentFooterPB footer_;

  // True if the footer was rebuilt, rather than actually found on disk.
  bool footer_was_rebuilt_;

  // the offset of the first entry in the log
  int64_t first_entry_offset_;

  DISALLOW_COPY_AND_ASSIGN(ReadableLogSegment);
};

// A writable log segment where state data is stored.
//
// This class is not thread-safe.
// Instances are neither copyable nor assignable (DISALLOW_COPY_AND_ASSIGN).
class WritableLogSegment {
 public:
  // Build a writable segment for the given path, backed by 'file'.
  WritableLogSegment(std::string path,
                     std::shared_ptr<RWFile> file);

  // Opens the segment by writing the header.
  Status WriteHeader(const LogSegmentHeaderPB& new_header);

  // Finishes the segment by writing the footer.
  Status WriteFooter(const LogSegmentFooterPB& footer);

  // Appends the provided batch of data, including a header
  // and checksum. If 'codec' is not NULL, compresses the batch
  // before writing it.
  // Makes sure that the log segment has not been closed.
  Status WriteEntryBatch(const Slice& data, const CompressionCodec* codec);

  // Makes sure the I/O buffers belonging to the underlying file handle are flushed.
  // Simply forwards to the file's Sync() and returns its Status.
  Status Sync() {
    return file_->Sync();
  }

  // Indicate that the segment has not been written for some period of time.
  // In this case, temporary buffers should be freed up.
  void GoIdle();

  // Returns true if the segment header has already been written to disk.
  bool IsHeaderWritten() const {
    return is_header_written_;
  }

  // Returns the segment header.
  // Debug builds assert (DCHECK) that the header has already been written.
  const LogSegmentHeaderPB& header() const {
    DCHECK(IsHeaderWritten());
    return header_;
  }

  // Returns the value of 'is_footer_written_'.
  // NOTE(review): presumably set by a successful WriteFooter() -- confirm in
  // the implementation.
  bool IsFooterWritten() const {
    return is_footer_written_;
  }

  // Returns the segment footer.
  // Debug builds assert (DCHECK) that the footer has already been written.
  const LogSegmentFooterPB& footer() const {
    DCHECK(IsFooterWritten());
    return footer_;
  }

  // Returns the path to the log segment.
  const std::string& path() const {
    return path_;
  }

  // Returns a copy of the shared_ptr to the underlying file, i.e. the caller
  // shares ownership of the file handle.
  std::shared_ptr<RWFile> file() const {
    return file_;
  }

  // Returns the offset of the first entry in the log.
  int64_t first_entry_offset() const {
    return first_entry_offset_;
  }

  // Returns the offset where the last written entry ends.
  int64_t written_offset() const {
    return written_offset_;
  }

 private:
  FRIEND_TEST(LogTest, TestAutoStopIdleAppendThread);

  // The path to the log segment.
  const std::string path_;

  // The file handle belonging to the log segment.
  const std::shared_ptr<RWFile> file_;

  // Returned by IsHeaderWritten().
  bool is_header_written_;

  // Returned by IsFooterWritten().
  bool is_footer_written_;

  // The segment header. Returned by header().
  LogSegmentHeaderPB header_;

  // The segment footer. Returned by footer().
  LogSegmentFooterPB footer_;

  // the offset of the first entry in the log
  int64_t first_entry_offset_;

  // The offset where the last written entry ends.
  int64_t written_offset_;

  // Buffer used for output when compressing.
  faststring compress_buf_;

  DISALLOW_COPY_AND_ASSIGN(WritableLogSegment);
};

// Return a newly created batch that contains the pre-allocated
// ReplicateMsgs in 'msgs'.
//
// Ownership of the returned batch passes to the caller via the unique_ptr.
// NOTE(review): "pre-allocated" suggests the batch refers to the existing
// messages rather than copying them, in which case 'msgs' would have to
// outlive the batch -- confirm against the implementation.
std::unique_ptr<LogEntryBatchPB> CreateBatchFromAllocatedOperations(
    const std::vector<consensus::ReplicateRefPtr>& msgs);

// Checks if 'fname' is a correctly formatted name of log segment file.
// Returns true if it is, false otherwise.
// NOTE(review): the expected naming scheme is defined in the implementation
// and is not visible from this header.
bool IsLogFileName(const std::string& fname);

// Update 'footer' to reflect the given REPLICATE message 'entry_pb'.
// In particular, updates the min/max seen replicate OpID.
//
// 'footer' is an in/out parameter and is modified in place.
// NOTE(review): behavior when 'entry_pb' is not a REPLICATE entry is not
// visible from this header -- confirm in the implementation.
void UpdateFooterForReplicateEntry(
    const LogEntryPB& entry_pb, LogSegmentFooterPB* footer);

}  // namespace log
}  // namespace kudu
