blob: 387977af52b484b220dd0e5cfa11619d2a329143 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef KUDU_CONSENSUS_LOG_UTIL_H_
#define KUDU_CONSENSUS_LOG_UTIL_H_
#include <gtest/gtest.h>
#include <iosfwd>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "kudu/consensus/log.pb.h"
#include "kudu/consensus/ref_counted_replicate.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/util/atomic.h"
#include "kudu/util/env.h"
// Used by other classes, now part of the API.
DECLARE_bool(log_force_fsync_all);
namespace kudu {
namespace consensus {
struct OpIdBiggerThanFunctor;
} // namespace consensus
namespace log {
// Suffix for temprorary files
extern const char kTmpSuffix[];
// Each log entry is prefixed by its length (4 bytes), CRC (4 bytes),
// and checksum of the other two fields (see EntryHeader struct below).
extern const size_t kEntryHeaderSize;
extern const int kLogMajorVersion;
extern const int kLogMinorVersion;
class ReadableLogSegment;
// Options for the State Machine/Write Ahead Log
struct LogOptions {
// The size of a Log segment
// Logs will rollover upon reaching this size (default 64 MB)
size_t segment_size_mb;
// Whether to call fsync on every call to Append().
bool force_fsync_all;
// Whether to fallocate segments before writing to them.
bool preallocate_segments;
// Whether the allocation should happen asynchronously.
bool async_preallocate_segments;
LogOptions();
};
// A sequence of segments, ordered by increasing sequence number.
typedef std::vector<scoped_refptr<ReadableLogSegment> > SegmentSequence;
// A segment of the log can either be a ReadableLogSegment (for replay and
// consensus catch-up) or a WritableLogSegment (where the Log actually stores
// state). LogSegments have a maximum size defined in LogOptions (set from the
// log_segment_size_mb flag, which defaults to 64). Upon reaching this size
// segments are rolled over and the Log continues in a new segment.
// A readable log segment for recovery and follower catch-up.
class ReadableLogSegment : public RefCountedThreadSafe<ReadableLogSegment> {
public:
// Factory method to construct a ReadableLogSegment from a file on the FS.
static Status Open(Env* env,
const std::string& path,
scoped_refptr<ReadableLogSegment>* segment);
// Build a readable segment to read entries from the provided path.
ReadableLogSegment(std::string path,
std::shared_ptr<RandomAccessFile> readable_file);
// Initialize the ReadableLogSegment.
// This initializer provides methods for avoiding disk IO when creating a
// ReadableLogSegment for the current WritableLogSegment, i.e. for reading
// the log entries in the same segment that is currently being written to.
Status Init(const LogSegmentHeaderPB& header,
int64_t first_entry_offset);
// Initialize the ReadableLogSegment.
// This initializer provides methods for avoiding disk IO when creating a
// ReadableLogSegment from a WritableLogSegment (i.e. for log rolling).
Status Init(const LogSegmentHeaderPB& header,
const LogSegmentFooterPB& footer,
int64_t first_entry_offset);
// Initialize the ReadableLogSegment.
// This initializer will parse the log segment header and footer.
// Note: This returns Status and may fail.
Status Init();
// Reads all entries of the provided segment & adds them the 'entries' vector.
// The 'entries' vector owns the read entries.
//
// If the log is corrupted (i.e. the returned 'Status' is 'Corruption') all
// the log entries read up to the corrupted one are returned in the 'entries'
// vector.
//
// If 'end_offset' is not NULL, then returns the file offset following the last
// successfully read entry.
Status ReadEntries(std::vector<LogEntryPB*>* entries,
int64_t* end_offset = NULL);
// Rebuilds this segment's footer by scanning its entries.
// This is an expensive operation as it reads and parses the whole segment
// so it should be only used in the case of a crash, where the footer is
// missing because we didn't have the time to write it out.
Status RebuildFooterByScanning();
bool IsInitialized() const {
return is_initialized_;
}
// Returns the parent directory where log segments are stored.
const std::string &path() const {
return path_;
}
const LogSegmentHeaderPB& header() const {
DCHECK(header_.IsInitialized());
return header_;
}
// Indicates whether this segment has a footer.
//
// Segments that were properly closed, e.g. because they were rolled over,
// will have properly written footers. On the other hand if there was a
// crash and the segment was not closed properly the footer will be missing.
// In this case calling ReadEntries() will rebuild the footer.
bool HasFooter() const {
return footer_.IsInitialized();
}
// Returns this log segment's footer.
//
// If HasFooter() returns false this cannot be called.
const LogSegmentFooterPB& footer() const {
DCHECK(IsInitialized());
CHECK(HasFooter());
return footer_;
}
const std::shared_ptr<RandomAccessFile> readable_file() const {
return readable_file_;
}
const int64_t file_size() const {
return file_size_.Load();
}
const int64_t first_entry_offset() const {
return first_entry_offset_;
}
// Returns the full size of the file, if the segment is closed and has
// a footer, or the offset where the last written, non corrupt entry
// ends.
const int64_t readable_up_to() const;
private:
friend class RefCountedThreadSafe<ReadableLogSegment>;
friend class LogReader;
FRIEND_TEST(LogTest, TestWriteAndReadToAndFromInProgressSegment);
struct EntryHeader {
// The length of the batch data.
uint32_t msg_length;
// The CRC32C of the batch data.
uint32_t msg_crc;
// The CRC32C of this EntryHeader.
uint32_t header_crc;
};
~ReadableLogSegment() {}
// Helper functions called by Init().
Status ReadFileSize();
Status ReadHeader();
Status ReadHeaderMagicAndHeaderLength(uint32_t *len);
Status ParseHeaderMagicAndHeaderLength(const Slice &data, uint32_t *parsed_len);
Status ReadFooter();
Status ReadFooterMagicAndFooterLength(uint32_t *len);
Status ParseFooterMagicAndFooterLength(const Slice &data, uint32_t *parsed_len);
// Starting at 'offset', read the rest of the log file, looking for any
// valid log entry headers. If any are found, sets *has_valid_entries to true.
//
// Returns a bad Status only in the case that some IO error occurred reading the
// file.
Status ScanForValidEntryHeaders(int64_t offset, bool* has_valid_entries);
// Format a nice error message to report on a corruption in a log file.
Status MakeCorruptionStatus(int batch_number, int64_t batch_offset,
std::vector<int64_t>* recent_offsets,
const std::vector<LogEntryPB*>& entries,
const Status& status) const;
Status ReadEntryHeaderAndBatch(int64_t* offset, faststring* tmp_buf,
gscoped_ptr<LogEntryBatchPB>* batch);
// Reads a log entry header from the segment.
// Also increments the passed offset* by the length of the entry.
Status ReadEntryHeader(int64_t *offset, EntryHeader* header);
// Decode a log entry header from the given slice, which must be kEntryHeaderSize
// bytes long. Returns true if successful, false if corrupt.
//
// NOTE: this is performance-critical since it is used by ScanForValidEntryHeaders
// and thus returns bool instead of Status.
bool DecodeEntryHeader(const Slice& data, EntryHeader* header);
// Reads a log entry batch from the provided readable segment, which gets decoded
// into 'entry_batch' and increments 'offset' by the batch's length.
Status ReadEntryBatch(int64_t *offset,
const EntryHeader& header,
faststring* tmp_buf,
gscoped_ptr<LogEntryBatchPB>* entry_batch);
void UpdateReadableToOffset(int64_t readable_to_offset);
const std::string path_;
// The size of the readable file.
// This is set by Init(). In the case of a log being written to,
// this may be increased by UpdateReadableToOffset()
AtomicInt<int64_t> file_size_;
// The offset up to which we can read the file.
// For already written segments this is fixed and equal to the file size
// but for the segments currently written to this is the offset up to which
// we can read without the fear of reading garbage/zeros.
// This is atomic because the Log thread might be updating the segment's readable
// offset while an async reader is reading the segment's entries.
// is reading it.
AtomicInt<int64_t> readable_to_offset_;
// a readable file for a log segment (used on replay)
const std::shared_ptr<RandomAccessFile> readable_file_;
bool is_initialized_;
LogSegmentHeaderPB header_;
LogSegmentFooterPB footer_;
// True if the footer was rebuilt, rather than actually found on disk.
bool footer_was_rebuilt_;
// the offset of the first entry in the log
int64_t first_entry_offset_;
DISALLOW_COPY_AND_ASSIGN(ReadableLogSegment);
};
// A writable log segment where state data is stored.
class WritableLogSegment {
public:
WritableLogSegment(std::string path,
std::shared_ptr<WritableFile> writable_file);
// Opens the segment by writing the header.
Status WriteHeaderAndOpen(const LogSegmentHeaderPB& new_header);
// Closes the segment by writing the footer and then actually closing the
// underlying WritableFile.
Status WriteFooterAndClose(const LogSegmentFooterPB& footer);
bool IsClosed() {
return IsHeaderWritten() && IsFooterWritten();
}
int64_t Size() const {
return writable_file_->Size();
}
// Appends the provided batch of data, including a header
// and checksum.
// Makes sure that the log segment has not been closed.
Status WriteEntryBatch(const Slice& entry_batch_data);
// Makes sure the I/O buffers in the underlying writable file are flushed.
Status Sync() {
return writable_file_->Sync();
}
// Returns true if the segment header has already been written to disk.
bool IsHeaderWritten() const {
return is_header_written_;
}
const LogSegmentHeaderPB& header() const {
DCHECK(IsHeaderWritten());
return header_;
}
bool IsFooterWritten() const {
return is_footer_written_;
}
const LogSegmentFooterPB& footer() const {
DCHECK(IsFooterWritten());
return footer_;
}
// Returns the parent directory where log segments are stored.
const std::string &path() const {
return path_;
}
const int64_t first_entry_offset() const {
return first_entry_offset_;
}
const int64_t written_offset() const {
return written_offset_;
}
private:
const std::shared_ptr<WritableFile>& writable_file() const {
return writable_file_;
}
// The path to the log file.
const std::string path_;
// The writable file to which this LogSegment will be written.
const std::shared_ptr<WritableFile> writable_file_;
bool is_header_written_;
bool is_footer_written_;
LogSegmentHeaderPB header_;
LogSegmentFooterPB footer_;
// the offset of the first entry in the log
int64_t first_entry_offset_;
// The offset where the last written entry ends.
int64_t written_offset_;
DISALLOW_COPY_AND_ASSIGN(WritableLogSegment);
};
// Sets 'batch' to a newly created batch that contains the pre-allocated
// ReplicateMsgs in 'msgs'.
// We use C-style passing here to avoid having to allocate a vector
// in some hot paths.
void CreateBatchFromAllocatedOperations(const std::vector<consensus::ReplicateRefPtr>& msgs,
gscoped_ptr<LogEntryBatchPB>* batch);
// Checks if 'fname' is a correctly formatted name of log segment file.
bool IsLogFileName(const std::string& fname);
} // namespace log
} // namespace kudu
#endif /* KUDU_CONSENSUS_LOG_UTIL_H_ */