blob: 429036f2018ba7ae27f55088b9f401d56693d56d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/consensus/log.h"
#include <cerrno>
#include <cstdint>
#include <memory>
#include <mutex>
#include <ostream>
#include <utility>
#include <boost/range/adaptor/reversed.hpp>
#include <gflags/gflags.h>
#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/log_index.h"
#include "kudu/consensus/log_metrics.h"
#include "kudu/consensus/log_reader.h"
#include "kudu/consensus/log_util.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/atomicops.h"
#include "kudu/gutil/bind.h"
#include "kudu/gutil/bind_helpers.h"
#include "kudu/gutil/dynamic_annotations.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/walltime.h"
#include "kudu/util/async_util.h"
#include "kudu/util/compression/compression_codec.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/env.h"
#include "kudu/util/env_util.h"
#include "kudu/util/fault_injection.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/kernel_stack_watchdog.h"
#include "kudu/util/logging.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/path_util.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/random.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/threadpool.h"
#include "kudu/util/trace.h"
// Log retention configuration.
// -----------------------------
// Floor on the number of retained segments. The validator registered for this
// flag later in this file (ValidateLogsToRetain) rejects values below 1.
DEFINE_int32(log_min_segments_to_retain, 1,
"The minimum number of past log segments to keep at all times,"
" regardless of what is required for durability. "
"Must be at least 1.");
TAG_FLAG(log_min_segments_to_retain, runtime);
TAG_FLAG(log_min_segments_to_retain, advanced);
// Ceiling on the number of segments kept only so that other peers can catch
// up; read by GetPrefixSizeToGC() in this file.
DEFINE_int32(log_max_segments_to_retain, 80,
"The maximum number of past log segments to keep at all times for "
"the purposes of catching up other peers.");
TAG_FLAG(log_max_segments_to_retain, runtime);
TAG_FLAG(log_max_segments_to_retain, advanced);
TAG_FLAG(log_max_segments_to_retain, experimental);
// Group commit configuration.
// -----------------------------
// Default is 4 MiB (4 * 1024 * 1024 bytes).
DEFINE_int32(group_commit_queue_size_bytes, 4 * 1024 * 1024,
"Maximum size of the group commit queue in bytes");
TAG_FLAG(group_commit_queue_size_bytes, advanced);
// Idle threshold for the append task, in milliseconds.
DEFINE_int32(log_thread_idle_threshold_ms, 1000,
"Number of milliseconds after which the log append thread decides that a "
"log is idle, and considers shutting down. Used by tests.");
TAG_FLAG(log_thread_idle_threshold_ms, experimental);
TAG_FLAG(log_thread_idle_threshold_ms, hidden);
// Compression configuration.
// -----------------------------
// Codec name; SegmentAllocator::Init() in this file passes it to
// GetCompressionCodecType() when setting up the codec.
DEFINE_string(log_compression_codec, "LZ4",
"Codec to use for compressing WAL segments.");
TAG_FLAG(log_compression_codec, experimental);
// Fault/latency injection flags.
// -----------------------------
// Master switch for latency injection; read by SegmentAllocator::Sync().
DEFINE_bool(log_inject_latency, false,
"If true, injects artificial latency in log sync operations. "
"Advanced option. Use at your own risk -- has a negative effect "
"on performance for obvious reasons!");
// Note: this unrelated flag is defined between log_inject_latency and the
// TAG_FLAG lines that belong to log_inject_latency (two lines below).
DEFINE_bool(skip_remove_old_recovery_dir, false,
"Skip removing WAL recovery dir after startup. (useful for debugging)");
TAG_FLAG(skip_remove_old_recovery_dir, hidden);
TAG_FLAG(log_inject_latency, unsafe);
TAG_FLAG(log_inject_latency, runtime);
// Mean of the injected latency, in milliseconds.
DEFINE_int32(log_inject_latency_ms_mean, 100,
"The number of milliseconds of latency to inject, on average. "
"Only takes effect if --log_inject_latency is true");
TAG_FLAG(log_inject_latency_ms_mean, unsafe);
TAG_FLAG(log_inject_latency_ms_mean, runtime);
// Standard deviation of the injected latency.
DEFINE_int32(log_inject_latency_ms_stddev, 100,
"The standard deviation of latency to inject in the log. "
"Only takes effect if --log_inject_latency is true");
TAG_FLAG(log_inject_latency_ms_stddev, unsafe);
TAG_FLAG(log_inject_latency_ms_stddev, runtime);
// NOTE(review): the description string below ends with a trailing space and
// the macro invocation is never closed in the lines visible here -- the rest
// of the description and the closing ");" appear to have been lost.
DEFINE_int32(log_inject_thread_lifecycle_latency_ms, 0,
"Injection point for random latency during key thread lifecycle transition "
TAG_FLAG(log_inject_thread_lifecycle_latency_ms, unsafe);
TAG_FLAG(log_inject_thread_lifecycle_latency_ms, runtime);
// Probability (0.0 - 1.0) of crashing right before a COMMIT append.
DEFINE_double(fault_crash_before_append_commit, 0.0,
"Fraction of the time when the server will crash just before appending a "
"COMMIT message to the log. (For testing only!)");
TAG_FLAG(fault_crash_before_append_commit, unsafe);
TAG_FLAG(fault_crash_before_append_commit, runtime);
// Probability of an injected IOError on append.
DEFINE_double(log_inject_io_error_on_append_fraction, 0.0,
"Fraction of the time when the log will fail to append and return an IOError. "
"(For testing only!)");
TAG_FLAG(log_inject_io_error_on_append_fraction, unsafe);
TAG_FLAG(log_inject_io_error_on_append_fraction, runtime);
// Probability of an injected IOError on segment preallocation.
DEFINE_double(log_inject_io_error_on_preallocate_fraction, 0.0,
"Fraction of the time when the log will fail to preallocate and return an IOError. "
"(For testing only!)");
TAG_FLAG(log_inject_io_error_on_preallocate_fraction, unsafe);
TAG_FLAG(log_inject_io_error_on_preallocate_fraction, runtime);
// Space reservation for the WAL directory. -1 is the sentinel for "reserve
// 1% of the disk"; any other accepted value is a byte count.
DEFINE_int64(fs_wal_dir_reserved_bytes, -1,
"Number of bytes to reserve on the log directory filesystem for "
"non-Kudu usage. The default, which is represented by -1, is that "
"1% of the disk space on each disk will be reserved. Any other "
"value specified represents the number of bytes reserved and must "
"be greater than or equal to 0. Explicit percentages to reserve "
"are not currently supported");
// Accepts the -1 sentinel and every non-negative value; rejects anything
// below -1.
DEFINE_validator(fs_wal_dir_reserved_bytes, [](const char* /*n*/, int64_t v) { return v >= -1; });
TAG_FLAG(fs_wal_dir_reserved_bytes, runtime);
TAG_FLAG(fs_wal_dir_reserved_bytes, evolving);
// Flag validator for --log_min_segments_to_retain.
//
// 'flagname' is the name of the flag being validated and 'value' its proposed
// value. Returns true when value >= 1; otherwise logs an error naming the flag
// and the rejected value and returns false.
static bool ValidateLogsToRetain(const char* flagname, int value) {
if (value >= 1) {
return true;
LOG(ERROR) << strings::Substitute("$0 must be at least 1, value $1 is invalid",
flagname, value);
return false;
// Register the validator with the flag library.
DEFINE_validator(log_min_segments_to_retain, &ValidateLogsToRetain);
namespace kudu {
namespace log {
using consensus::CommitMsg;
using consensus::ReplicateRefPtr;
using env_util::OpenFileForRandom;
using std::shared_ptr;
using std::string;
using std::vector;
using std::unique_ptr;
using strings::Substitute;
// Builds the prefix used on log lines: "T <tablet_id> P <fs manager uuid>: ".
string LogContext::LogPrefix() const {
return Substitute("T $0 P $1: ", tablet_id, fs_manager->uuid());
// Manages the thread which drains groups of batches from the log's queue and
// appends them to the underlying log instance.
// Rather than being a long-running thread, this instead uses a threadpool with
// size 1 to automatically start and stop a thread on demand. When the log
// is idle for some amount of time, no task will be on the thread pool, and thus
// the underlying thread may exit.
// The design of submitting tasks to the threadpool is slightly tricky in order
// to achieve group commit and not have to submit one task per appended batch.
// Instead, a generic 'ProcessQueue()' task is used which loops collecting
// batches to write until it finds that the queue has been empty for a while,
// at which point the task finishes.
// The trick, then, lies in two areas:
// 1) After adding a batch to the queue, we need to ensure that a task is
// already running, and if not, start one. This is done in Wake().
// 2) When the task finds no more batches to write and wants to go idle, it
// needs to ensure that it doesn't miss any concurrent additions to the queue.
// This is done in GoIdle().
// See the implementation comments in Wake() and GoIdle() for details.
// Wrapper around the single-threaded pool that runs the queue-draining task.
// NOTE(review): access specifiers and the ThreadState enumerators (IDLE and
// ACTIVE, both used below and in Wake()/GoIdle()) are not present in the lines
// visible here -- confirm against the full file.
class Log::AppendThread {
// 'log' is the Log whose entry queue this object drains; it is stored, not
// owned.
explicit AppendThread(Log* log);
// Initializes the objects and starts the thread pool.
Status Init();
// Waits until the last enqueued elements are processed, sets the
// Appender thread to closing state. If any entries are added to the
// queue during the process, invoke their callbacks' 'OnFailure()'
// method.
void Shutdown();
// Wake up the appender task, if it is not already running.
// This should be called after each time that a new entry is
// appended to the log's queue.
void Wake();
// Returns true if thread_state_ currently reads ACTIVE. The load carries no
// memory barrier, so the answer is only a snapshot.
bool active() const {
return base::subtle::NoBarrier_Load(&thread_state_) == ACTIVE;
// The task submitted to the threadpool which collects batches from the queue
// and appends them, until it determines that the queue is idle.
void ProcessQueue();
// Tries to transition back to IDLE state. If successful, returns true.
// Otherwise, returns false to indicate that the task should keep running because
// a new task was enqueued just as we were trying to go idle.
bool GoIdle();
// Handle the actual appending of a group of entries. Responsible for deleting the
// LogEntryBatch* pointers.
void HandleBatches(vector<LogEntryBatch*> entry_batches);
// Log-line prefix; forwards to the owning Log's prefix.
string LogPrefix() const;
// The owning log. The pointer itself never changes after construction.
Log* const log_;
// Atomic state machine for whether there is any task currently queued or
// running on append_pool_. See Wake() and GoIdle() for more details.
enum ThreadState {
// No task is queued or running.
// A task is queued or running.
// Starts out IDLE: nothing is submitted until the first Wake().
Atomic32 thread_state_ = IDLE;
// Pool with a single thread, which handles shutting down the thread
// when idle.
unique_ptr<ThreadPool> append_pool_;
// Records the owning Log. No other initialization is visible here; the pool
// is expected to be set up by Init().
Log::AppendThread::AppendThread(Log *log)
: log_(log) {
// One-time setup of the append thread. Debug builds assert that append_pool_
// has not been created yet.
// NOTE(review): the statement that actually builds append_pool_ (which the
// two comments below describe) is not visible in this view.
Status Log::AppendThread::Init() {
DCHECK(!append_pool_) << "Already initialized";
VLOG_WITH_PREFIX(1) << "Starting log append thread";
// Only need one thread since we'll only schedule one
// task at a time.
// No need for keeping idle threads, since the task itself
// handles waiting for work while idle.
return Status::OK();
// Ensures that a ProcessQueue() task is queued or running.
//
// Only the caller whose compare-and-swap moves thread_state_ from IDLE to
// ACTIVE submits a task; every other concurrent caller sees ACTIVE and does
// nothing. A failed submission aborts the process (CHECK_OK).
void Log::AppendThread::Wake() {
auto old_status = base::subtle::NoBarrier_CompareAndSwap(
&thread_state_, IDLE, ACTIVE);
if (old_status == IDLE) {
// 'this' is bound unretained: presumably the pool is drained before this
// object goes away -- TODO confirm in Shutdown().
CHECK_OK(append_pool_->SubmitClosure(Bind(&Log::AppendThread::ProcessQueue, Unretained(this))));
// Runs with segment_idle_lock_ held exclusively.
// NOTE(review): the work done under the lock is not visible in this view.
void Log::SetActiveSegmentIdle() {
std::lock_guard<rw_spinlock> l(segment_idle_lock_);
// Attempts to move the appender from ACTIVE back to IDLE.
//
// Returns true if the calling task may exit (either nothing was enqueued, or
// another task has already been submitted for the new work); returns false if
// the calling task re-claimed the ACTIVE state and must keep draining.
bool Log::AppendThread::GoIdle() {
// Inject latency at key points in this function for the purposes of tests.
// NOTE(review): the injection statements themselves are not visible here.
//
// Going idle is a bit tricky. We have to consider the following race:
//
//   Enqueuing thread (T1)        AppendThread
//   ---------------------        ------------------------------------
//                                - state is ACTIVE
//                                - BlockingDrainTo returns TimedOut()
//   - queue.Put()
//   - Wake() is a no-op because
//     the state is still ACTIVE
//
// So, we first transition back to IDLE state, and then re-check to see
// if there has been something enqueued in the meantime.
auto old_state = base::subtle::NoBarrier_AtomicExchange(&thread_state_, IDLE);
DCHECK_EQ(old_state, ACTIVE);
if (log_->entry_queue()->empty()) {
// Nothing got enqueued, which means there must not have been any missed wakeup.
// We are now in IDLE state.
return true;
// Someone enqueued something. We don't know whether their wakeup was successful
// or not, but we can just try to transition back to ACTIVE mode here.
if (base::subtle::NoBarrier_CompareAndSwap(&thread_state_, IDLE, ACTIVE)
== IDLE) {
// Their wake-up was lost, but we've now marked ourselves as running.
return false;
// Their wake-up was successful, meaning that there is another task on the
// queue behind us now, so we can exit this one.
return true;
// Body of the task submitted by Wake(): repeatedly drains the entry queue
// with a deadline and leaves the loop once a drain times out and GoIdle()
// reports that the task may exit.
// NOTE(review): the right-hand side of the deadline computation, the body of
// the aborted branch and the call that hands the drained batches to
// HandleBatches() are not visible in this view.
void Log::AppendThread::ProcessQueue() {
VLOG_WITH_PREFIX(2) << "WAL Appender going active";
while (true) {
MonoTime deadline = MonoTime::Now() +
vector<LogEntryBatch*> entry_batches;
Status s = log_->entry_queue()->BlockingDrainTo(&entry_batches, deadline);
if (PREDICT_FALSE(s.IsAborted())) {
} else if (PREDICT_FALSE(s.IsTimedOut())) {
// Queue stayed empty until the deadline: try to go idle, and keep looping
// if GoIdle() says new work raced in.
if (GoIdle()) break;
VLOG_WITH_PREFIX(2) << "WAL Appender going idle";
// Writes every batch in 'entry_batches' to the log, syncs once for the whole
// group unless the group consists solely of COMMIT batches, and then deletes
// the batches (this function owns the raw pointers).
// NOTE(review): the statements that run each batch's callback are not visible
// in this view; only the null-checks guarding them are.
void Log::AppendThread::HandleBatches(vector<LogEntryBatch*> entry_batches) {
if (log_->ctx_.metrics) {
TRACE_EVENT1("log", "batch", "batch_size", entry_batches.size());
SCOPED_LATENCY_METRIC(log_->ctx_.metrics, group_commit_latency);
// Stays true only while every batch seen so far is a COMMIT batch.
bool is_all_commits = true;
for (LogEntryBatch* entry_batch : entry_batches) {
TRACE_EVENT_FLOW_END0("log", "Batch", entry_batch);
Status s = log_->WriteBatch(entry_batch);
if (PREDICT_FALSE(!s.ok())) {
LOG_WITH_PREFIX(ERROR) << "Error appending to the log: " << s.ToString();
// TODO(af): If a single transaction fails to append, should we
// abort all subsequent transactions in this batch or allow
// them to be appended? What about transactions in future
// batches?
if (!entry_batch->callback().is_null()) {
if (is_all_commits && entry_batch->type_ != COMMIT) {
is_all_commits = false;
Status s;
// A group made up only of COMMIT batches is not synced.
if (!is_all_commits) {
s = log_->Sync();
if (PREDICT_FALSE(!s.ok())) {
LOG_WITH_PREFIX(ERROR) << "Error syncing log: " << s.ToString();
for (LogEntryBatch* entry_batch : entry_batches) {
if (!entry_batch->callback().is_null()) {
delete entry_batch;
} else {
TRACE_EVENT0("log", "Callbacks");
VLOG_WITH_PREFIX(2) << "Synchronized " << entry_batches.size() << " entry batches";
for (LogEntryBatch* entry_batch : entry_batches) {
if (PREDICT_TRUE(!entry_batch->callback().is_null())) {
// It's important to delete each batch as we see it, because
// deleting it may free up memory from memory trackers, and the
// callback of a later batch may want to use that memory.
delete entry_batch;
// Shuts the append thread down. Does nothing if append_pool_ was never
// created.
// NOTE(review): the shutdown statements inside the guard are not visible in
// this view.
void Log::AppendThread::Shutdown() {
if (append_pool_) {
// Uses the owning Log's prefix so appender messages carry the same prefix as
// the Log's own messages.
string Log::AppendThread::LogPrefix() const {
return log_->LogPrefix();
// Return true if the append thread is currently active.
// Test-only accessor; forwards to AppendThread::active(), which reads the
// state without a memory barrier.
bool Log::append_thread_active_for_tests() const {
return append_thread_->active();
// Constructs the allocator. 'opts' is stored as a pointer and also supplies
// the segment size limit, which is converted from MiB to bytes. Syncing
// starts out enabled.
// NOTE(review): the initializers for 'ctx', 'schema' and 'schema_version' are
// not visible in this view.
SegmentAllocator::SegmentAllocator(const LogOptions* opts,
const LogContext* ctx,
Schema schema,
uint32_t schema_version)
: opts_(opts),
max_segment_size_(opts_->segment_size_mb * 1024 * 1024),
sync_disabled_(false) {}
// Prepares the allocator for use: sets up the compression codec named by
// --log_compression_codec, records 'sequence_number' as the active segment
// sequence number, then allocates a segment and rolls over to it.
//
// 'new_readable_segment' receives the readable view produced by the rollover.
// Returns the first failure encountered, otherwise OK.
Status SegmentAllocator::Init(
uint64_t sequence_number,
scoped_refptr<ReadableLogSegment>* new_readable_segment) {
// Init the compression codec.
// NOTE(review): the opening of this statement is not visible in this view.
GetCompressionCodecType(FLAGS_log_compression_codec), &codec_),
"could not instantiate compression codec");
active_segment_sequence_number_ = sequence_number;
scoped_refptr<ReadableLogSegment> finished_segment;
RETURN_NOT_OK(AllocateSegmentAndRollOver(&finished_segment, new_readable_segment));
DCHECK(!finished_segment); // There was no previously active segment.
return Status::OK();
// Decides, for an upcoming write of 'write_size_bytes', whether a new segment
// must be allocated and/or whether the log should roll over to one.
//
// When a rollover happens, 'finished_segment' and 'new_readable_segment' are
// passed through to RollOver(). Returns RollOver()'s failure, otherwise OK.
Status SegmentAllocator::AllocateOrRollOverIfNecessary(
uint32_t write_size_bytes,
scoped_refptr<ReadableLogSegment>* finished_segment,
scoped_refptr<ReadableLogSegment>* new_readable_segment) {
bool should_rollover = false;
// if the size of this entry overflows the current segment, get a new one
std::lock_guard<RWMutex> l(allocation_lock_);
if (allocation_state_ == kAllocationNotStarted) {
// The extra 4 bytes are added on top of the payload size.
// NOTE(review): the meaning of the constant is not stated in this file
// view -- presumably per-entry framing overhead; confirm.
if ((active_segment_->written_offset() + write_size_bytes + 4) > max_segment_size_) {
VLOG_WITH_PREFIX(1) << "Max segment size reached. Starting new segment allocation";
// With synchronous preallocation the rollover happens right away.
if (!opts_->async_preallocate_segments) {
should_rollover = true;
} else if (allocation_state_ == kAllocationFinished) {
should_rollover = true;
} else {
VLOG_WITH_PREFIX(1) << "Segment allocation already in progress...";
if (should_rollover) {
LOG_SLOW_EXECUTION(WARNING, 50, Substitute("$0Log roll took a long time", LogPrefix())) {
RETURN_NOT_OK(RollOver(finished_segment, new_readable_segment));
return Status::OK();
// Sync point for the active segment.
//
// Steps visible here: (1) optional, test-only latency injection, (2) the
// fsync path, taken only when 'force_fsync_all' is set and syncing has not
// been disabled, and (3) the PostSync hook. A hook failure is returned with a
// descriptive prefix; otherwise the result is OK.
// NOTE(review): the second argument of r.Normal(), the sleep call, the fsync
// call itself and the PostSyncIfFsyncEnabled hook invocation are not visible
// in this view.
Status SegmentAllocator::Sync() {
TRACE_EVENT0("log", "Sync");
SCOPED_LATENCY_METRIC(ctx_->metrics, sync_latency);
// Latency injection is skipped when syncing is disabled.
if (PREDICT_FALSE(FLAGS_log_inject_latency && !sync_disabled_)) {
Random r(GetCurrentTimeMicros());
int sleep_ms = r.Normal(FLAGS_log_inject_latency_ms_mean,
if (sleep_ms > 0) {
LOG_WITH_PREFIX(WARNING) << "Injecting " << sleep_ms
<< "ms of latency in SegmentAllocator::Sync()";
// Honor 'sync_disabled_' here too. The condition previously tested '!false',
// which is always true, so a disabled sync still took the fsync path. The
// member starts out false, so default behaviour is unchanged.
if (opts_->force_fsync_all && !sync_disabled_) {
LOG_SLOW_EXECUTION(WARNING, 50, Substitute("$0Fsync log took a long time", LogPrefix())) {
if (hooks_) {
"PostSyncIfFsyncEnabled hook failed");
if (hooks_) {
RETURN_NOT_OK_PREPEND(hooks_->PostSync(), "PostSync hook failed");
return Status::OK();
// Finishes the active segment, running the PreClose and PostClose hooks
// around the close.
//
// If 'finished_segment' is non-null it receives a new ReadableLogSegment
// built from the active segment's path and file. Returns a hook failure with
// a descriptive prefix, otherwise OK.
// NOTE(review): the footer write, the truncation call and the close call are
// not visible in this view.
Status SegmentAllocator::FinishCurrentSegment(
scoped_refptr<ReadableLogSegment>* finished_segment) {
if (hooks_) {
RETURN_NOT_OK_PREPEND(hooks_->PreClose(), "PreClose hook failed");
// A footer without a min replicate index means no REPLICATE message was
// recorded for this segment.
if (!footer_.has_min_replicate_index()) {
VLOG_WITH_PREFIX(1) << "Writing a segment without any REPLICATE message. Segment: "
<< active_segment_->path();
VLOG_WITH_PREFIX(2) << "Segment footer for " << active_segment_->path()
<< ": " << pb_util::SecureShortDebugString(footer_);
// max_segment_size_ defines the (soft) limit of a segment. When preallocation
// is enabled, max_segment_size also defines the amount of space that is
// preallocated at segment creation time.
// We finish a segment when the next write would exceed max_segment_size_, at
// which point all of the segment's preallocated space has been consumed. In
// some cases (e.g. Log::Close), a segment may be finished prematurely. If we
// detect that, let's return any excess preallocated space back to the
// filesystem by truncating off the end of the segment.
if (opts_->preallocate_segments &&
active_segment_->written_offset() < max_segment_size_) {
if (hooks_) {
RETURN_NOT_OK_PREPEND(hooks_->PostClose(), "PostClose hook failed");
if (finished_segment) {
scoped_refptr<ReadableLogSegment> segment(
new ReadableLogSegment(active_segment_->path(), active_segment_->file()));
*finished_segment = std::move(segment);
return Status::OK();
// Folds 'batch' into the in-memory footer of the active segment: the entry
// count always grows by batch.count(); for REPLICATE batches each entry is
// additionally passed to UpdateFooterForReplicateEntry().
void SegmentAllocator::UpdateFooterForBatch(const LogEntryBatch& batch) {
footer_.set_num_entries(footer_.num_entries() + batch.count());
// We keep track of the last-written OpId here.
// This is needed to initialize Consensus on startup.
// We also retrieve the opid of the first operation in the batch so that, if
// we roll over to a new segment, we set the first operation in the footer
// immediately.
if (batch.type_ == REPLICATE) {
// Update the index bounds for the current segment.
for (const LogEntryPB& entry_pb : batch.entry_batch_pb_->entry()) {
UpdateFooterForReplicateEntry(entry_pb, &footer_);
// NOTE(review): only the signature is visible in this view; the body (by its
// name, presumably stopping the allocation pool) must be checked in the full
// file.
void SegmentAllocator::StopAllocationThread() {
// Allocates a segment and rolls over to it; both out-parameters are passed
// straight through to RollOver(), whose status is returned.
// NOTE(review): allocation_lock_ is taken for the full scope as written here,
// i.e. across RollOver(); the allocation request and the scope that would
// normally bound the lock are not visible in this view.
Status SegmentAllocator::AllocateSegmentAndRollOver(
scoped_refptr<ReadableLogSegment>* finished_segment,
scoped_refptr<ReadableLogSegment>* new_readable_segment) {
std::lock_guard<RWMutex> l(allocation_lock_);
return RollOver(finished_segment, new_readable_segment);
// Replaces the schema and schema version that SwitchToAllocatedSegment()
// writes into the next segment header. The update is made under an exclusive
// hold of schema_lock_.
void SegmentAllocator::SetSchemaForNextSegment(Schema schema,
uint32_t version) {
VLOG_WITH_PREFIX(2) << Substitute("Setting schema version $0 for next log segment $1",
version, schema.ToString());
std::lock_guard<rw_spinlock> l(schema_lock_);
// 'schema' was taken by value, so it can be moved into place.
schema_ = std::move(schema);
schema_version_ = version;
// Marks allocation as in progress and submits AllocationTask() to the
// allocation pool. Debug builds assert that no allocation had been started.
// Returns the submission status.
// The "Unlocked" suffix suggests the caller already holds allocation_lock_ --
// TODO confirm against callers.
Status SegmentAllocator::AsyncAllocateSegmentUnlocked() {
DCHECK_EQ(kAllocationNotStarted, allocation_state_);
allocation_state_ = kAllocationInProgress;
return allocation_pool_->SubmitClosure(
Bind(&SegmentAllocator::AllocationTask, Unretained(this)));
// Task submitted by AsyncAllocateSegmentUnlocked().
// NOTE(review): only the signature is visible in this view.
void SegmentAllocator::AllocationTask() {
// Creates the placeholder file for the next WAL segment in the log directory
// and, when preallocation is enabled, preallocates max_segment_size_ bytes
// for it.
//
// Must be entered with the allocation state "in progress" (CHECKed); the
// state is set to "finished" on every exit path by a scoped cleanup.
// NOTE(review): the openings of the create/preallocate/fault-injection
// statements and the assignment to next_segment_file_ are not visible in this
// view.
Status SegmentAllocator::AllocateNewSegment() {
TRACE_EVENT1("log", "AllocateNewSegment", "file", next_segment_path_);
CHECK_EQ(kAllocationInProgress, allocation_state());
// We must mark allocation as finished when returning from this method.
auto alloc_finished = MakeScopedCleanup([&] () {
std::lock_guard<RWMutex> l(allocation_lock_);
allocation_state_ = kAllocationFinished;
// Temp-file template: <log_dir>/<kTmpInfix>.newsegmentXXXXXX
string tmp_suffix = Substitute("$0$1", kTmpInfix, ".newsegmentXXXXXX");
string path_tmpl = JoinPathSegments(ctx_->log_dir, tmp_suffix);
VLOG_WITH_PREFIX(2) << "Creating temp. file for place holder segment, template: " << path_tmpl;
unique_ptr<RWFile> segment_file;
Env* env = ctx_->fs_manager->env();
RWFileOptions(), path_tmpl, &next_segment_path_, &segment_file),
"could not create next WAL segment");
VLOG_WITH_PREFIX(1) << "Created next WAL segment, placeholder path: " << next_segment_path_;
Status::IOError("Injected IOError in SegmentAllocator::AllocateNewSegment()"));
if (opts_->preallocate_segments) {
// TODO (perf) zero the new segments -- this could result in
// additional performance improvements.
0, max_segment_size_, RWFile::CHANGE_FILE_SIZE),
"could not preallocate next WAL segment");
return Status::OK();
// Promotes the placeholder segment to the active segment: renames the
// placeholder file to its final WAL segment name, writes the header, opens a
// readable view for the caller, installs the new writable segment and resets
// the allocation state to "not started".
//
// 'new_readable_segment' receives the readable view; it is the caller's job
// to add it to the reader. Returns the first failure, otherwise OK.
// NOTE(review): the sequence-number increment, the directory sync, and the
// header/footer field assignments are not visible in this view.
Status SegmentAllocator::SwitchToAllocatedSegment(
scoped_refptr<ReadableLogSegment>* new_readable_segment) {
// Increment "next" log segment seqno.
const auto& tablet_id = ctx_->tablet_id;
string new_segment_path = ctx_->fs_manager->GetWalSegmentFileName(
tablet_id, active_segment_sequence_number_);
Env* env = ctx_->fs_manager->env();
RETURN_NOT_OK_PREPEND(env->RenameFile(next_segment_path_, new_segment_path),
"could not rename next WAL segment");
if (opts_->force_fsync_all) {
// Create a new segment in memory.
unique_ptr<WritableLogSegment> new_segment(
new WritableLogSegment(new_segment_path, next_segment_file_));
// Set up the new header and footer.
LogSegmentHeaderPB header;
if (codec_) {
// Set up the new footer. This will be maintained as the segment is written.
// Set the new segment's schema.
// The schema is read under a shared hold of schema_lock_.
shared_lock<rw_spinlock> l(schema_lock_);
RETURN_NOT_OK(SchemaToPB(schema_, header.mutable_schema()));
RETURN_NOT_OK_PREPEND(new_segment->WriteHeader(header), "Failed to write header");
// Open the segment we just created in readable form; it is the caller's
// responsibility to add it to the reader.
// TODO(todd): consider using a global FileCache here? With short log segments and
// lots of tablets, this file descriptor usage may add up.
scoped_refptr<ReadableLogSegment> readable_segment(
new ReadableLogSegment(new_segment_path, std::move(next_segment_file_)));
RETURN_NOT_OK(readable_segment->Init(header, new_segment->first_entry_offset()));
*new_readable_segment = std::move(readable_segment);
// Now set 'active_segment_' to the new segment.
active_segment_ = std::move(new_segment);
std::lock_guard<RWMutex> l(allocation_lock_);
allocation_state_ = kAllocationNotStarted;
return Status::OK();
// Rolls the log over to the already-allocated next segment, recording the
// time taken in the roll_latency metric. Debug builds assert that allocation
// has finished.
// NOTE(review): the wait for in-flight allocation, the call that finishes the
// current segment and the switch to the allocated segment are not visible in
// this view, so how the two out-parameters are filled cannot be confirmed
// here.
Status SegmentAllocator::RollOver(
scoped_refptr<ReadableLogSegment>* finished_segment,
scoped_refptr<ReadableLogSegment>* new_readable_segment) {
SCOPED_LATENCY_METRIC(ctx_->metrics, roll_latency);
// Wait for any on-going allocations to finish.
DCHECK_EQ(kAllocationFinished, allocation_state());
// If this isn't the first active segment, close it and return a reopened
// segment reader so that the caller can update its log reader.
if (active_segment_) {
VLOG_WITH_PREFIX(1) << "Rolled over to a new log segment at "
<< active_segment_->path();
return Status::OK();
// Status returned by AsyncAppend() when the entry queue refuses a new batch.
// Carries ESHUTDOWN as its POSIX error code.
const Status Log::kLogShutdownStatus(
Status::ServiceUnavailable("WAL is shutting down", "", ESHUTDOWN));
// Sequence number constant for the initial log segment.
const uint64_t Log::kInitialLogSegmentSequenceNumber = 0L;
// Factory for Log instances.
//
// Creates the tablet's WAL directory if it does not exist yet, builds the
// LogContext (metrics are only created when 'metric_entity' is non-null) and
// constructs the Log.
// NOTE(review): the tail of the constructor call, the Init() call and the
// assignment to '*log' are not visible in this view.
Status Log::Open(LogOptions options,
FsManager* fs_manager,
const string& tablet_id,
Schema schema,
uint32_t schema_version,
const scoped_refptr<MetricEntity>& metric_entity,
scoped_refptr<Log>* log) {
string tablet_wal_path = fs_manager->GetTabletWalDir(tablet_id);
RETURN_NOT_OK(env_util::CreateDirIfMissing(fs_manager->env(), tablet_wal_path));
LogContext ctx({ tablet_id, std::move(tablet_wal_path) });
ctx.metric_entity = metric_entity;
ctx.metrics.reset(metric_entity ? new LogMetrics(metric_entity) : nullptr);
ctx.fs_manager = fs_manager;
scoped_refptr<Log> new_log(new Log(std::move(options), std::move(ctx), std::move(schema),
return Status::OK();
// Takes ownership of the options, creates the append thread wrapper and the
// segment allocator (which keeps pointers to options_ and ctx_), and starts
// with an on-disk size of zero.
// NOTE(review): the initializers for ctx_ and log_state_ are not visible in
// this view.
Log::Log(LogOptions options, LogContext ctx, Schema schema, uint32_t schema_version)
: options_(std::move(options)),
append_thread_(new AppendThread(this)),
segment_allocator_(&options_, &ctx_, std::move(schema), schema_version),
on_disk_size_(0) {
// Brings a freshly constructed Log into the writing state: creates the log
// index, determines the sequence number to continue from when segments
// already exist, initializes the segment allocator and finally sets the state
// to kLogWriting.
//
// Must be called in the kLogInitialized state (CHECKed). Returns the first
// failure, otherwise OK.
// NOTE(review): the statements that open the reader, fetch the existing
// segments and start the append thread are not visible in this view.
Status Log::Init() {
CHECK_EQ(kLogInitialized, log_state_);
// Init the index
log_index_.reset(new LogIndex(ctx_.fs_manager->env(), ctx_.log_dir));
// Reader for previous segments.
// The case where we are continuing an existing log.
// We must pick up where the previous WAL left off in terms of
// sequence numbers.
uint64_t active_seg_seq_num = 0;
if (reader_->num_segments() != 0) {
VLOG_WITH_PREFIX(1) << "Using existing " << reader_->num_segments()
<< " segments from path: " << ctx_.fs_manager->GetWalsRootDir();
vector<scoped_refptr<ReadableLogSegment> > segments;
// Continue from the sequence number of the last existing segment.
active_seg_seq_num = segments.back()->header().sequence_number();
if (options_.force_fsync_all) {
KLOG_FIRST_N(INFO, 1) << LogPrefix() << "Log is configured to fsync() on all Append() calls";
} else {
KLOG_FIRST_N(INFO, 1) << LogPrefix()
<< "Log is configured to *not* fsync() on all Append() calls";
// We always create a new segment when the log starts.
scoped_refptr<ReadableLogSegment> new_readable_segment;
RETURN_NOT_OK(segment_allocator_.Init(active_seg_seq_num, &new_readable_segment));
log_state_ = kLogWriting;
return Status::OK();
// Wraps 'entry_batch_pb' in a LogEntryBatch of the given 'type' and returns
// it through 'entry_batch'. The op count is read from the protobuf before it
// is moved into the batch. Always returns OK in the lines visible here.
Status Log::CreateBatchFromPB(LogEntryTypePB type,
unique_ptr<LogEntryBatchPB> entry_batch_pb,
unique_ptr<LogEntryBatch>* entry_batch) {
// Must be captured before the std::move below.
int num_ops = entry_batch_pb->entry_size();
unique_ptr<LogEntryBatch> new_entry_batch(new LogEntryBatch(
type, std::move(entry_batch_pb), num_ops));
TRACE("Serialized $0 byte log entry", new_entry_batch->total_size_bytes());
*entry_batch = std::move(new_entry_batch);
return Status::OK();
// Enqueues 'entry_batch' on the entry batch queue.
//
// Returns kLogShutdownStatus if the queue refuses the batch; in that path, as
// visible here, 'callback' is not invoked. Returns OK otherwise.
// NOTE(review): the statements that attach 'callback' to the batch, wake the
// append thread and release ownership of the batch after a successful put are
// not visible in this view.
Status Log::AsyncAppend(unique_ptr<LogEntryBatch> entry_batch, const StatusCallback& callback) {
TRACE_EVENT0("log", "Log::AsyncAppend");
TRACE_EVENT_FLOW_BEGIN0("log", "Batch", entry_batch.get());
if (PREDICT_FALSE(!entry_batch_queue_.BlockingPut(entry_batch.get()))) {
TRACE_EVENT_FLOW_END0("log", "Batch", entry_batch.get());
return kLogShutdownStatus;
return Status::OK();
// Packs 'replicates' into a single REPLICATE batch and enqueues it via
// AsyncAppend(), passing 'callback' along. Returns the batch-creation failure
// or AsyncAppend()'s status.
Status Log::AsyncAppendReplicates(const vector<ReplicateRefPtr>& replicates,
const StatusCallback& callback) {
unique_ptr<LogEntryBatchPB> batch_pb = CreateBatchFromAllocatedOperations(replicates);
unique_ptr<LogEntryBatch> batch;
RETURN_NOT_OK(CreateBatchFromPB(REPLICATE, std::move(batch_pb), &batch));
return AsyncAppend(std::move(batch), callback);
// Wraps a commit message in a single-entry COMMIT batch and enqueues it via
// AsyncAppend(), passing 'callback' along.
//
// Returns the batch-creation failure or AsyncAppend()'s status, matching
// AsyncAppendReplicates(). Previously the AsyncAppend() status was dropped and
// OK was returned even when the batch was never enqueued.
// NOTE(review): in the lines visible here 'entry' is created but never
// populated and 'commit_msg' is unused -- confirm that the statements
// attaching the commit message to the entry were not lost.
Status Log::AsyncAppendCommit(gscoped_ptr<consensus::CommitMsg> commit_msg,
const StatusCallback& callback) {
unique_ptr<LogEntryBatchPB> batch_pb(new LogEntryBatchPB);
LogEntryPB* entry = batch_pb->add_entry();
unique_ptr<LogEntryBatch> entry_batch;
RETURN_NOT_OK(CreateBatchFromPB(COMMIT, std::move(batch_pb), &entry_batch));
return AsyncAppend(std::move(entry_batch), callback);
// Writes one batch to the active segment and, for REPLICATE batches, records
// it in the log index.
//
// An empty batch (zero serialized bytes) is a no-op that returns OK. A write
// failure is returned to the caller, whereas a failure to update the index
// aborts the process (CHECK_OK).
// NOTE(review): the fault-injection macro opening, the call that may roll
// over to a new segment, the reader updates, the hook call and the metric
// updates are not visible in this view.
Status Log::WriteBatch(LogEntryBatch* entry_batch) {
size_t num_entries = entry_batch->count();
DCHECK_GT(num_entries, 0) << "Cannot call WriteBatch() with zero entries reserved";
Status::IOError("Injected IOError in Log::WriteBatch()"));
Slice entry_batch_data = entry_batch->data();
uint32_t entry_batch_bytes = entry_batch->total_size_bytes();
// If there is no data to write return OK.
if (PREDICT_FALSE(entry_batch_bytes == 0)) {
return Status::OK();
scoped_refptr<ReadableLogSegment> finished_segment;
scoped_refptr<ReadableLogSegment> new_readable_segment;
entry_batch_bytes, &finished_segment, &new_readable_segment));
if (finished_segment) {
// Must be done before a new segment is appended.
if (new_readable_segment) {
auto* active_segment = segment_allocator_.active_segment_.get();
// Offset at which this batch starts; recorded in the index below.
int64_t start_offset = active_segment->written_offset();
LOG_SLOW_EXECUTION(WARNING, 50, Substitute("$0Append to log took a long time", LogPrefix())) {
SCOPED_LATENCY_METRIC(ctx_.metrics, append_latency);
RETURN_NOT_OK(active_segment->WriteEntryBatch(entry_batch_data, segment_allocator_.codec_));
// Update the reader on how far it can read the active segment.
if (segment_allocator_.hooks_) {
if (ctx_.metrics) {
CHECK_OK(UpdateIndexForBatch(*entry_batch, start_offset));
return Status::OK();
// Builds one index entry per REPLICATE entry in 'batch'. Every entry of the
// batch gets the same 'start_offset', i.e. the offset of the batch within the
// active segment. Batches of any other type are ignored.
// NOTE(review): the call that stores each index entry in the log index is not
// visible in this view.
Status Log::UpdateIndexForBatch(const LogEntryBatch& batch,
int64_t start_offset) {
if (batch.type_ != REPLICATE) {
return Status::OK();
for (const LogEntryPB& entry_pb : batch.entry_batch_pb_->entry()) {
LogIndexEntry index_entry;
index_entry.op_id = entry_pb.replicate().id();
index_entry.segment_sequence_number = segment_allocator_.active_segment_sequence_number();
index_entry.offset_in_segment = start_offset;
return Status::OK();
// Test-only entry point that forces a segment rollover while holding
// segment_idle_lock_ exclusively.
// NOTE(review): the opening of the allocator call and the reader updates for
// the finished/new segments are not visible in this view.
Status Log::AllocateSegmentAndRollOverForTests() {
std::lock_guard<rw_spinlock> l(segment_idle_lock_);
scoped_refptr<ReadableLogSegment> finished_segment;
scoped_refptr<ReadableLogSegment> new_readable_segment;
&finished_segment, &new_readable_segment));
if (finished_segment) {
return Status::OK();
// Forwards to the segment allocator's Sync() and returns its status.
Status Log::Sync() {
return segment_allocator_.Sync();
// Computes how many segments at the front of 'segments' may be garbage
// collected under 'retention_indexes'. Segments are examined in order and the
// conditions checked per segment are:
//   - the number of remaining segments versus --log_min_segments_to_retain;
//   - whether the segment has a footer (scanning stops at the first segment
//     without one);
//   - whether its max replicate index is still needed for durability;
//   - whether it is still needed to catch up peers, unless more than
//     --log_max_segments_to_retain segments remain.
// NOTE(review): the bodies of three of these checks and the statements that
// advance 'prefix_size' and 'rem_segs' are not visible in this view.
int GetPrefixSizeToGC(RetentionIndexes retention_indexes, const SegmentSequence& segments) {
int rem_segs = segments.size();
int prefix_size = 0;
for (const scoped_refptr<ReadableLogSegment>& segment : segments) {
if (rem_segs <= FLAGS_log_min_segments_to_retain) {
if (!segment->HasFooter()) break;
int64_t seg_max_idx = segment->footer().max_replicate_index();
// If removing this segment would compromise durability, we cannot remove it.
if (seg_max_idx >= retention_indexes.for_durability) {
// Check if removing this segment would compromise the ability to catch up a peer,
// we should retain it, unless this would break the max_segments flag.
if (seg_max_idx >= retention_indexes.for_peers &&
rem_segs <= FLAGS_log_max_segments_to_retain) {
return prefix_size;
// Shrinks 'segments_to_gc' to the GC-able prefix computed by
// GetPrefixSizeToGC().
// NOTE(review): the statement that first fills 'segments_to_gc' from the
// reader is not visible in this view. The "Unlocked" suffix suggests the
// caller holds state_lock_ -- TODO confirm.
void Log::GetSegmentsToGCUnlocked(RetentionIndexes retention_indexes,
SegmentSequence* segments_to_gc) const {
segments_to_gc->resize(GetPrefixSizeToGC(retention_indexes, *segments_to_gc));
// Synchronously writes a single entry: wraps it in a one-entry batch, writes
// the batch and, if the write succeeded, syncs. Returns the write or sync
// status.
// NOTE(review): the statements that add 'entry' to the batch protobuf are not
// visible in this view.
Status Log::Append(LogEntryPB* entry) {
unique_ptr<LogEntryBatchPB> entry_batch_pb(new LogEntryBatchPB);
LogEntryBatch entry_batch(entry->type(), std::move(entry_batch_pb), 1);
Status s = WriteBatch(&entry_batch);
if (s.ok()) {
s = Sync();
// Detach the first entry from the batch protobuf before the batch goes out
// of scope; presumably because 'entry' remains owned by the caller -- TODO
// confirm.
entry_batch.entry_batch_pb_->mutable_entry()->ExtractSubrange(0, 1, nullptr);
return s;
// Blocks until everything queued before this call has been processed, by
// enqueueing a FLUSH_MARKER batch and waiting for its callback.
//
// Returns the batch-creation or enqueue failure, otherwise the status
// delivered to the marker's callback.
Status Log::WaitUntilAllFlushed() {
// In order to make sure we empty the queue we need to use
// the async api.
unique_ptr<LogEntryBatchPB> entry_batch(new LogEntryBatchPB);
unique_ptr<LogEntryBatch> reserved_entry_batch;
RETURN_NOT_OK(CreateBatchFromPB(FLUSH_MARKER, std::move(entry_batch), &reserved_entry_batch));
Synchronizer s;
// If the enqueue fails, the failure path of AsyncAppend() visible in this
// file returns without running the callback, so waiting on 's' would never
// finish. Propagate the failure instead.
RETURN_NOT_OK(AsyncAppend(std::move(reserved_entry_batch), s.AsStatusCallback()));
return s.Wait();
// Garbage-collects the log segments that are no longer needed under
// 'retention_indexes'.
//
// '*num_gced' is set to 0 when there is nothing to delete.
// Requires for_durability >= 0 and the log to be in the writing state (both
// CHECKed). Segment selection happens under an exclusive hold of state_lock_.
// NOTE(review): the reader-trim call opening, the file deletions, the
// increment of '*num_gced' and the index GC call are not visible in this
// view.
Status Log::GC(RetentionIndexes retention_indexes, int32_t* num_gced) {
CHECK_GE(retention_indexes.for_durability, 0);
VLOG_WITH_PREFIX(1) << "Running Log GC on " << ctx_.log_dir << ": retaining "
"ops >= " << retention_indexes.for_durability << " for durability, "
"ops >= " << retention_indexes.for_peers << " for peers";
VLOG_TIMING(1, Substitute("$0Log GC", LogPrefix())) {
SegmentSequence segments_to_delete;
std::lock_guard<percpu_rwlock> l(state_lock_);
CHECK_EQ(kLogWriting, log_state_);
GetSegmentsToGCUnlocked(retention_indexes, &segments_to_delete);
if (segments_to_delete.empty()) {
VLOG_WITH_PREFIX(1) << "No segments to delete.";
*num_gced = 0;
return Status::OK();
// Trim the prefix of segments from the reader so that they are no longer
// referenced by the log.
// The argument below is the sequence number of the last segment selected
// for deletion.
segments_to_delete[segments_to_delete.size() - 1]->header().sequence_number());
// Now that they are no longer referenced by the Log, delete the files.
*num_gced = 0;
for (const scoped_refptr<ReadableLogSegment>& segment : segments_to_delete) {
string ops_str;
if (segment->HasFooter() && segment->footer().has_min_replicate_index()) {
ops_str = Substitute(" (ops $0-$1)",
LOG_WITH_PREFIX(INFO) << "Deleting log segment in path: " << segment->path() << ops_str;
// Determine the minimum remaining replicate index in order to properly GC
// the index chunks.
int64_t min_remaining_op_idx = reader_->GetMinReplicateIndex();
if (min_remaining_op_idx > 0) {
return Status::OK();
// Returns the total file size, in bytes, of the segments that GC() would
// select for deletion under 'retention_indexes'. Nothing is deleted.
// Requires for_durability >= 0 and the log to be in the writing state (both
// CHECKed); the selection runs under a shared hold of state_lock_.
int64_t Log::GetGCableDataSize(RetentionIndexes retention_indexes) const {
CHECK_GE(retention_indexes.for_durability, 0);
SegmentSequence segments_to_delete;
shared_lock<rw_spinlock> l(state_lock_.get_lock());
CHECK_EQ(kLogWriting, log_state_);
GetSegmentsToGCUnlocked(retention_indexes, &segments_to_delete);
int64_t total_size = 0;
for (const auto& segment : segments_to_delete) {
total_size += segment->file_size();
return total_size;
// Populates '*replay_size' with one entry per footered segment, keyed by that
// segment's footer max_replicate_index(). The value is the running total of
// file_size() over the footered segments visited so far, walking the segment
// sequence from its last element to its first.
//
// The log must be in the kLogWriting state (CHECK-enforced).
void Log::GetReplaySizeMap(std::map<int64_t, int64_t>* replay_size) const {
SegmentSequence segments;
shared_lock<rw_spinlock> l(state_lock_.get_lock());
CHECK_EQ(kLogWriting, log_state_);
// NOTE(review): the statement that fills 'segments' is not visible in this
// excerpt; presumably a snapshot is taken from the reader -- confirm.
int64_t cumulative_size = 0;
// Walk the segments in reverse order so each map value covers the current
// segment plus every footered segment that follows it in the sequence.
for (const auto& segment : boost::adaptors::reverse(segments)) {
// Segments without a footer are skipped entirely: they neither contribute to
// the running total nor produce a map entry.
if (!segment->HasFooter()) continue;
cumulative_size += segment->file_size();
int64_t max_repl_idx = segment->footer().max_replicate_index();
(*replay_size)[max_repl_idx] = cumulative_size;
// Returns the on-disk size of the log: the sum of file_size() over the
// segments in 'segments', or the value held in 'on_disk_size_' when the log is
// already closed.
//
// NOTE(review): the statement that fills 'segments' and the head of the
// statement ending in 'std::memory_order_relaxed' are not visible in this
// excerpt.
int64_t Log::OnDiskSize() {
SegmentSequence segments;
shared_lock<rw_spinlock> l(state_lock_.get_lock());
// If the log is closed, the tablet is either being deleted or tombstoned,
// so we don't count the size of its log anymore as it should be deleted.
if (log_state_ == kLogClosed) {
// Return the value currently stored in 'on_disk_size_' instead of
// recomputing it.
return on_disk_size_.load();
// Sum the sizes of all segments.
int64_t ret = 0;
for (const auto& segment : segments) {
ret += segment->file_size();
// NOTE(review): the next line is a fragment of a truncated statement;
// presumably it stores 'ret' into 'on_disk_size_' with relaxed memory
// ordering -- confirm against the full file.
}, std::memory_order_relaxed);
return ret;
// Forwards 'schema' and its 'version' to the segment allocator via
// SetSchemaForNextSegment(). 'schema' is taken by value and moved into the
// allocator call.
void Log::SetSchemaForNextLogSegment(Schema schema,
uint32_t version) {
segment_allocator_.SetSchemaForNextSegment(std::move(schema), version);
// Closes the log, dispatching on the current 'log_state_' while holding
// 'state_lock_'.
//
// Visible outcomes: a log in kLogWriting is marked kLogClosed; a log that is
// already kLogClosed yields Status::OK(); an IllegalState status is
// constructed with the message "Log not open. State: <state>".
//
// NOTE(review): this excerpt is missing many lines of the function (case
// terminators, the label preceding the IllegalState return, the head of the
// call ending in 'nullptr));', and the statements that release the file
// descriptors). Do not infer control flow from the visible line order alone.
Status Log::Close() {
std::lock_guard<percpu_rwlock> l(state_lock_);
switch (log_state_) {
case kLogWriting:
log_state_ = kLogClosed;
case kLogClosed:
// Closing an already-closed log succeeds without further work.
VLOG_WITH_PREFIX(1) << "Log already closed";
return Status::OK();
// NOTE(review): presumably reached through a 'default:' label that is not
// visible here -- confirm.
return Status::IllegalState(Substitute(
"Log not open. State: $0", log_state_));
// NOTE(review): tail of a call whose head is not visible in this excerpt.
/*finished_segment=*/ nullptr));
VLOG_WITH_PREFIX(1) << "Log closed";
// Release FDs held by these objects.
return Status::OK();
// Returns the result of Env::FileExists() on the WAL directory that
// 'fs_manager' reports for 'tablet_id' via GetTabletWalDir().
bool Log::HasOnDiskData(FsManager* fs_manager, const string& tablet_id) {
const string wal_dir = fs_manager->GetTabletWalDir(tablet_id);
return fs_manager->env()->FileExists(wal_dir);
// Deletes the WAL directory that 'fs_manager' reports for 'tablet_id'.
//
// Returns Status::OK() immediately when the directory does not exist.
// Otherwise logs the directory about to be deleted at INFO level and
// proceeds with the deletion.
//
// NOTE(review): the head of the deletion call is not visible in this excerpt;
// only its error-message argument remains.
Status Log::DeleteOnDiskData(FsManager* fs_manager, const string& tablet_id) {
string wal_dir = fs_manager->GetTabletWalDir(tablet_id);
Env* env = fs_manager->env();
// A missing directory is treated as success: there is nothing to delete.
if (!env->FileExists(wal_dir)) {
return Status::OK();
LOG(INFO) << Substitute("T $0 P $1: Deleting WAL directory at $2",
tablet_id, fs_manager->uuid(), wal_dir);
// NOTE(review): the next line is the message argument of a truncated call;
// presumably a recursive delete of 'wal_dir' wrapped in a status-checking
// macro -- confirm.
"Unable to recursively delete WAL dir for tablet " + tablet_id);
return Status::OK();
// Removes the WAL recovery directory that 'fs_manager' reports for
// 'tablet_id', if it exists.
//
// Steps visible here:
//   1. Return Status::OK() if the recovery directory does not exist.
//   2. Rename it to "<recovery_path>-<current time in microseconds>"; a rename
//      failure is returned with a message naming both paths.
//   3. If FLAGS_skip_remove_old_recovery_dir is set, keep the renamed
//      directory and return Status::OK().
//   4. Otherwise delete the renamed directory.
//
// NOTE(review): the head of the deletion call in step 4 is not visible in this
// excerpt; only its error-message argument remains.
Status Log::RemoveRecoveryDirIfExists(FsManager* fs_manager, const string& tablet_id) {
string recovery_path = fs_manager->GetTabletWalRecoveryDir(tablet_id);
// Prefix of the form "T <tablet_id> P <uuid>: " used by every message below.
const auto kLogPrefix = Substitute("T $0 P $1: ", tablet_id, fs_manager->uuid());
if (!fs_manager->Exists(recovery_path)) {
VLOG(1) << kLogPrefix << "Tablet WAL recovery dir " << recovery_path <<
" does not exist.";
return Status::OK();
VLOG(1) << kLogPrefix << "Preparing to delete log recovery files and directory " << recovery_path;
// The directory is renamed to a timestamp-suffixed path before anything is
// deleted.
string tmp_path = Substitute("$0-$1", recovery_path, GetCurrentTimeMicros());
VLOG(1) << kLogPrefix << "Renaming log recovery dir from " << recovery_path
<< " to " << tmp_path;
RETURN_NOT_OK_PREPEND(fs_manager->env()->RenameFile(recovery_path, tmp_path),
Substitute("Could not rename old recovery dir from: $0 to: $1",
recovery_path, tmp_path));
// With the flag set, the renamed directory is deliberately left on disk.
if (FLAGS_skip_remove_old_recovery_dir) {
LOG(INFO) << kLogPrefix << "--skip_remove_old_recovery_dir enabled. NOT deleting " << tmp_path;
return Status::OK();
VLOG(1) << kLogPrefix << "Deleting all files from renamed log recovery directory " << tmp_path;
// NOTE(review): the next line is the message argument of a truncated call;
// presumably a recursive delete of 'tmp_path' wrapped in a status-checking
// macro -- confirm.
"Could not remove renamed recovery dir " + tmp_path);
VLOG(1) << kLogPrefix << "Completed deletion of old log recovery files and directory "
<< tmp_path;
return Status::OK();
// Returns the prefix for this log's messages, as supplied by the log
// context 'ctx_'.
std::string Log::LogPrefix() const {
  return ctx_.LogPrefix();
}
// Destructor: closes the log. The status returned by Close() is handed to
// WARN_NOT_OK with the message "Error closing log" and is not propagated
// further (a destructor has no way to return it).
Log::~Log() {
WARN_NOT_OK(Close(), "Error closing log");
// Constructs a batch of 'count' log entries of the given 'type' backed by
// 'entry_batch_pb'.
//
// NOTE(review): parts of the member-initializer list are missing from this
// excerpt, including the initializer for 'entry_batch_pb_' and the name of the
// member initialized by the size expression below.
LogEntryBatch::LogEntryBatch(LogEntryTypePB type,
unique_ptr<LogEntryBatchPB> entry_batch_pb,
size_t count)
: type_(type),
// Size expression: 0 for a batch holding exactly one FLUSH_MARKER entry,
// otherwise the protobuf's ByteSize(). PREDICT_FALSE marks the marker case as
// the unlikely one.
PREDICT_FALSE(count == 1 && entry_batch_pb_->entry(0).type() == FLUSH_MARKER) ?
0 : entry_batch_pb_->ByteSize()),
count_(count) {
// Destructor. For REPLICATE batches that still hold a protobuf, visits every
// entry of the batch before the batch is destroyed.
//
// NOTE(review): the loop body is not visible in this excerpt; given the
// ownership comment below, it presumably releases each entry's ReplicateMsg
// so the protobuf does not free it -- confirm.
LogEntryBatch::~LogEntryBatch() {
if (type_ == REPLICATE && entry_batch_pb_) {
for (LogEntryPB& entry : *entry_batch_pb_->mutable_entry()) {
// ReplicateMsg elements are owned by and must be freed by the caller
// (e.g. the LogCache).
// Serializes the batch protobuf by appending it to 'buffer_', which must be
// empty on entry (DCHECK-enforced).
//
// NOTE(review): the body of the FLUSH_MARKER branch is not visible in this
// excerpt; per the comment below it presumably returns before the append --
// confirm.
void LogEntryBatch::Serialize() {
DCHECK_EQ(buffer_.size(), 0);
// FLUSH_MARKER LogEntries are markers and are not serialized.
if (PREDICT_FALSE(count() == 1 && entry_batch_pb_->entry(0).type() == FLUSH_MARKER)) {
pb_util::AppendToString(*entry_batch_pb_, &buffer_);
} // namespace log
} // namespace kudu