blob: 40743e0bc24aa39c1b6a34cad43d9ed2e196bcf3 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/consensus/log_reader.h"
#include <algorithm>
#include <cstddef>
#include <memory>
#include <mutex>
#include <ostream>
#include <type_traits>
#include <glog/logging.h>
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/log.pb.h"
#include "kudu/consensus/log_index.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/strings/util.h"
#include "kudu/util/env.h"
#include "kudu/util/faststring.h"
#include "kudu/util/metrics.h"
#include "kudu/util/path_util.h"
#include "kudu/util/pb_util.h"
// Per-tablet counter of WAL bytes read by LogReader. It is incremented in
// LogReader::ReadBatchUsingIndexEntry() when the reader was constructed with a
// metric entity.
METRIC_DEFINE_counter(tablet, log_reader_bytes_read, "Bytes Read From Log",
kudu::MetricUnit::kBytes,
"Data read from the WAL since tablet start",
kudu::MetricLevel::kDebug);
// Per-tablet counter of log entries read; incremented by the number of entries
// in each batch read by LogReader::ReadBatchUsingIndexEntry().
METRIC_DEFINE_counter(tablet, log_reader_entries_read, "Entries Read From Log",
kudu::MetricUnit::kEntries,
"Number of entries read from the WAL since tablet start",
kudu::MetricLevel::kDebug);
// Latency of a single entry-batch read, recorded via ScopedLatencyMetric in
// LogReader::ReadBatchUsingIndexEntry().
// NOTE(review): the trailing arguments presumably are the histogram's maximum
// trackable value (60000000 us = 60 s) and its number of significant digits;
// confirm against the METRIC_DEFINE_histogram definition.
METRIC_DEFINE_histogram(tablet, log_reader_read_batch_latency, "Log Read Latency",
kudu::MetricUnit::kMicroseconds,
"Microseconds spent reading log entry batches",
kudu::MetricLevel::kInfo,
60000000LU, 2);
using kudu::consensus::OpId;
using kudu::consensus::ReplicateMsg;
using kudu::pb_util::SecureDebugString;
using kudu::pb_util::SecureShortDebugString;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
using strings::Substitute;
namespace kudu {
namespace log {
namespace {
// Strict-weak ordering of log segments by ascending header sequence number.
// Used to sort the segments discovered on disk before they are installed into
// a LogReader. The call operator is const so the comparator can be invoked
// through a const function object, as the standard Compare requirements expect.
struct LogSegmentSeqnoComparator {
  bool operator()(const scoped_refptr<ReadableLogSegment>& a,
                  const scoped_refptr<ReadableLogSegment>& b) const {
    return a->header().sequence_number() < b->header().sequence_number();
  }
};
} // anonymous namespace
// "No limit" sentinel for read sizes. ReadReplicatesInRange() treats any
// non-positive 'max_bytes_to_read' (including this value) as unlimited.
const int64_t LogReader::kNoSizeLimit{-1};
// Creates a LogReader for the WAL segments in 'tablet_wal_dir' and initializes
// it. '*reader' is assigned only after Init() succeeds. On failure the error is
// returned prefixed with "Unable to initialize log reader", and '*reader' is
// left untouched.
Status LogReader::Open(Env* env,
                       const string& tablet_wal_dir,
                       const scoped_refptr<LogIndex>& index,
                       const string& tablet_id,
                       const scoped_refptr<MetricEntity>& metric_entity,
                       FileCache* file_cache,
                       shared_ptr<LogReader>* reader) {
  auto new_reader = LogReader::make_shared(env, index, tablet_id, metric_entity, file_cache);
  const Status init_status = new_reader->Init(tablet_wal_dir);
  RETURN_NOT_OK_PREPEND(init_status, "Unable to initialize log reader");
  *reader = std::move(new_reader);
  return Status::OK();
}
// Convenience overload. It takes the Env and the tablet's WAL directory from
// 'fs_manager', then delegates to the Env-based Open().
Status LogReader::Open(FsManager* fs_manager,
                       const scoped_refptr<LogIndex>& index,
                       const std::string& tablet_id,
                       const scoped_refptr<MetricEntity>& metric_entity,
                       FileCache* file_cache,
                       std::shared_ptr<LogReader>* reader) {
  Env* const env = fs_manager->env();
  const string wal_dir = fs_manager->GetTabletWalDir(tablet_id);
  return LogReader::Open(env, wal_dir, index, tablet_id, metric_entity, file_cache, reader);
}
// Constructs a reader in the kLogReaderInitialized state; Init() must be called
// before it can serve reads. Metrics are optional. When 'metric_entity' is
// null, the metric members are left unset, and ReadBatchUsingIndexEntry()
// checks 'bytes_read_' before updating the counters.
LogReader::LogReader(Env* env,
                     scoped_refptr<LogIndex> index,
                     string tablet_id,
                     const scoped_refptr<MetricEntity>& metric_entity,
                     FileCache* file_cache)
    : env_(env),
      file_cache_(file_cache),
      log_index_(std::move(index)),
      tablet_id_(std::move(tablet_id)),
      state_(kLogReaderInitialized) {
  if (!metric_entity) {
    return;
  }
  bytes_read_ = METRIC_log_reader_bytes_read.Instantiate(metric_entity);
  entries_read_ = METRIC_log_reader_entries_read.Instantiate(metric_entity);
  read_batch_latency_ = METRIC_log_reader_read_batch_latency.Instantiate(metric_entity);
}
// Nothing to release explicitly; members clean up after themselves.
LogReader::~LogReader() = default;
// Discovers the WAL segments under 'tablet_wal_path' and opens each file whose
// name starts with FsManager::kWalFileNamePrefix. Segments that lack a footer
// get one rebuilt by scanning. The method then checks that the sorted sequence
// numbers are consecutive and installs the segments as this reader's segments.
// On success the reader moves to kLogReaderReading.
//
// Segments whose header was never written (Open() reports Uninitialized) are
// skipped with a warning.
//
// Returns:
//   - IllegalState if the directory does not exist;
//   - Corruption if the sequence numbers have a gap or a duplicate;
//   - otherwise the error from listing, opening or scanning a segment.
//
// 'segments_' is only modified after every segment has been opened and
// validated, so a failed Init() leaves it untouched.
Status LogReader::Init(const string& tablet_wal_path) {
  {
    std::lock_guard<simple_spinlock> lock(lock_);
    CHECK_EQ(state_, kLogReaderInitialized) << "bad state for Init(): " << state_;
  }
  VLOG(1) << "Reading wal from path:" << tablet_wal_path;
  if (!env_->FileExists(tablet_wal_path)) {
    return Status::IllegalState("Cannot find wal location at", tablet_wal_path);
  }
  VLOG(1) << "Parsing segments from path: " << tablet_wal_path;
  // List existing segment files.
  vector<string> log_files;
  RETURN_NOT_OK_PREPEND(env_->GetChildren(tablet_wal_path, &log_files),
                        "Unable to read children from path");

  SegmentSequence read_segments;
  // Upper bound, since not every child is necessarily a WAL segment. The vector
  // is drained into 'segments_' below, so there is no point shrinking it.
  read_segments.reserve(log_files.size());

  // Build a log segment from each WAL file.
  for (const string& log_file : log_files) {
    if (!HasPrefixString(log_file, FsManager::kWalFileNamePrefix)) {
      continue;
    }
    string fqp = JoinPathSegments(tablet_wal_path, log_file);
    scoped_refptr<ReadableLogSegment> segment;
    Status s = ReadableLogSegment::Open(env_, file_cache_, fqp, &segment);
    if (s.IsUninitialized()) {
      // This indicates that the segment was created but the writer
      // crashed before the header was successfully written. In this
      // case, we should skip it.
      LOG(WARNING) << "Ignoring log segment " << log_file << " since it was uninitialized "
                   << "(probably left after a prior tablet server crash)";
      continue;
    }
    RETURN_NOT_OK_PREPEND(s, "Unable to open readable log segment");
    DCHECK(segment);
    CHECK(segment->IsInitialized()) << "Uninitialized segment at: " << segment->path();
    if (!segment->HasFooter()) {
      VLOG(1) << "Log segment " << fqp << " was likely left in-progress "
              << "after a previous crash. Will try to rebuild footer by scanning data.";
      RETURN_NOT_OK_PREPEND(segment->RebuildFooterByScanning(),
                            Substitute("Unable to rebuild footer of log segment $0", fqp));
    }
    read_segments.emplace_back(std::move(segment));
  }

  // Sort the segments by sequence number.
  std::sort(read_segments.begin(), read_segments.end(), LogSegmentSeqnoComparator());

  // Validate before taking the spinlock. This step logs and formats strings,
  // which do not belong inside a spinlock critical section. Validating fully
  // before appending also keeps a Corruption return from leaving 'segments_'
  // half-populated.
  ReadableLogSegment* prev = nullptr;
  for (const auto& entry : read_segments) {
    VLOG(1) << " Log Reader Indexed: " << SecureShortDebugString(entry->footer());
    // Check that the log segments are in sequence.
    if (prev != nullptr &&
        entry->header().sequence_number() != prev->header().sequence_number() + 1) {
      return Status::Corruption(Substitute("Segment sequence numbers are not consecutive. "
          "Previous segment: seqno $0, path $1; Current segment: seqno $2, path $3",
          prev->header().sequence_number(), prev->path(),
          entry->header().sequence_number(), entry->path()));
    }
    DCHECK(entry->HasFooter());
    prev = entry.get();
  }

  {
    std::lock_guard<simple_spinlock> lock(lock_);
    for (auto& entry : read_segments) {
      AppendSegmentUnlocked(std::move(entry));
    }
    state_ = kLogReaderReading;
  }
  return Status::OK();
}
void LogReader::InitEmptyReaderForTests() {
std::lock_guard<simple_spinlock> lock(lock_);
state_ = kLogReaderReading;
}
int64_t LogReader::GetMinReplicateIndex() const {
std::lock_guard<simple_spinlock> lock(lock_);
int64_t min_remaining_op_idx = -1;
for (const scoped_refptr<ReadableLogSegment>& segment : segments_) {
if (!segment->HasFooter()) continue;
if (!segment->footer().has_min_replicate_index()) continue;
if (min_remaining_op_idx == -1 ||
segment->footer().min_replicate_index() < min_remaining_op_idx) {
min_remaining_op_idx = segment->footer().min_replicate_index();
}
}
return min_remaining_op_idx;
}
// Returns the segment whose header sequence number is 'seq', or nullptr if the
// reader currently holds no such segment. That happens when the list is empty,
// when 'seq' precedes the first held segment (for example after
// TrimSegmentsUpToAndIncluding()), or when 'seq' is past the last one.
scoped_refptr<ReadableLogSegment> LogReader::GetSegmentBySequenceNumber(int64_t seq) const {
  std::lock_guard<simple_spinlock> lock(lock_);
  if (segments_.empty()) {
    return nullptr;
  }
  // We always have a contiguous set of log segments, so we can find the requested
  // segment in our vector by calculating its offset vs the first element.
  const int64_t first_seqno = segments_[0]->header().sequence_number();
  const int64_t relative = seq - first_seqno;
  // Compare in signed arithmetic rather than mixing int64_t with size_t.
  if (relative < 0 || relative >= static_cast<int64_t>(segments_.size())) {
    return nullptr;
  }
  DCHECK_EQ(segments_[relative]->header().sequence_number(), seq);
  return segments_[static_cast<size_t>(relative)];
}
// Reads the entry batch located by 'index_entry' into 'batch'. 'tmp_buf' is
// passed through to the segment read, and its resulting length feeds the
// bytes-read metric.
//
// Returns NotFound if the segment named by the index entry is no longer held by
// this reader. If the read fails, returns that error prefixed with the op
// index, segment and offset. CHECK-fails if the index entry's offset is not
// positive.
Status LogReader::ReadBatchUsingIndexEntry(const LogIndexEntry& index_entry,
                                           faststring* tmp_buf,
                                           LogEntryBatchPB* batch) const {
  const int64_t index = index_entry.op_id.index();
  const auto segment_seqno = index_entry.segment_sequence_number;
  scoped_refptr<ReadableLogSegment> segment = GetSegmentBySequenceNumber(segment_seqno);
  if (PREDICT_FALSE(!segment)) {
    return Status::NotFound(Substitute("Segment $0 which contained index $1 has been GCed",
                                       segment_seqno, index));
  }

  CHECK_GT(index_entry.offset_in_segment, 0);
  // Passed by pointer to the segment read (presumably advanced past the entry);
  // the original offset stays available in 'index_entry' for error reporting.
  int64_t read_offset = index_entry.offset_in_segment;
  ScopedLatencyMetric latency(read_batch_latency_.get());
  EntryHeaderStatus ignored_header_status;
  RETURN_NOT_OK_PREPEND(
      segment->ReadEntryHeaderAndBatch(&read_offset, tmp_buf, batch, &ignored_header_status),
      Substitute("Failed to read LogEntry for index $0 from log segment $1 offset $2",
                 index, segment_seqno, index_entry.offset_in_segment));

  // 'bytes_read_' and 'entries_read_' are instantiated together in the
  // constructor, so checking one covers both.
  if (bytes_read_) {
    bytes_read_->IncrementBy(segment->entry_header_size() + tmp_buf->length());
    entries_read_->IncrementBy(batch->entry_size());
  }
  return Status::OK();
}
// Reads the REPLICATE messages for op indexes in [starting_at, up_to], using
// the log index to locate each one.
//
// Size limit: reading stops early once adding the next message would make the
// total (per SpaceUsedLong()) reach or exceed 'max_bytes_to_read'. A
// non-positive limit (e.g. kNoSizeLimit) disables the check. The first message
// is always accepted, so a successful call returns at least one message.
//
// On success, ownership of the returned messages passes to the caller through
// '*replicates'. On error, any messages read so far are freed.
//
// NOTE(review): '*replicates' is swapped with the local result vector, whose
// deleter then frees whatever pointers '*replicates' held on entry. Callers are
// presumably expected to pass an empty vector; confirm.
Status LogReader::ReadReplicatesInRange(int64_t starting_at,
                                        int64_t up_to,
                                        int64_t max_bytes_to_read,
                                        vector<ReplicateMsg*>* replicates) const {
  DCHECK_GT(starting_at, 0);
  DCHECK_GE(up_to, starting_at);
  DCHECK(log_index_) << "Require an index to random-read logs";
  vector<ReplicateMsg*> replicates_tmp;
  // Frees the pointers held by 'replicates_tmp' when this function returns.
  ElementDeleter d(&replicates_tmp);
  LogIndexEntry prev_index_entry;
  size_t total_size = 0;
  bool limit_exceeded = false;
  // Declared outside the loop so the buffer and batch are reused across reads.
  faststring tmp_buf;
  LogEntryBatchPB batch;
  for (int64_t index = starting_at; index <= up_to && !limit_exceeded; index++) {
    LogIndexEntry index_entry;
    RETURN_NOT_OK_PREPEND(log_index_->GetEntry(index, &index_entry),
                          Substitute("Failed to read log index for op $0", index));
    // Since a given LogEntryBatchPB may contain multiple REPLICATE messages,
    // it's likely that this index entry points to the same batch as the previous
    // one. If that's the case, we've already read this REPLICATE and we can
    // skip reading the batch again.
    if (index == starting_at ||
        index_entry.segment_sequence_number != prev_index_entry.segment_sequence_number ||
        index_entry.offset_in_segment != prev_index_entry.offset_in_segment) {
      RETURN_NOT_OK(ReadBatchUsingIndexEntry(index_entry, &tmp_buf, &batch));
      // Sanity-check the property that a batch should only have increasing indexes.
      int64_t prev_index = 0;
      for (int i = 0; i < batch.entry_size(); ++i) {
        LogEntryPB* entry = batch.mutable_entry(i);
        if (!entry->has_replicate()) continue;
        int64_t this_index = entry->replicate().id().index();
        CHECK_GT(this_index, prev_index)
            << "Expected that an entry batch should only include increasing log indexes: "
            << index_entry.ToString()
            << "\nBatch: " << SecureDebugString(batch);
        prev_index = this_index;
      }
    }
    bool found = false;
    for (int i = 0; i < batch.entry_size(); ++i) {
      LogEntryPB* entry = batch.mutable_entry(i);
      if (!entry->has_replicate()) {
        continue;
      }
      if (entry->replicate().id().index() != index) {
        continue;
      }
      size_t space_required = entry->replicate().SpaceUsedLong();
      // The first message is always accepted so that progress is made. By the
      // time the last comparison runs, 'max_bytes_to_read' is known to be
      // positive, so casting it to size_t preserves its value and avoids a
      // signed/unsigned comparison.
      if (replicates_tmp.empty() ||
          max_bytes_to_read <= 0 ||
          total_size + space_required < static_cast<size_t>(max_bytes_to_read)) {
        total_size += space_required;
        replicates_tmp.push_back(entry->release_replicate());
      } else {
        limit_exceeded = true;
      }
      found = true;
      break;
    }
    CHECK(found) << "Incorrect index entry didn't yield expected log entry: "
                 << index_entry.ToString();
    prev_index_entry = index_entry;
  }
  replicates->swap(replicates_tmp);
  return Status::OK();
}
// Resolves 'op_index' to its OpId using only the log index; no segment data is
// read. If the lookup fails, the index error is returned prefixed with the op
// index.
Status LogReader::LookupOpId(int64_t op_index, OpId* op_id) const {
  LogIndexEntry entry;
  const Status lookup_status = log_index_->GetEntry(op_index, &entry);
  RETURN_NOT_OK_PREPEND(lookup_status,
                        Substitute("Failed to read log index for op $0", op_index));
  *op_id = entry.op_id;
  return Status::OK();
}
// Replaces the contents of '*segments' with a copy of the current segment list.
// The copy is taken under the lock. The reader must be in kLogReaderReading.
void LogReader::GetSegmentsSnapshot(SegmentSequence* segments) const {
  std::lock_guard<simple_spinlock> guard(lock_);
  CHECK_EQ(state_, kLogReaderReading);
  segments->assign(segments_.cbegin(), segments_.cend());
}
void LogReader::TrimSegmentsUpToAndIncluding(int64_t segment_sequence_number) {
std::lock_guard<simple_spinlock> lock(lock_);
CHECK_EQ(state_, kLogReaderReading);
auto iter = segments_.begin();
int num_deleted_segments = 0;
while (iter != segments_.end()) {
if ((*iter)->header().sequence_number() <= segment_sequence_number) {
iter = segments_.erase(iter);
num_deleted_segments++;
continue;
}
break;
}
LOG(INFO) << "T " << tablet_id_ << ": removed " << num_deleted_segments
<< " log segments from log reader";
}
void LogReader::UpdateLastSegmentOffset(int64_t readable_to_offset) {
std::lock_guard<simple_spinlock> lock(lock_);
CHECK_EQ(state_, kLogReaderReading);
DCHECK(!segments_.empty());
// Get the last segment
auto* segment = segments_.back().get();
DCHECK(!segment->HasFooter());
segment->UpdateReadableToOffset(readable_to_offset);
}
// Swaps the tail segment for 'segment', typically the closed version of the
// same segment. The replacement must have a footer and the same sequence
// number as the current tail.
void LogReader::ReplaceLastSegment(scoped_refptr<ReadableLogSegment> segment) {
  DCHECK(segment->HasFooter());
  std::lock_guard<simple_spinlock> guard(lock_);
  CHECK_EQ(state_, kLogReaderReading);
  DCHECK(!segments_.empty());
  // The replacement must describe the same segment as the current tail.
  CHECK_EQ(segment->header().sequence_number(), segments_.back()->header().sequence_number());
  segments_.back() = std::move(segment);
}
// Appends an initialized segment to the reader. A segment without a footer has
// its footer rebuilt by scanning before the lock is taken, so the scan does not
// run inside the critical section. Returns the scan's error, if any.
Status LogReader::AppendSegment(scoped_refptr<ReadableLogSegment> segment) {
  DCHECK(segment->IsInitialized());
  const bool needs_footer = !segment->HasFooter();
  if (PREDICT_FALSE(needs_footer)) {
    RETURN_NOT_OK(segment->RebuildFooterByScanning());
  }
  std::lock_guard<simple_spinlock> guard(lock_);
  AppendSegmentUnlocked(std::move(segment));
  return Status::OK();
}
// Appends 'segment' to the segment list. The caller must already hold 'lock_'.
// Debug builds verify that the new segment's sequence number immediately
// follows the current tail's.
void LogReader::AppendSegmentUnlocked(scoped_refptr<ReadableLogSegment> segment) {
  DCHECK(segment->IsInitialized());
  DCHECK(lock_.is_locked());
#ifndef NDEBUG
  if (!segments_.empty()) {
    DCHECK_EQ(segments_.back()->header().sequence_number() + 1,
              segment->header().sequence_number());
  }
#endif
  segments_.push_back(std::move(segment));
}
// Appends an initialized segment as-is. Unlike AppendSegment(), no footer is
// rebuilt. The reader must be in kLogReaderReading.
void LogReader::AppendEmptySegment(scoped_refptr<ReadableLogSegment> segment) {
  DCHECK(segment->IsInitialized());
  std::lock_guard<simple_spinlock> guard(lock_);
  CHECK_EQ(state_, kLogReaderReading);
  AppendSegmentUnlocked(std::move(segment));
}
int LogReader::num_segments() const {
std::lock_guard<simple_spinlock> lock(lock_);
return segments_.size();
}
// Renders the segment list, one line per segment, with its sequence number and
// footer. "NONE" is printed for a segment that has no footer.
string LogReader::ToString() const {
  std::lock_guard<simple_spinlock> guard(lock_);
  string out = "Reader's SegmentSequence: \n";
  for (const auto& seg : segments_) {
    string footer_desc;
    if (seg->HasFooter()) {
      footer_desc = SecureShortDebugString(seg->footer());
    } else {
      footer_desc = "NONE";
    }
    out += Substitute("Segment: $0 Footer: $1\n",
                      seg->header().sequence_number(),
                      footer_desc);
  }
  return out;
}
} // namespace log
} // namespace kudu