// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
| #pragma once |
| |
| #include <cstddef> |
| #include <cstdint> |
| #include <deque> |
| #include <memory> |
| #include <string> |
| #include <vector> |
| |
| #include <gflags/gflags_declare.h> |
| #include <glog/logging.h> |
| #include <gtest/gtest_prod.h> |
| |
| #include "kudu/consensus/log.pb.h" |
| #include "kudu/consensus/opid.pb.h" |
| #include "kudu/gutil/macros.h" |
| #include "kudu/gutil/ref_counted.h" |
| #include "kudu/util/atomic.h" |
| #include "kudu/util/env.h" |
| #include "kudu/util/faststring.h" |
| #include "kudu/util/slice.h" |
| #include "kudu/util/status.h" |
| |
| // Used by other classes, now part of the API. |
| DECLARE_bool(log_force_fsync_all); |
| |
| namespace kudu { |
| |
| class CompressionCodec; |
| class FileCache; |
| |
| namespace log { |
| |
| // Each log entry is prefixed by a header. See DecodeEntryHeader() |
| // implementation for details. |
| extern const size_t kEntryHeaderSizeV2; |
| |
| class ReadableLogSegment; |
| |
| typedef std::vector<std::unique_ptr<LogEntryPB>> LogEntries; |
| |
// Options for the State Machine/Write Ahead Log.
//
// A plain aggregate of tunables. The constructor is defined out of line.
struct LogOptions {
  // The size of a Log segment, in megabytes.
  // Logs will roll over upon reaching this size (default 64 MB).
  size_t segment_size_mb;

  // Whether to call fsync on every call to Append().
  bool force_fsync_all;

  // Whether to fallocate segments before writing to them.
  bool preallocate_segments;

  // Whether the allocation should happen asynchronously.
  bool async_preallocate_segments;

  // Defined out of line.
  // NOTE(review): presumably populates the fields above from their default
  // flag values -- confirm in the implementation file.
  LogOptions();
};

| // A sequence of segments, ordered by increasing sequence number. |
| typedef std::vector<scoped_refptr<ReadableLogSegment> > SegmentSequence; |
| |
// Detailed error codes when decoding entry headers. Used for more fine-grained
// error-handling.
enum class EntryHeaderStatus {
  // No error was detected.
  OK,

  // The entry was just a run of zeros. It's likely we are trying to
  // read from pre-allocated space.
  ALL_ZEROS,

  // The entry checksum didn't match the expected value.
  CRC_MISMATCH,

  // Some other error occurred (e.g. an IO error while reading).
  OTHER_ERROR
};

| // LogEntryReader provides iterator-style access to read the entries |
| // from an open log segment. |
| class LogEntryReader { |
| public: |
| |
| // Construct a LogEntryReader to read from the provided segment. |
| // 'seg' must outlive the LogEntryReader. |
| explicit LogEntryReader(const ReadableLogSegment* seg); |
| |
| ~LogEntryReader(); |
| |
| // Read the next entry from the log, replacing the contents of 'entry'. |
| // |
| // When there are no more entries to read, returns Status::EndOfFile(). |
| Status ReadNextEntry(std::unique_ptr<LogEntryPB>* entry); |
| |
| // Return the offset of the next entry to be read from the file. |
| int64_t offset() const { |
| return offset_; |
| } |
| |
| // Return the offset at which this reader will stop reading. |
| int64_t read_up_to_offset() const { |
| return read_up_to_; |
| } |
| |
| private: |
| friend class ReadableLogSegment; |
| |
| // Handle an error reading an entry. |
| Status HandleReadError(const Status& s, EntryHeaderStatus status_detail) const; |
| |
| // Format a nice error message to report on a corruption in a log file. |
| Status MakeCorruptionStatus(const Status& status) const; |
| |
| // The segment being read. |
| const ReadableLogSegment* seg_; |
| |
| // The last several entries which were successfully read. |
| struct RecentEntry { |
| int64_t offset; |
| LogEntryTypePB type; |
| consensus::OpId op_id; |
| }; |
| std::deque<RecentEntry> recent_entries_; |
| static const int kNumRecentEntries = 4; |
| |
| // Entries which have been read from the file and not yet returned to |
| // the caller. |
| std::deque<std::unique_ptr<LogEntryPB>> pending_entries_; |
| |
| // The total number of log entry batches read from the file. |
| int64_t num_batches_read_; |
| |
| // The total number of LogEntryPBs read from the file. |
| int64_t num_entries_read_; |
| |
| // The offset of the next entry to be read. |
| int64_t offset_; |
| |
| // The offset at which this reader will stop reading entries. |
| int64_t read_up_to_; |
| |
| // Temporary buffer used for deserialization. |
| faststring tmp_buf_; |
| |
| DISALLOW_COPY_AND_ASSIGN(LogEntryReader); |
| }; |
| |
// A segment of the log can be either a ReadableLogSegment (for replay and
// consensus catch-up) or a WritableLogSegment (where the Log actually stores
// state). LogSegments have a maximum size defined in LogOptions (set from the
// log_segment_size_mb flag, which defaults to 64). Upon reaching this size,
// segments are rolled over and the Log continues in a new segment.

| // A readable log segment for recovery and follower catch-up. |
| // |
| // Const methods are thread-safe; non-const methods are not. Care must be taken |
| // not to invoke non-thread-safe methods after the segment has been incorporated |
| // into the LogReader, as there may be concurrent threads accessing the reader. |
| class ReadableLogSegment : public RefCountedThreadSafe<ReadableLogSegment> { |
| public: |
| // Factory method to construct a ReadableLogSegment from a file on the FS. |
| static Status Open(Env* env, |
| FileCache* file_cache, |
| const std::string& path, |
| scoped_refptr<ReadableLogSegment>* segment); |
| |
| // Build a readable segment to read entries from the provided path. |
| ReadableLogSegment(std::string path, std::shared_ptr<RWFile> file); |
| |
| // Initialize the ReadableLogSegment. |
| // This initializer provides methods for avoiding disk IO when creating a |
| // ReadableLogSegment for the current WritableLogSegment, i.e. for reading |
| // the log entries in the same segment that is currently being written to. |
| Status Init(const LogSegmentHeaderPB& header, |
| int64_t first_entry_offset); |
| |
| // Initialize the ReadableLogSegment. |
| // This initializer provides methods for avoiding disk IO when creating a |
| // ReadableLogSegment from a WritableLogSegment (i.e. for log rolling). |
| Status Init(const LogSegmentHeaderPB& header, |
| const LogSegmentFooterPB& footer, |
| int64_t first_entry_offset); |
| |
| // Initialize the ReadableLogSegment. |
| // This initializer will parse the log segment header and footer. |
| // Note: This returns Status and may fail. |
| Status Init(); |
| |
| // Reads all entries of the provided segment & adds them the 'entries' vector. |
| // The 'entries' vector owns the read entries. |
| // |
| // If the log is corrupted (i.e. the returned 'Status' is 'Corruption') all |
| // the log entries read up to the corrupted one are returned in the 'entries' |
| // vector. |
| Status ReadEntries(LogEntries* entries) const; |
| |
| // Rebuilds this segment's footer by scanning its entries. |
| // This is an expensive operation as it reads and parses the whole segment |
| // so it should be only used in the case of a crash, where the footer is |
| // missing because we didn't have the time to write it out. |
| // |
| // This function is not thread-safe and should only be called when there's no |
| // danger of another thread accessing the segment. |
| Status RebuildFooterByScanning(); |
| |
| bool IsInitialized() const { |
| return is_initialized_; |
| } |
| |
| // Returns the parent directory where log segments are stored. |
| const std::string& path() const { |
| return path_; |
| } |
| |
| const LogSegmentHeaderPB& header() const { |
| DCHECK(header_.IsInitialized()); |
| return header_; |
| } |
| |
| // Indicates whether this segment has a footer. |
| // |
| // Segments that were properly closed, e.g. because they were rolled over, |
| // will have properly written footers. On the other hand if there was a |
| // crash and the segment was not closed properly the footer will be missing. |
| // In this case calling ReadEntries() will rebuild the footer. |
| bool HasFooter() const { |
| return footer_.IsInitialized(); |
| } |
| |
| // Returns this log segment's footer. |
| // |
| // If HasFooter() returns false this cannot be called. |
| const LogSegmentFooterPB& footer() const { |
| DCHECK(IsInitialized()); |
| CHECK(HasFooter()); |
| return footer_; |
| } |
| |
| std::shared_ptr<RWFile> file() const { |
| return file_; |
| } |
| |
| int64_t file_size() const { |
| return file_size_.Load(); |
| } |
| |
| int64_t first_entry_offset() const { |
| return first_entry_offset_; |
| } |
| |
| // Returns the full size of the file, if the segment is closed and has |
| // a footer, or the offset where the last written, non corrupt entry |
| // ends. |
| int64_t readable_up_to() const; |
| |
| // Return the expected length of entry headers in this log segment. |
| // Versions of Kudu older than 1.3 used a different log entry header format. |
| size_t entry_header_size() const; |
| |
| private: |
| friend class RefCountedThreadSafe<ReadableLogSegment>; |
| friend class LogEntryReader; |
| friend class LogReader; |
| FRIEND_TEST(LogTest, TestWriteAndReadToAndFromInProgressSegment); |
| |
| struct EntryHeader { |
| // The length of the batch data (uncompressed) |
| uint32_t msg_length; |
| |
| // The compressed length of the entry. If compression is disabled, |
| // equal to msg_length. |
| uint32_t msg_length_compressed; |
| |
| // The CRC32C of the batch data. |
| // If compression is enabled, this is the checksum of the compressed data. |
| uint32_t msg_crc; |
| |
| // The CRC32C of this EntryHeader. |
| uint32_t header_crc; |
| }; |
| |
| ~ReadableLogSegment() {} |
| |
| // Helper functions called by Init(). |
| |
| Status ReadFileSize(); |
| |
| Status InitCompressionCodec(); |
| |
| // Read the log file magic and header protobuf into 'header_'. Sets 'first_entry_offset_' |
| // to indicate the start of the actual log data. |
| // |
| // Returns Uninitialized() if the file appears to be preallocated but never |
| // written. |
| Status ReadHeader(); |
| |
| // Read the magic and header length from the top of the file, returning |
| // the header length in 'len'. |
| // |
| // Returns Uninitialized() if the file appears to be preallocated but never |
| // written. |
| Status ReadHeaderMagicAndHeaderLength(uint32_t *len) const; |
| |
| // Parse the magic and the PB-header length prefix from 'data'. |
| // In the case that 'data' is all '\0' bytes, indicating a preallocated |
| // but never-written segment, returns Status::Uninitialized(). |
| Status ParseHeaderMagicAndHeaderLength(const Slice &data, uint32_t *parsed_len) const; |
| |
| Status ReadFooter(); |
| |
| Status ReadFooterMagicAndFooterLength(uint32_t *len) const; |
| |
| static Status ParseFooterMagicAndFooterLength(const Slice &data, uint32_t *parsed_len); |
| |
| // Starting at 'offset', read the rest of the log file, looking for any |
| // valid log entry headers. If any are found, sets *has_valid_entries to true. |
| // |
| // Returns a bad Status only in the case that some IO error occurred reading the |
| // file. |
| Status ScanForValidEntryHeaders(int64_t offset, bool* has_valid_entries) const; |
| |
| // Read an entry header and its associated batch at the given offset. |
| // If successful, updates '*offset' to point to the next batch |
| // in the file. |
| // |
| // If unsuccessful, '*offset' is not updated, and *status_detail will be updated |
| // to indicate the cause of the error. |
| Status ReadEntryHeaderAndBatch(int64_t* offset, faststring* tmp_buf, |
| LogEntryBatchPB* batch, |
| EntryHeaderStatus* status_detail) const; |
| |
| // Reads a log entry header from the segment. |
| // |
| // Also increments the passed offset* by the length of the entry on successful |
| // read. |
| Status ReadEntryHeader(int64_t *offset, EntryHeader* header, |
| EntryHeaderStatus* status_detail) const; |
| |
| // Decode a log entry header from the given slice. The header length is |
| // determined by 'entry_header_size()'. |
| // Returns true if successful, false if corrupt. |
| // |
| // NOTE: this is performance-critical since it is used by ScanForValidEntryHeaders |
| // and thus returns an enum instead of Status. |
| EntryHeaderStatus DecodeEntryHeader(const Slice& data, EntryHeader* header) const; |
| |
| // Reads a log entry batch from the provided readable segment, which gets decoded |
| // into 'entry_batch' and increments 'offset' by the batch's length. |
| Status ReadEntryBatch(int64_t* offset, |
| const EntryHeader& header, |
| faststring* tmp_buf, |
| LogEntryBatchPB* entry_batch) const; |
| |
| void UpdateReadableToOffset(int64_t readable_to_offset); |
| |
| const std::string path_; |
| |
| // The size of the file. |
| // This is set by Init(). In the case of a log being written to, |
| // this may be increased by UpdateReadableToOffset() |
| AtomicInt<int64_t> file_size_; |
| |
| // The offset up to which we can read the file. |
| // For already written segments this is fixed and equal to the file size |
| // but for the segments currently written to this is the offset up to which |
| // we can read without the fear of reading garbage/zeros. |
| // This is atomic because the Log thread might be updating the segment's readable |
| // offset while an async reader is reading the segment's entries. |
| AtomicInt<int64_t> readable_to_offset_; |
| |
| // File handle for a log segment (used on replay). |
| // |
| // Despite being read-write, we only ever use its read methods. |
| const std::shared_ptr<RWFile> file_; |
| |
| // Compression codec used to decompress entries in this file. |
| const CompressionCodec* codec_; |
| |
| bool is_initialized_; |
| |
| LogSegmentHeaderPB header_; |
| |
| LogSegmentFooterPB footer_; |
| |
| // True if the footer was rebuilt, rather than actually found on disk. |
| bool footer_was_rebuilt_; |
| |
| // the offset of the first entry in the log |
| int64_t first_entry_offset_; |
| |
| DISALLOW_COPY_AND_ASSIGN(ReadableLogSegment); |
| }; |
| |
| // A writable log segment where state data is stored. |
| // |
| // This class is not thread-safe. |
| class WritableLogSegment { |
| public: |
| WritableLogSegment(std::string path, |
| std::shared_ptr<RWFile> file); |
| |
| // Opens the segment by writing the header. |
| Status WriteHeader(const LogSegmentHeaderPB& new_header); |
| |
| // Finishes the segment by writing the footer. |
| Status WriteFooter(const LogSegmentFooterPB& footer); |
| |
| // Appends the provided batch of data, including a header |
| // and checksum. If 'codec' is not NULL, compresses the batch. |
| // Makes sure that the log segment has not been closed. |
| // Write a compressed entry to the log. |
| Status WriteEntryBatch(const Slice& data, const CompressionCodec* codec); |
| |
| // Makes sure the I/O buffers belonging to the underlying file handle are flushed. |
| Status Sync() { |
| return file_->Sync(); |
| } |
| |
| // Indicate that the segment has not been written for some period of time. |
| // In this case, temporary buffers should be freed up. |
| void GoIdle(); |
| |
| // Returns true if the segment header has already been written to disk. |
| bool IsHeaderWritten() const { |
| return is_header_written_; |
| } |
| |
| const LogSegmentHeaderPB& header() const { |
| DCHECK(IsHeaderWritten()); |
| return header_; |
| } |
| |
| bool IsFooterWritten() const { |
| return is_footer_written_; |
| } |
| |
| const LogSegmentFooterPB& footer() const { |
| DCHECK(IsFooterWritten()); |
| return footer_; |
| } |
| |
| // Returns the parent directory where log segments are stored. |
| const std::string& path() const { |
| return path_; |
| } |
| |
| std::shared_ptr<RWFile> file() const { |
| return file_; |
| } |
| |
| int64_t first_entry_offset() const { |
| return first_entry_offset_; |
| } |
| |
| int64_t written_offset() const { |
| return written_offset_; |
| } |
| |
| private: |
| FRIEND_TEST(LogTest, TestAutoStopIdleAppendThread); |
| |
| // The path to the log segment. |
| const std::string path_; |
| |
| // The file handle belonging to the log segment. |
| const std::shared_ptr<RWFile> file_; |
| |
| bool is_header_written_; |
| |
| bool is_footer_written_; |
| |
| LogSegmentHeaderPB header_; |
| |
| LogSegmentFooterPB footer_; |
| |
| // the offset of the first entry in the log |
| int64_t first_entry_offset_; |
| |
| // The offset where the last written entry ends. |
| int64_t written_offset_; |
| |
| // Buffer used for output when compressing. |
| faststring compress_buf_; |
| |
| DISALLOW_COPY_AND_ASSIGN(WritableLogSegment); |
| }; |
| |
// Checks whether 'fname' is a correctly formatted name of a log segment file.
// Returns true if it is.
bool IsLogFileName(const std::string& fname);

| // Update 'footer' to reflect a REPLICATE message with the given |
| // op_id. In particular, updates the min/max seen replicate OpID. |
| void UpdateFooterForReplicateEntry(const consensus::OpId& op_id, |
| LogSegmentFooterPB* footer); |
| |
| } // namespace log |
| } // namespace kudu |