| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/consensus/log_util.h" |
| |
| #include <algorithm> |
| #include <iostream> |
| #include <limits> |
| #include <utility> |
| |
| #include <gflags/gflags.h> |
| #include <glog/logging.h> |
| |
| #include "kudu/consensus/opid_util.h" |
| #include "kudu/consensus/ref_counted_replicate.h" |
| #include "kudu/fs/fs_manager.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/stl_util.h" |
| #include "kudu/gutil/strings/split.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/gutil/strings/util.h" |
| #include "kudu/util/coding-inl.h" |
| #include "kudu/util/coding.h" |
| #include "kudu/util/crc.h" |
| #include "kudu/util/debug/trace_event.h" |
| #include "kudu/util/env_util.h" |
| #include "kudu/util/flag_tags.h" |
| #include "kudu/util/pb_util.h" |
| |
| DEFINE_int32(log_segment_size_mb, 64, |
| "The default segment size for log roll-overs, in MB"); |
| TAG_FLAG(log_segment_size_mb, advanced); |
| |
| DEFINE_bool(log_force_fsync_all, false, |
| "Whether the Log/WAL should explicitly call fsync() after each write."); |
| TAG_FLAG(log_force_fsync_all, stable); |
| |
| DEFINE_bool(log_preallocate_segments, true, |
| "Whether the WAL should preallocate the entire segment before writing to it"); |
| TAG_FLAG(log_preallocate_segments, advanced); |
| |
| DEFINE_bool(log_async_preallocate_segments, true, |
| "Whether the WAL segments preallocation should happen asynchronously"); |
| TAG_FLAG(log_async_preallocate_segments, advanced); |
| |
| namespace kudu { |
| namespace log { |
| |
| using consensus::OpId; |
| using env_util::ReadFully; |
| using std::vector; |
| using std::shared_ptr; |
| using strings::Substitute; |
| using strings::SubstituteAndAppend; |
| |
| const char kTmpSuffix[] = ".tmp"; |
| |
| const char kLogSegmentHeaderMagicString[] = "kudulogf"; |
| |
| // A magic that is written as the very last thing when a segment is closed. |
| // Segments that were not closed (usually the last one being written) will not |
| // have this magic. |
| const char kLogSegmentFooterMagicString[] = "closedls"; |
| |
| // Header is prefixed with the header magic (8 bytes) and the header length (4 bytes). |
| const size_t kLogSegmentHeaderMagicAndHeaderLength = 12; |
| |
| // Footer is suffixed with the footer magic (8 bytes) and the footer length (4 bytes). |
| const size_t kLogSegmentFooterMagicAndFooterLength = 12; |
| |
| const size_t kEntryHeaderSize = 12; |
| |
| const int kLogMajorVersion = 1; |
| const int kLogMinorVersion = 0; |
| |
| // Maximum log segment header/footer size, in bytes (8 MB). |
| const uint32_t kLogSegmentMaxHeaderOrFooterSize = 8 * 1024 * 1024; |
| |
| LogOptions::LogOptions() |
| : segment_size_mb(FLAGS_log_segment_size_mb), |
| force_fsync_all(FLAGS_log_force_fsync_all), |
| preallocate_segments(FLAGS_log_preallocate_segments), |
| async_preallocate_segments(FLAGS_log_async_preallocate_segments) { |
| } |
| |
| Status ReadableLogSegment::Open(Env* env, |
| const string& path, |
| scoped_refptr<ReadableLogSegment>* segment) { |
| VLOG(1) << "Parsing wal segment: " << path; |
| shared_ptr<RandomAccessFile> readable_file; |
| RETURN_NOT_OK_PREPEND(env_util::OpenFileForRandom(env, path, &readable_file), |
| "Unable to open file for reading"); |
| |
| segment->reset(new ReadableLogSegment(path, readable_file)); |
| RETURN_NOT_OK_PREPEND((*segment)->Init(), "Unable to initialize segment"); |
| return Status::OK(); |
| } |
| |
| ReadableLogSegment::ReadableLogSegment( |
| std::string path, shared_ptr<RandomAccessFile> readable_file) |
| : path_(std::move(path)), |
| file_size_(0), |
| readable_to_offset_(0), |
| readable_file_(std::move(readable_file)), |
| is_initialized_(false), |
| footer_was_rebuilt_(false) {} |
| |
| Status ReadableLogSegment::Init(const LogSegmentHeaderPB& header, |
| const LogSegmentFooterPB& footer, |
| int64_t first_entry_offset) { |
| DCHECK(!IsInitialized()) << "Can only call Init() once"; |
| DCHECK(header.IsInitialized()) << "Log segment header must be initialized"; |
| DCHECK(footer.IsInitialized()) << "Log segment footer must be initialized"; |
| |
| RETURN_NOT_OK(ReadFileSize()); |
| |
| header_.CopyFrom(header); |
| footer_.CopyFrom(footer); |
| first_entry_offset_ = first_entry_offset; |
| is_initialized_ = true; |
| readable_to_offset_.Store(file_size()); |
| |
| return Status::OK(); |
| } |
| |
| Status ReadableLogSegment::Init(const LogSegmentHeaderPB& header, |
| int64_t first_entry_offset) { |
| DCHECK(!IsInitialized()) << "Can only call Init() once"; |
| DCHECK(header.IsInitialized()) << "Log segment header must be initialized"; |
| |
| RETURN_NOT_OK(ReadFileSize()); |
| |
| header_.CopyFrom(header); |
| first_entry_offset_ = first_entry_offset; |
| is_initialized_ = true; |
| |
| // On a new segment, we don't expect any readable entries yet. |
| readable_to_offset_.Store(first_entry_offset); |
| |
| return Status::OK(); |
| } |
| |
| Status ReadableLogSegment::Init() { |
| DCHECK(!IsInitialized()) << "Can only call Init() once"; |
| |
| RETURN_NOT_OK(ReadFileSize()); |
| |
| RETURN_NOT_OK(ReadHeader()); |
| |
| Status s = ReadFooter(); |
| if (!s.ok()) { |
| LOG(WARNING) << "Could not read footer for segment: " << path_ |
| << ": " << s.ToString(); |
| } |
| |
| is_initialized_ = true; |
| |
| readable_to_offset_.Store(file_size()); |
| |
| return Status::OK(); |
| } |
| |
| const int64_t ReadableLogSegment::readable_up_to() const { |
| return readable_to_offset_.Load(); |
| } |
| |
| void ReadableLogSegment::UpdateReadableToOffset(int64_t readable_to_offset) { |
| readable_to_offset_.Store(readable_to_offset); |
| file_size_.StoreMax(readable_to_offset); |
| } |
| |
| Status ReadableLogSegment::RebuildFooterByScanning() { |
| TRACE_EVENT1("log", "ReadableLogSegment::RebuildFooterByScanning", |
| "path", path_); |
| |
| DCHECK(!footer_.IsInitialized()); |
| vector<LogEntryPB*> entries; |
| ElementDeleter deleter(&entries); |
| int64_t end_offset = 0; |
| RETURN_NOT_OK(ReadEntries(&entries, &end_offset)); |
| |
| footer_.set_num_entries(entries.size()); |
| |
| // Rebuild the min/max replicate index (by scanning) |
| for (const LogEntryPB* entry : entries) { |
| if (entry->has_replicate()) { |
| int64_t index = entry->replicate().id().index(); |
| // TODO: common code with Log::UpdateFooterForBatch |
| if (!footer_.has_min_replicate_index() || |
| index < footer_.min_replicate_index()) { |
| footer_.set_min_replicate_index(index); |
| } |
| if (!footer_.has_max_replicate_index() || |
| index > footer_.max_replicate_index()) { |
| footer_.set_max_replicate_index(index); |
| } |
| } |
| } |
| |
| DCHECK(footer_.IsInitialized()); |
| DCHECK_EQ(entries.size(), footer_.num_entries()); |
| footer_was_rebuilt_ = true; |
| |
| readable_to_offset_.Store(end_offset); |
| |
| LOG(INFO) << "Successfully rebuilt footer for segment: " << path_ |
| << " (valid entries through byte offset " << end_offset << ")"; |
| return Status::OK(); |
| } |
| |
| Status ReadableLogSegment::ReadFileSize() { |
| // Check the size of the file. |
| // Env uses uint here, even though we generally prefer signed ints to avoid |
| // underflow bugs. Use a local to convert. |
| uint64_t size; |
| RETURN_NOT_OK_PREPEND(readable_file_->Size(&size), "Unable to read file size"); |
| file_size_.Store(size); |
| if (size == 0) { |
| VLOG(1) << "Log segment file $0 is zero-length: " << path(); |
| return Status::OK(); |
| } |
| return Status::OK(); |
| } |
| |
| Status ReadableLogSegment::ReadHeader() { |
| uint32_t header_size; |
| RETURN_NOT_OK(ReadHeaderMagicAndHeaderLength(&header_size)); |
| if (header_size == 0) { |
| // If a log file has been pre-allocated but not initialized, then |
| // 'header_size' will be 0 even the file size is > 0; in this |
| // case, 'is_initialized_' remains set to false and return |
| // Status::OK() early. LogReader ignores segments where |
| // IsInitialized() returns false. |
| return Status::OK(); |
| } |
| |
| if (header_size > kLogSegmentMaxHeaderOrFooterSize) { |
| return Status::Corruption( |
| Substitute("File is corrupted. " |
| "Parsed header size: $0 is zero or bigger than max header size: $1", |
| header_size, kLogSegmentMaxHeaderOrFooterSize)); |
| } |
| |
| uint8_t header_space[header_size]; |
| Slice header_slice; |
| LogSegmentHeaderPB header; |
| |
| // Read and parse the log segment header. |
| RETURN_NOT_OK_PREPEND(ReadFully(readable_file_.get(), kLogSegmentHeaderMagicAndHeaderLength, |
| header_size, &header_slice, header_space), |
| "Unable to read fully"); |
| |
| RETURN_NOT_OK_PREPEND(pb_util::ParseFromArray(&header, |
| header_slice.data(), |
| header_size), |
| "Unable to parse protobuf"); |
| |
| header_.CopyFrom(header); |
| first_entry_offset_ = header_size + kLogSegmentHeaderMagicAndHeaderLength; |
| |
| return Status::OK(); |
| } |
| |
| |
| Status ReadableLogSegment::ReadHeaderMagicAndHeaderLength(uint32_t *len) { |
| uint8_t scratch[kLogSegmentHeaderMagicAndHeaderLength]; |
| Slice slice; |
| RETURN_NOT_OK(ReadFully(readable_file_.get(), 0, kLogSegmentHeaderMagicAndHeaderLength, |
| &slice, scratch)); |
| RETURN_NOT_OK(ParseHeaderMagicAndHeaderLength(slice, len)); |
| return Status::OK(); |
| } |
| |
| namespace { |
| |
| // We don't run TSAN on this function because it makes it really slow and causes some |
| // test timeouts. This is only used on local buffers anyway, so we don't lose much |
| // by not checking it. |
| ATTRIBUTE_NO_SANITIZE_THREAD |
| bool IsAllZeros(const Slice& s) { |
| // Walk a pointer through the slice instead of using s[i] |
| // since this is way faster in debug mode builds. We also do some |
| // manual unrolling for the same purpose. |
| const uint8_t* p = &s[0]; |
| int rem = s.size(); |
| |
| while (rem >= 8) { |
| if (UNALIGNED_LOAD64(p) != 0) return false; |
| rem -= 8; |
| p += 8; |
| } |
| |
| while (rem > 0) { |
| if (*p++ != '\0') return false; |
| rem--; |
| } |
| return true; |
| } |
| } // anonymous namespace |
| |
| Status ReadableLogSegment::ParseHeaderMagicAndHeaderLength(const Slice &data, |
| uint32_t *parsed_len) { |
| RETURN_NOT_OK_PREPEND(data.check_size(kLogSegmentHeaderMagicAndHeaderLength), |
| "Log segment file is too small to contain initial magic number"); |
| |
| if (memcmp(kLogSegmentHeaderMagicString, data.data(), |
| strlen(kLogSegmentHeaderMagicString)) != 0) { |
| // As a special case, we check whether the file was allocated but no header |
| // was written. We treat that case as an uninitialized file, much in the |
| // same way we treat zero-length files. |
| // Note: While the above comparison checks 8 bytes, this one checks the full 12 |
| // to ensure we have a full 12 bytes of NULL data. |
| if (IsAllZeros(data)) { |
| // 12 bytes of NULLs, good enough for us to consider this a file that |
| // was never written to (but apparently preallocated). |
| LOG(WARNING) << "Log segment file " << path() << " has 12 initial NULL bytes instead of " |
| << "magic and header length: " << data.ToDebugString() |
| << " and will be treated as a blank segment."; |
| *parsed_len = 0; |
| return Status::OK(); |
| } |
| // If no magic and not uninitialized, the file is considered corrupt. |
| return Status::Corruption(Substitute("Invalid log segment file $0: Bad magic. $1", |
| path(), data.ToDebugString())); |
| } |
| |
| *parsed_len = DecodeFixed32(data.data() + strlen(kLogSegmentHeaderMagicString)); |
| return Status::OK(); |
| } |
| |
| Status ReadableLogSegment::ReadFooter() { |
| uint32_t footer_size; |
| RETURN_NOT_OK(ReadFooterMagicAndFooterLength(&footer_size)); |
| |
| if (footer_size == 0 || footer_size > kLogSegmentMaxHeaderOrFooterSize) { |
| return Status::NotFound( |
| Substitute("File is corrupted. " |
| "Parsed header size: $0 is zero or bigger than max header size: $1", |
| footer_size, kLogSegmentMaxHeaderOrFooterSize)); |
| } |
| |
| if (footer_size > (file_size() - first_entry_offset_)) { |
| return Status::NotFound("Footer not found. File corrupted. " |
| "Decoded footer length pointed at a footer before the first entry."); |
| } |
| |
| uint8_t footer_space[footer_size]; |
| Slice footer_slice; |
| |
| int64_t footer_offset = file_size() - kLogSegmentFooterMagicAndFooterLength - footer_size; |
| |
| LogSegmentFooterPB footer; |
| |
| // Read and parse the log segment footer. |
| RETURN_NOT_OK_PREPEND(ReadFully(readable_file_.get(), footer_offset, |
| footer_size, &footer_slice, footer_space), |
| "Footer not found. Could not read fully."); |
| |
| RETURN_NOT_OK_PREPEND(pb_util::ParseFromArray(&footer, |
| footer_slice.data(), |
| footer_size), |
| "Unable to parse protobuf"); |
| |
| footer_.Swap(&footer); |
| return Status::OK(); |
| } |
| |
| Status ReadableLogSegment::ReadFooterMagicAndFooterLength(uint32_t *len) { |
| uint8_t scratch[kLogSegmentFooterMagicAndFooterLength]; |
| Slice slice; |
| |
| CHECK_GT(file_size(), kLogSegmentFooterMagicAndFooterLength); |
| RETURN_NOT_OK(ReadFully(readable_file_.get(), |
| file_size() - kLogSegmentFooterMagicAndFooterLength, |
| kLogSegmentFooterMagicAndFooterLength, |
| &slice, |
| scratch)); |
| |
| RETURN_NOT_OK(ParseFooterMagicAndFooterLength(slice, len)); |
| return Status::OK(); |
| } |
| |
| Status ReadableLogSegment::ParseFooterMagicAndFooterLength(const Slice &data, |
| uint32_t *parsed_len) { |
| RETURN_NOT_OK_PREPEND(data.check_size(kLogSegmentFooterMagicAndFooterLength), |
| "Slice is too small to contain final magic number"); |
| |
| if (memcmp(kLogSegmentFooterMagicString, data.data(), |
| strlen(kLogSegmentFooterMagicString)) != 0) { |
| return Status::NotFound("Footer not found. Footer magic doesn't match"); |
| } |
| |
| *parsed_len = DecodeFixed32(data.data() + strlen(kLogSegmentFooterMagicString)); |
| return Status::OK(); |
| } |
| |
| Status ReadableLogSegment::ReadEntries(vector<LogEntryPB*>* entries, |
| int64_t* end_offset) { |
| TRACE_EVENT1("log", "ReadableLogSegment::ReadEntries", |
| "path", path_); |
| |
| vector<int64_t> recent_offsets(4, -1); |
| int batches_read = 0; |
| |
| int64_t offset = first_entry_offset(); |
| int64_t readable_to_offset = readable_to_offset_.Load(); |
| VLOG(1) << "Reading segment entries from " |
| << path_ << ": offset=" << offset << " file_size=" |
| << file_size() << " readable_to_offset=" << readable_to_offset; |
| faststring tmp_buf; |
| |
| // If we have a footer we only read up to it. If we don't we likely crashed |
| // and always read to the end. |
| int64_t read_up_to = (footer_.IsInitialized() && !footer_was_rebuilt_) ? |
| file_size() - footer_.ByteSize() - kLogSegmentFooterMagicAndFooterLength : |
| readable_to_offset; |
| |
| if (end_offset != nullptr) { |
| *end_offset = offset; |
| } |
| |
| int num_entries_read = 0; |
| while (offset < read_up_to) { |
| const int64_t this_batch_offset = offset; |
| recent_offsets[batches_read++ % recent_offsets.size()] = offset; |
| |
| gscoped_ptr<LogEntryBatchPB> current_batch; |
| |
| // Read and validate the entry header first. |
| Status s; |
| if (offset + kEntryHeaderSize < read_up_to) { |
| s = ReadEntryHeaderAndBatch(&offset, &tmp_buf, ¤t_batch); |
| } else { |
| s = Status::Corruption(Substitute("Truncated log entry at offset $0", offset)); |
| } |
| |
| if (PREDICT_FALSE(!s.ok())) { |
| if (!s.IsCorruption()) { |
| // IO errors should always propagate back |
| return s.CloneAndPrepend(Substitute("Error reading from log $0", path_)); |
| } |
| |
| Status corruption_status = MakeCorruptionStatus( |
| batches_read, this_batch_offset, &recent_offsets, |
| *entries, s); |
| |
| // If we have a valid footer in the segment, then the segment was correctly |
| // closed, and we shouldn't see any corruption anywhere (including the last |
| // batch). |
| if (HasFooter() && !footer_was_rebuilt_) { |
| LOG(WARNING) << "Found a corruption in a closed log segment: " |
| << corruption_status.ToString(); |
| return corruption_status; |
| } |
| |
| // If we read a corrupt entry, but we don't have a footer, then it's |
| // possible that we crashed in the middle of writing an entry. |
| // In this case, we scan forward to see if there are any more valid looking |
| // entries after this one in the file. If there are, it's really a corruption. |
| // if not, we just WARN it, since it's OK for the last entry to be partially |
| // written. |
| bool has_valid_entries; |
| RETURN_NOT_OK_PREPEND(ScanForValidEntryHeaders(offset, &has_valid_entries), |
| "Scanning forward for valid entries"); |
| if (has_valid_entries) { |
| return corruption_status; |
| } |
| |
| LOG(INFO) << "Ignoring log segment corruption in " << path_ << " because " |
| << "there are no log entries following the corrupted one. " |
| << "The server probably crashed in the middle of writing an entry " |
| << "to the write-ahead log or downloaded an active log via remote bootstrap. " |
| << "Error detail: " << corruption_status.ToString(); |
| break; |
| } |
| |
| if (VLOG_IS_ON(3)) { |
| VLOG(3) << "Read Log entry batch: " << current_batch->DebugString(); |
| } |
| for (size_t i = 0; i < current_batch->entry_size(); ++i) { |
| entries->push_back(current_batch->mutable_entry(i)); |
| num_entries_read++; |
| } |
| current_batch->mutable_entry()->ExtractSubrange(0, |
| current_batch->entry_size(), |
| nullptr); |
| if (end_offset != nullptr) { |
| *end_offset = offset; |
| } |
| } |
| |
| if (footer_.IsInitialized() && footer_.num_entries() != num_entries_read) { |
| return Status::Corruption( |
| Substitute("Read $0 log entries from $1, but expected $2 based on the footer", |
| num_entries_read, path_, footer_.num_entries())); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status ReadableLogSegment::ScanForValidEntryHeaders(int64_t offset, bool* has_valid_entries) { |
| TRACE_EVENT1("log", "ReadableLogSegment::ScanForValidEntryHeaders", |
| "path", path_); |
| LOG(INFO) << "Scanning " << path_ << " for valid entry headers " |
| << "following offset " << offset << "..."; |
| *has_valid_entries = false; |
| |
| const int kChunkSize = 1024 * 1024; |
| gscoped_ptr<uint8_t[]> buf(new uint8_t[kChunkSize]); |
| |
| // We overlap the reads by the size of the header, so that if a header |
| // spans chunks, we don't miss it. |
| for (; |
| offset < file_size() - kEntryHeaderSize; |
| offset += kChunkSize - kEntryHeaderSize) { |
| int rem = std::min<int64_t>(file_size() - offset, kChunkSize); |
| Slice chunk; |
| RETURN_NOT_OK(ReadFully(readable_file().get(), offset, rem, &chunk, &buf[0])); |
| |
| // Optimization for the case where a chunk is all zeros -- this is common in the |
| // case of pre-allocated files. This avoids a lot of redundant CRC calculation. |
| if (IsAllZeros(chunk)) { |
| continue; |
| } |
| |
| // Check if this chunk has a valid entry header. |
| for (int off_in_chunk = 0; |
| off_in_chunk < chunk.size() - kEntryHeaderSize; |
| off_in_chunk++) { |
| Slice potential_header = Slice(&chunk[off_in_chunk], kEntryHeaderSize); |
| |
| EntryHeader header; |
| if (DecodeEntryHeader(potential_header, &header)) { |
| LOG(INFO) << "Found a valid entry header at offset " << (offset + off_in_chunk); |
| *has_valid_entries = true; |
| return Status::OK(); |
| } |
| } |
| } |
| |
| LOG(INFO) << "Found no log entry headers"; |
| return Status::OK(); |
| } |
| |
| Status ReadableLogSegment::MakeCorruptionStatus(int batch_number, int64_t batch_offset, |
| vector<int64_t>* recent_offsets, |
| const std::vector<LogEntryPB*>& entries, |
| const Status& status) const { |
| |
| string err = "Log file corruption detected. "; |
| SubstituteAndAppend(&err, "Failed trying to read batch #$0 at offset $1 for log segment $2: ", |
| batch_number, batch_offset, path_); |
| err.append("Prior batch offsets:"); |
| std::sort(recent_offsets->begin(), recent_offsets->end()); |
| for (int64_t offset : *recent_offsets) { |
| if (offset >= 0) { |
| SubstituteAndAppend(&err, " $0", offset); |
| } |
| } |
| if (!entries.empty()) { |
| err.append("; Last log entries read:"); |
| const int kNumEntries = 4; // Include up to the last 4 entries in the segment. |
| for (int i = std::max(0, static_cast<int>(entries.size()) - kNumEntries); |
| i < entries.size(); i++) { |
| LogEntryPB* entry = entries[i]; |
| LogEntryTypePB type = entry->type(); |
| string opid_str; |
| if (type == log::REPLICATE && entry->has_replicate()) { |
| opid_str = OpIdToString(entry->replicate().id()); |
| } else if (entry->has_commit() && entry->commit().has_commited_op_id()) { |
| opid_str = OpIdToString(entry->commit().commited_op_id()); |
| } else { |
| opid_str = "<unknown>"; |
| } |
| SubstituteAndAppend(&err, " [$0 ($1)]", LogEntryTypePB_Name(type), opid_str); |
| } |
| } |
| |
| return status.CloneAndAppend(err); |
| } |
| |
| Status ReadableLogSegment::ReadEntryHeaderAndBatch(int64_t* offset, faststring* tmp_buf, |
| gscoped_ptr<LogEntryBatchPB>* batch) { |
| EntryHeader header; |
| RETURN_NOT_OK(ReadEntryHeader(offset, &header)); |
| RETURN_NOT_OK(ReadEntryBatch(offset, header, tmp_buf, batch)); |
| return Status::OK(); |
| } |
| |
| |
| Status ReadableLogSegment::ReadEntryHeader(int64_t *offset, EntryHeader* header) { |
| uint8_t scratch[kEntryHeaderSize]; |
| Slice slice; |
| RETURN_NOT_OK_PREPEND(ReadFully(readable_file().get(), *offset, kEntryHeaderSize, |
| &slice, scratch), |
| "Could not read log entry header"); |
| |
| if (PREDICT_FALSE(!DecodeEntryHeader(slice, header))) { |
| return Status::Corruption("CRC mismatch in log entry header"); |
| } |
| *offset += slice.size(); |
| return Status::OK(); |
| } |
| |
| bool ReadableLogSegment::DecodeEntryHeader(const Slice& data, EntryHeader* header) { |
| DCHECK_EQ(kEntryHeaderSize, data.size()); |
| header->msg_length = DecodeFixed32(&data[0]); |
| header->msg_crc = DecodeFixed32(&data[4]); |
| header->header_crc = DecodeFixed32(&data[8]); |
| |
| // Verify the header. |
| uint32_t computed_crc = crc::Crc32c(&data[0], 8); |
| return computed_crc == header->header_crc; |
| } |
| |
| |
| Status ReadableLogSegment::ReadEntryBatch(int64_t *offset, |
| const EntryHeader& header, |
| faststring *tmp_buf, |
| gscoped_ptr<LogEntryBatchPB> *entry_batch) { |
| TRACE_EVENT2("log", "ReadableLogSegment::ReadEntryBatch", |
| "path", path_, |
| "range", Substitute("offset=$0 entry_len=$1", |
| *offset, header.msg_length)); |
| |
| if (header.msg_length == 0) { |
| return Status::Corruption("Invalid 0 entry length"); |
| } |
| int64_t limit = readable_up_to(); |
| if (PREDICT_FALSE(header.msg_length + *offset > limit)) { |
| // The log was likely truncated during writing. |
| return Status::Corruption( |
| Substitute("Could not read $0-byte log entry from offset $1 in $2: " |
| "log only readable up to offset $3", |
| header.msg_length, *offset, path_, limit)); |
| } |
| |
| tmp_buf->clear(); |
| tmp_buf->resize(header.msg_length); |
| Slice entry_batch_slice; |
| |
| Status s = readable_file()->Read(*offset, |
| header.msg_length, |
| &entry_batch_slice, |
| tmp_buf->data()); |
| |
| if (!s.ok()) return Status::IOError(Substitute("Could not read entry. Cause: $0", |
| s.ToString())); |
| |
| // Verify the CRC. |
| uint32_t read_crc = crc::Crc32c(entry_batch_slice.data(), entry_batch_slice.size()); |
| if (PREDICT_FALSE(read_crc != header.msg_crc)) { |
| return Status::Corruption(Substitute("Entry CRC mismatch in byte range $0-$1: " |
| "expected CRC=$2, computed=$3", |
| *offset, *offset + header.msg_length, |
| header.msg_crc, read_crc)); |
| } |
| |
| |
| gscoped_ptr<LogEntryBatchPB> read_entry_batch(new LogEntryBatchPB()); |
| s = pb_util::ParseFromArray(read_entry_batch.get(), |
| entry_batch_slice.data(), |
| header.msg_length); |
| |
| if (!s.ok()) return Status::Corruption(Substitute("Could parse PB. Cause: $0", |
| s.ToString())); |
| |
| *offset += entry_batch_slice.size(); |
| entry_batch->reset(read_entry_batch.release()); |
| return Status::OK(); |
| } |
| |
| WritableLogSegment::WritableLogSegment(string path, |
| shared_ptr<WritableFile> writable_file) |
| : path_(std::move(path)), |
| writable_file_(std::move(writable_file)), |
| is_header_written_(false), |
| is_footer_written_(false), |
| written_offset_(0) {} |
| |
| Status WritableLogSegment::WriteHeaderAndOpen(const LogSegmentHeaderPB& new_header) { |
| DCHECK(!IsHeaderWritten()) << "Can only call WriteHeader() once"; |
| DCHECK(new_header.IsInitialized()) |
| << "Log segment header must be initialized" << new_header.InitializationErrorString(); |
| faststring buf; |
| |
| // First the magic. |
| buf.append(kLogSegmentHeaderMagicString); |
| // Then Length-prefixed header. |
| PutFixed32(&buf, new_header.ByteSize()); |
| // Then Serialize the PB. |
| if (!pb_util::AppendToString(new_header, &buf)) { |
| return Status::Corruption("unable to encode header"); |
| } |
| RETURN_NOT_OK(writable_file()->Append(Slice(buf))); |
| |
| header_.CopyFrom(new_header); |
| first_entry_offset_ = buf.size(); |
| written_offset_ = first_entry_offset_; |
| is_header_written_ = true; |
| |
| return Status::OK(); |
| } |
| |
| Status WritableLogSegment::WriteFooterAndClose(const LogSegmentFooterPB& footer) { |
| TRACE_EVENT1("log", "WritableLogSegment::WriteFooterAndClose", |
| "path", path_); |
| DCHECK(IsHeaderWritten()); |
| DCHECK(!IsFooterWritten()); |
| DCHECK(footer.IsInitialized()) << footer.InitializationErrorString(); |
| |
| faststring buf; |
| |
| if (!pb_util::AppendToString(footer, &buf)) { |
| return Status::Corruption("unable to encode header"); |
| } |
| |
| buf.append(kLogSegmentFooterMagicString); |
| PutFixed32(&buf, footer.ByteSize()); |
| |
| RETURN_NOT_OK_PREPEND(writable_file()->Append(Slice(buf)), "Could not write the footer"); |
| |
| footer_.CopyFrom(footer); |
| is_footer_written_ = true; |
| |
| RETURN_NOT_OK(writable_file_->Close()); |
| |
| written_offset_ += buf.size(); |
| |
| return Status::OK(); |
| } |
| |
| |
| Status WritableLogSegment::WriteEntryBatch(const Slice& data) { |
| DCHECK(is_header_written_); |
| DCHECK(!is_footer_written_); |
| uint8_t header_buf[kEntryHeaderSize]; |
| |
| // First encode the length of the message. |
| uint32_t len = data.size(); |
| InlineEncodeFixed32(&header_buf[0], len); |
| |
| // Then the CRC of the message. |
| uint32_t msg_crc = crc::Crc32c(&data[0], data.size()); |
| InlineEncodeFixed32(&header_buf[4], msg_crc); |
| |
| // Then the CRC of the header |
| uint32_t header_crc = crc::Crc32c(&header_buf, 8); |
| InlineEncodeFixed32(&header_buf[8], header_crc); |
| |
| // Write the header to the file, followed by the batch data itself. |
| RETURN_NOT_OK(writable_file_->Append(Slice(header_buf, sizeof(header_buf)))); |
| written_offset_ += sizeof(header_buf); |
| |
| RETURN_NOT_OK(writable_file_->Append(data)); |
| written_offset_ += data.size(); |
| |
| return Status::OK(); |
| } |
| |
| |
| void CreateBatchFromAllocatedOperations(const vector<consensus::ReplicateRefPtr>& msgs, |
| gscoped_ptr<LogEntryBatchPB>* batch) { |
| gscoped_ptr<LogEntryBatchPB> entry_batch(new LogEntryBatchPB); |
| entry_batch->mutable_entry()->Reserve(msgs.size()); |
| for (const auto& msg : msgs) { |
| LogEntryPB* entry_pb = entry_batch->add_entry(); |
| entry_pb->set_type(log::REPLICATE); |
| entry_pb->set_allocated_replicate(msg->get()); |
| } |
| batch->reset(entry_batch.release()); |
| } |
| |
| bool IsLogFileName(const string& fname) { |
| if (HasPrefixString(fname, ".")) { |
| // Hidden file or ./.. |
| VLOG(1) << "Ignoring hidden file: " << fname; |
| return false; |
| } |
| |
| if (HasSuffixString(fname, kTmpSuffix)) { |
| LOG(WARNING) << "Ignoring tmp file: " << fname; |
| return false; |
| } |
| |
| vector<string> v = strings::Split(fname, "-"); |
| if (v.size() != 2 || v[0] != FsManager::kWalFileNamePrefix) { |
| VLOG(1) << "Not a log file: " << fname; |
| return false; |
| } |
| |
| return true; |
| } |
| |
| } // namespace log |
| } // namespace kudu |