// Source blob: e94b36c10c6dc53e34cc1217c2f9e68e63ac7272
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/consensus/log.h"
#include <cerrno>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <ostream>
#include <utility>
#include <boost/range/adaptor/reversed.hpp>
#include <gflags/gflags.h>
#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/log_index.h"
#include "kudu/consensus/log_metrics.h"
#include "kudu/consensus/log_reader.h"
#include "kudu/consensus/log_util.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/atomicops.h"
#include "kudu/gutil/dynamic_annotations.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/walltime.h"
#include "kudu/util/async_util.h"
#include "kudu/util/compression/compression_codec.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/env.h"
#include "kudu/util/env_util.h"
#include "kudu/util/fault_injection.h"
#include "kudu/util/file_cache.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/kernel_stack_watchdog.h"
#include "kudu/util/logging.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/path_util.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/random.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/threadpool.h"
#include "kudu/util/trace.h"
// Log retention configuration.
// -----------------------------
// Lower bound on the number of segments kept. The ValidateLogsToRetain
// validator defined later in this file rejects values below 1.
DEFINE_int32(log_min_segments_to_retain, 1,
"The minimum number of past log segments to keep at all times,"
" regardless of what is required for durability. "
"Must be at least 1.");
TAG_FLAG(log_min_segments_to_retain, runtime);
TAG_FLAG(log_min_segments_to_retain, advanced);
// Upper bound on segments retained only so that other peers can catch up.
DEFINE_int32(log_max_segments_to_retain, 80,
"The maximum number of past log segments to keep at all times for "
"the purposes of catching up other peers.");
TAG_FLAG(log_max_segments_to_retain, runtime);
TAG_FLAG(log_max_segments_to_retain, advanced);
TAG_FLAG(log_max_segments_to_retain, experimental);
// Group commit configuration.
// -----------------------------
// Passed to the Log's entry batch queue when the Log is constructed.
DEFINE_int32(group_commit_queue_size_bytes, 4 * 1024 * 1024,
"Maximum size of the group commit queue in bytes");
TAG_FLAG(group_commit_queue_size_bytes, advanced);
// Used by the append task as the deadline for each blocking drain of the
// queue; when a drain times out the task attempts to go idle.
DEFINE_int32(log_thread_idle_threshold_ms, 1000,
"Number of milliseconds after which the log append thread decides that a "
"log is idle, and considers shutting down. Used by tests.");
TAG_FLAG(log_thread_idle_threshold_ms, experimental);
TAG_FLAG(log_thread_idle_threshold_ms, hidden);
// Compression configuration.
// -----------------------------
// Resolved to a codec when the segment allocator is initialized.
DEFINE_string(log_compression_codec, "LZ4",
"Codec to use for compressing WAL segments.");
TAG_FLAG(log_compression_codec, experimental);
// Fault/latency injection flags.
// -----------------------------
// The three flags below drive the artificial sleep performed in
// SegmentAllocator::Sync(): the sleep duration is drawn from a normal
// distribution with the configured mean and standard deviation.
DEFINE_bool(log_inject_latency, false,
"If true, injects artificial latency in log sync operations. "
"Advanced option. Use at your own risk -- has a negative effect "
"on performance for obvious reasons!");
TAG_FLAG(log_inject_latency, unsafe);
TAG_FLAG(log_inject_latency, runtime);
DEFINE_int32(log_inject_latency_ms_mean, 100,
"The number of milliseconds of latency to inject, on average. "
"Only takes effect if --log_inject_latency is true");
TAG_FLAG(log_inject_latency_ms_mean, unsafe);
TAG_FLAG(log_inject_latency_ms_mean, runtime);
DEFINE_int32(log_inject_latency_ms_stddev, 100,
"The standard deviation of latency to inject in the log. "
"Only takes effect if --log_inject_latency is true");
TAG_FLAG(log_inject_latency_ms_stddev, unsafe);
TAG_FLAG(log_inject_latency_ms_stddev, runtime);
// Consulted at several points inside Log::AppendThread::GoIdle().
DEFINE_int32(log_inject_thread_lifecycle_latency_ms, 0,
"Injection point for random latency during key thread lifecycle transition "
"points.");
TAG_FLAG(log_inject_thread_lifecycle_latency_ms, unsafe);
TAG_FLAG(log_inject_thread_lifecycle_latency_ms, runtime);
// Consulted at the top of Log::AsyncAppendCommit().
DEFINE_double(fault_crash_before_append_commit, 0.0,
"Fraction of the time when the server will crash just before appending a "
"COMMIT message to the log. (For testing only!)");
TAG_FLAG(fault_crash_before_append_commit, unsafe);
TAG_FLAG(fault_crash_before_append_commit, runtime);
// Consulted in Log::WriteBatch() before any data is written.
DEFINE_double(log_inject_io_error_on_append_fraction, 0.0,
"Fraction of the time when the log will fail to append and return an IOError. "
"(For testing only!)");
TAG_FLAG(log_inject_io_error_on_append_fraction, unsafe);
TAG_FLAG(log_inject_io_error_on_append_fraction, runtime);
// Consulted in SegmentAllocator::AllocateNewSegment() after the temporary
// segment file has been created.
DEFINE_double(log_inject_io_error_on_preallocate_fraction, 0.0,
"Fraction of the time when the log will fail to preallocate and return an IOError. "
"(For testing only!)");
TAG_FLAG(log_inject_io_error_on_preallocate_fraction, unsafe);
TAG_FLAG(log_inject_io_error_on_preallocate_fraction, runtime);
// Other flags.
// -----------------------------
// Passed to the disk space check performed before a new segment is
// preallocated.
DEFINE_int64(fs_wal_dir_reserved_bytes, -1,
"Number of bytes to reserve on the log directory filesystem for "
"non-Kudu usage. The default, which is represented by -1, is that "
"1% of the disk space on each disk will be reserved. Any other "
"value specified represents the number of bytes reserved and must "
"be greater than or equal to 0. Explicit percentages to reserve "
"are not currently supported");
// Accept the -1 sentinel and any non-negative byte count; reject the rest.
DEFINE_validator(fs_wal_dir_reserved_bytes, [](const char* /*n*/, int64_t v) { return v >= -1; });
TAG_FLAG(fs_wal_dir_reserved_bytes, runtime);
// When true (and a file cache is available), newly allocated segments are
// reopened through the file cache when the log switches to them.
DEFINE_bool(fs_wal_use_file_cache, true,
"Whether to use the server-wide file cache for WAL segments and "
"WAL index chunks.");
TAG_FLAG(fs_wal_use_file_cache, runtime);
TAG_FLAG(fs_wal_use_file_cache, advanced);
DEFINE_bool(skip_remove_old_recovery_dir, false,
"Skip removing WAL recovery dir after startup. (useful for debugging)");
TAG_FLAG(skip_remove_old_recovery_dir, hidden);
// Flag validator for --log_min_segments_to_retain.
//
// Accepts any value of at least 1. For anything smaller, logs an error that
// names the flag and the offending value, and rejects it.
static bool ValidateLogsToRetain(const char* flagname, int value) {
  const bool is_valid = value >= 1;
  if (!is_valid) {
    LOG(ERROR) << strings::Substitute("$0 must be at least 1, value $1 is invalid",
                                      flagname, value);
  }
  return is_valid;
}
DEFINE_validator(log_min_segments_to_retain, &ValidateLogsToRetain);
using kudu::consensus::CommitMsg;
using kudu::consensus::OpId;
using kudu::consensus::ReplicateRefPtr;
using std::string;
using std::unique_ptr;
using std::vector;
using strings::Substitute;
namespace kudu {
namespace log {
// Returns the prefix put in front of log messages for this context:
// "T <tablet_id> P <uuid reported by the FsManager>: ".
string LogContext::LogPrefix() const {
  const string& uuid = fs_manager->uuid();
  return Substitute("T $0 P $1: ", tablet_id, uuid);
}
// Manages the thread which drains groups of batches from the log's queue and
// appends them to the underlying log instance.
//
// Rather than being a long-running thread, this instead uses a threadpool with
// size 1 to automatically start and stop a thread on demand. When the log
// is idle for some amount of time, no task will be on the thread pool, and thus
// the underlying thread may exit.
//
// The design of submitting tasks to the threadpool is slightly tricky in order
// to achieve group commit and not have to submit one task per appended batch.
// Instead, a generic 'ProcessQueue()' task is used which loops collecting
// batches to write until it finds that the queue has been empty for a while,
// at which point the task finishes.
//
// The trick, then, lies in two areas:
//
// 1) After adding a batch to the queue, we need to ensure that a task is
// already running, and if not, start one. This is done in Wake().
//
// 2) When the task finds no more batches to write and wants to go idle, it
// needs to ensure that it doesn't miss any concurrent additions to the queue.
// This is done in GoIdle().
//
// See the implementation comments in Wake() and GoIdle() for details.
class Log::AppendThread {
public:
explicit AppendThread(Log* log);
// Builds the single-threaded pool on which the append task runs. Must be
// called exactly once, before Wake() or Shutdown().
Status Init();
// Shuts down the log's entry queue, waits for any outstanding append task
// to finish, and then shuts down the thread pool.
void Shutdown();
// Wake up the appender task, if it is not already running.
// This should be called after each time that a new entry is
// appended to the log's queue.
void Wake();
// Returns true if an append task is currently queued or running.
bool active() const {
return base::subtle::NoBarrier_Load(&thread_state_) == ACTIVE;
}
private:
// The task submitted to the threadpool which collects batches from the queue
// and appends them, until it determines that the queue is idle.
void ProcessQueue();
// Tries to transition back to IDLE state. If successful, returns true.
//
// Otherwise, returns false to indicate that the task should keep running because
// a new task was enqueued just as we were trying to go idle.
bool GoIdle();
// Handle the actual appending of a group of entries.
void HandleBatches(vector<unique_ptr<LogEntryBatch>> entry_batches);
// Returns the owning log's message prefix.
string LogPrefix() const;
// The log this append thread serves. Not owned.
Log* const log_;
// Atomic state machine for whether there is any task currently queued or
// running on append_pool_. See Wake() and GoIdle() for more details.
enum ThreadState {
// No task is queued or running.
IDLE,
// A task is queued or running.
ACTIVE
};
Atomic32 thread_state_ = IDLE;
// Pool with a single thread, which handles shutting down the thread
// when idle.
unique_ptr<ThreadPool> append_pool_;
};
// Records the log this append thread serves. The thread pool itself is
// created later, by Init().
Log::AppendThread::AppendThread(Log* log) : log_(log) {}
// Creates the "wal-append" thread pool used to run ProcessQueue().
//
// Returns the status of building the pool.
Status Log::AppendThread::Init() {
  DCHECK(!append_pool_) << "Already initialized";
  VLOG_WITH_PREFIX(1) << "Starting log append thread";

  ThreadPoolBuilder pool_builder("wal-append");
  pool_builder.set_min_threads(0);
  // A single thread suffices: at most one task is ever scheduled at a time.
  pool_builder.set_max_threads(1);
  // Idle threads need not linger, because the task itself waits for work
  // before it decides to finish.
  pool_builder.set_idle_timeout(MonoDelta::FromSeconds(0));
  return pool_builder.Build(&append_pool_);
}
void Log::AppendThread::Wake() {
DCHECK(append_pool_);
auto old_status = base::subtle::NoBarrier_CompareAndSwap(
&thread_state_, IDLE, ACTIVE);
if (old_status == IDLE) {
CHECK_OK(append_pool_->Submit([this]() { this->ProcessQueue(); }));
}
}
// Tells the currently active segment to go idle, while holding
// 'segment_idle_lock_' exclusively.
void Log::SetActiveSegmentIdle() {
  std::lock_guard<rw_spinlock> idle_guard(segment_idle_lock_);
  auto* segment = segment_allocator_.active_segment_.get();
  segment->GoIdle();
}
// Attempts to move the append task from ACTIVE back to IDLE.
//
// Returns true if the calling task should exit: either the queue is empty and
// the state is now IDLE, or a concurrent Wake() already flipped the state back
// to ACTIVE (and therefore submitted another task). Returns false if entries
// were enqueued whose wake-up was lost, in which case this task has re-marked
// itself ACTIVE and must keep running.
bool Log::AppendThread::GoIdle() {
// Inject latency at key points in this function for the purposes of tests.
MAYBE_INJECT_RANDOM_LATENCY(FLAGS_log_inject_thread_lifecycle_latency_ms);
// Going idle is a bit tricky. We have to consider the following race:
//
// T1                         AppendThread
// ------------               -------------
//                            - state is ACTIVE
//                            - BlockingDrainTo returns TimedOut()
// - queue.Put()
// - Wake() no-op because
//   the state is already ACTIVE
//
// So, we first transition back to IDLE state, and then re-check to see
// if there has been something enqueued in the meantime.
auto old_state = base::subtle::NoBarrier_AtomicExchange(&thread_state_, IDLE);
DCHECK_EQ(old_state, ACTIVE);
if (log_->entry_queue()->empty()) {
// Nothing got enqueued, which means there must not have been any missed wakeup.
// We are now in IDLE state.
return true;
}
MAYBE_INJECT_RANDOM_LATENCY(FLAGS_log_inject_thread_lifecycle_latency_ms);
// Someone enqueued something. We don't know whether their wakeup was successful
// or not, but we can just try to transition back to ACTIVE mode here.
if (base::subtle::NoBarrier_CompareAndSwap(&thread_state_, IDLE, ACTIVE)
== IDLE) {
// Their wake-up was lost, but we've now marked ourselves as running.
MAYBE_INJECT_RANDOM_LATENCY(FLAGS_log_inject_thread_lifecycle_latency_ms);
return false;
}
// Their wake-up was successful, meaning that there is another task on the
// queue behind us now, so we can exit this one.
MAYBE_INJECT_RANDOM_LATENCY(FLAGS_log_inject_thread_lifecycle_latency_ms);
return true;
}
void Log::AppendThread::ProcessQueue() {
DCHECK_EQ(ANNOTATE_UNPROTECTED_READ(thread_state_), ACTIVE);
VLOG_WITH_PREFIX(2) << "WAL Appender going active";
while (true) {
MonoTime deadline = MonoTime::Now() +
MonoDelta::FromMilliseconds(FLAGS_log_thread_idle_threshold_ms);
vector<unique_ptr<LogEntryBatch>> entry_batches;
Status s = log_->entry_queue()->BlockingDrainTo(&entry_batches, deadline);
if (PREDICT_FALSE(s.IsAborted())) {
break;
} else if (PREDICT_FALSE(s.IsTimedOut())) {
if (GoIdle()) break;
continue;
}
HandleBatches(std::move(entry_batches));
}
log_->SetActiveSegmentIdle();
VLOG_WITH_PREFIX(2) << "WAL Appender going idle";
}
// Writes one group of batches to the log, syncs if needed, and then runs each
// batch's callback.
//
// A failed write is recorded on that batch only; the remaining batches are
// still written. A sync is issued unless every batch in the group is of type
// COMMIT. A failed sync is recorded on every batch in the group.
void Log::AppendThread::HandleBatches(vector<unique_ptr<LogEntryBatch>> entry_batches) {
  if (log_->ctx_.metrics) {
    log_->ctx_.metrics->entry_batches_per_group->Increment(entry_batches.size());
  }
  TRACE_EVENT1("log", "batch", "batch_size", entry_batches.size());
  SCOPED_LATENCY_METRIC(log_->ctx_.metrics, group_commit_latency);

  bool needs_sync = false;
  for (auto& batch : entry_batches) {
    TRACE_EVENT_FLOW_END0("log", "Batch", batch.get());
    const Status write_status = log_->WriteBatch(batch.get());
    if (PREDICT_FALSE(!write_status.ok())) {
      LOG_WITH_PREFIX(ERROR) << "Error appending to the log: " << write_status.ToString();
      // TODO(af): If a single op fails to append, should we
      // abort all subsequent ops in this batch or allow
      // them to be appended? What about ops in future
      // batches?
      batch->SetAppendError(write_status);
    }
    if (batch->type_ != COMMIT) {
      needs_sync = true;
    }
  }

  const Status sync_status = needs_sync ? log_->Sync() : Status::OK();
  if (PREDICT_TRUE(sync_status.ok())) {
    VLOG_WITH_PREFIX(2) << "Synchronized " << entry_batches.size() << " entry batches";
  } else {
    LOG_WITH_PREFIX(ERROR) << "Error syncing log: " << sync_status.ToString();
    for (const auto& batch : entry_batches) {
      batch->SetAppendError(sync_status);
    }
  }

  TRACE_EVENT0("log", "Callbacks");
  SCOPED_WATCH_STACK(100);
  for (auto& batch : entry_batches) {
    batch->RunCallback();
    // Release each batch as soon as its callback has run: doing so may free
    // up memory from memory trackers that a later batch's callback wants to
    // use.
    batch.reset();
  }
}
// Shuts down the log's entry queue and, if the pool was ever created, waits
// for outstanding tasks before shutting the pool down.
void Log::AppendThread::Shutdown() {
  log_->entry_queue()->Shutdown();
  if (!append_pool_) {
    // Init() never ran (or failed), so there is no pool to stop.
    return;
  }
  append_pool_->Wait();
  append_pool_->Shutdown();
}
// Delegates to the owning log so that append-thread messages carry the same
// prefix as the log's own messages.
string Log::AppendThread::LogPrefix() const {
  string prefix = log_->LogPrefix();
  return prefix;
}
// Test-only accessor: reports whether an append task is currently queued or
// running.
bool Log::append_thread_active_for_tests() const {
  const bool is_active = append_thread_->active();
  return is_active;
}
// Stores the (non-owned) options and context pointers, converts the configured
// segment size from MiB to bytes, and takes ownership of the initial schema.
// Syncing starts out enabled.
SegmentAllocator::SegmentAllocator(const LogOptions* opts,
                                   const LogContext* ctx,
                                   Schema schema,
                                   uint32_t schema_version)
    : opts_(opts),
      ctx_(ctx),
      max_segment_size_(opts->segment_size_mb * 1024 * 1024),
      schema_(std::move(schema)),
      schema_version_(schema_version),
      sync_disabled_(false) {
}
// Prepares the allocator for use and creates the first active segment.
//
// 'sequence_number' seeds the active segment sequence number. On success,
// '*new_readable_segment' refers to a readable view of the newly created
// segment.
Status SegmentAllocator::Init(
    uint64_t sequence_number,
    scoped_refptr<ReadableLogSegment>* new_readable_segment) {
  // Resolve the compression codec named by --log_compression_codec.
  const auto codec_type = GetCompressionCodecType(FLAGS_log_compression_codec);
  RETURN_NOT_OK_PREPEND(GetCompressionCodec(codec_type, &codec_),
                        "could not instantiate compression codec");

  active_segment_sequence_number_ = sequence_number;

  ThreadPoolBuilder pool_builder("log-alloc");
  pool_builder.set_max_threads(1);
  RETURN_NOT_OK(pool_builder.Build(&allocation_pool_));

  scoped_refptr<ReadableLogSegment> previous_segment;
  RETURN_NOT_OK(AllocateSegmentAndRollOver(&previous_segment, new_readable_segment));
  // Nothing was active before, so there is no finished segment to report.
  DCHECK(!previous_segment);
  return Status::OK();
}
// Decides, for an upcoming write of 'write_size_bytes', whether a new segment
// must be allocated and/or whether the log should roll over to an already
// allocated one.
//
// If a roll-over happens, '*finished_segment' and '*new_readable_segment' are
// filled in by RollOver().
Status SegmentAllocator::AllocateOrRollOverIfNecessary(
    uint32_t write_size_bytes,
    scoped_refptr<ReadableLogSegment>* finished_segment,
    scoped_refptr<ReadableLogSegment>* new_readable_segment) {
  bool roll_now = false;
  {
    std::lock_guard<RWMutex> guard(allocation_lock_);
    if (allocation_state_ == kAllocationFinished) {
      // A previously requested segment is ready; switch to it.
      roll_now = true;
    } else if (allocation_state_ == kAllocationNotStarted) {
      // Start an allocation if this entry would overflow the current segment.
      // NOTE(review): the extra 4 bytes presumably cover per-batch framing
      // overhead -- confirm against the segment writer.
      const bool would_overflow =
          (active_segment_->written_offset() + write_size_bytes + 4) > max_segment_size_;
      if (would_overflow) {
        VLOG_WITH_PREFIX(1) << "Max segment size reached. Starting new segment allocation";
        RETURN_NOT_OK(AsyncAllocateSegmentUnlocked());
        // With synchronous preallocation, roll over right away (RollOver()
        // waits for the allocation to complete).
        roll_now = !opts_->async_preallocate_segments;
      }
    } else {
      DCHECK(opts_->async_preallocate_segments);
      VLOG_WITH_PREFIX(1) << "Segment allocation already in progress...";
    }
  }

  if (roll_now) {
    TRACE_COUNTER_SCOPE_LATENCY_US("log_roll");
    LOG_SLOW_EXECUTION(WARNING, 50, Substitute("$0Log roll took a long time", LogPrefix())) {
      RETURN_NOT_OK(RollOver(finished_segment, new_readable_segment));
    }
  }
  return Status::OK();
}
// Makes previously appended data durable, subject to configuration.
//
// Steps, in order:
//   1. When --log_inject_latency is set, sleeps for a duration drawn from a
//      normal distribution (test-only latency injection).
//   2. When the log is configured with 'force_fsync_all', syncs the active
//      segment and then runs the PostSyncIfFsyncEnabled hook.
//   3. Runs the PostSync hook, if hooks are installed, whether or not a sync
//      actually happened.
//
// When 'sync_disabled_' is set, steps 1 and 2 are both skipped; step 3 still
// runs.
//
// Returns the first non-OK status from the segment sync or from a hook.
Status SegmentAllocator::Sync() {
  TRACE_EVENT0("log", "Sync");
  SCOPED_LATENCY_METRIC(ctx_->metrics, sync_latency);

  if (PREDICT_FALSE(FLAGS_log_inject_latency && !sync_disabled_)) {
    Random r(GetCurrentTimeMicros());
    int sleep_ms = r.Normal(FLAGS_log_inject_latency_ms_mean,
                            FLAGS_log_inject_latency_ms_stddev);
    // The normal distribution can yield non-positive values; skip those.
    if (sleep_ms > 0) {
      LOG_WITH_PREFIX(WARNING) << "Injecting " << sleep_ms
                               << "ms of latency in SegmentAllocator::Sync()";
      SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
    }
  }

  // 'sync_disabled_' must suppress the real fsync too, not only the injected
  // latency above; otherwise disabling sync would have no effect on I/O.
  if (opts_->force_fsync_all && !sync_disabled_) {
    LOG_SLOW_EXECUTION(WARNING, 50, Substitute("$0Fsync log took a long time", LogPrefix())) {
      RETURN_NOT_OK(active_segment_->Sync());
      if (hooks_) {
        RETURN_NOT_OK_PREPEND(hooks_->PostSyncIfFsyncEnabled(),
                              "PostSyncIfFsyncEnabled hook failed");
      }
    }
  }

  if (hooks_) {
    RETURN_NOT_OK_PREPEND(hooks_->PostSync(), "PostSync hook failed");
  }
  return Status::OK();
}
// Closes out the active segment: writes its footer, trims unused preallocated
// space, syncs, and runs the close hooks.
//
// If 'finished_segment' is non-null, it is set to a freshly initialized
// readable view of the segment that was just finished.
Status SegmentAllocator::FinishCurrentSegment(
    scoped_refptr<ReadableLogSegment>* finished_segment) {
  if (hooks_) {
    RETURN_NOT_OK_PREPEND(hooks_->PreClose(), "PreClose hook failed");
  }

  const bool has_replicates = footer_.has_min_replicate_index();
  if (!has_replicates) {
    VLOG_WITH_PREFIX(1) << "Writing a segment without any REPLICATE message. Segment: "
                        << active_segment_->path();
  }
  VLOG_WITH_PREFIX(2) << "Segment footer for " << active_segment_->path()
                      << ": " << pb_util::SecureShortDebugString(footer_);

  footer_.set_close_timestamp_micros(GetCurrentTimeMicros());
  RETURN_NOT_OK(active_segment_->WriteFooter(footer_));

  // 'max_segment_size_' is the (soft) size limit of a segment and, with
  // preallocation enabled, also the amount of space reserved when the segment
  // is created. A segment that is finished before reaching that limit (e.g.
  // when the log is closed) still holds unused preallocated space, so give it
  // back to the filesystem by truncating the file at the written offset.
  const bool has_unused_prealloc =
      opts_->preallocate_segments &&
      active_segment_->written_offset() < max_segment_size_;
  if (has_unused_prealloc) {
    RETURN_NOT_OK(active_segment_->file()->Truncate(
        active_segment_->written_offset()));
  }

  RETURN_NOT_OK(Sync());

  if (hooks_) {
    RETURN_NOT_OK_PREPEND(hooks_->PostClose(), "PostClose hook failed");
  }

  if (!finished_segment) {
    // The caller is not interested in a readable view of the closed segment.
    return Status::OK();
  }
  scoped_refptr<ReadableLogSegment> reopened(
      new ReadableLogSegment(active_segment_->path(), active_segment_->file()));
  RETURN_NOT_OK(reopened->Init(active_segment_->header(),
                               active_segment_->footer(),
                               active_segment_->first_entry_offset()));
  *finished_segment = std::move(reopened);
  return Status::OK();
}
// Folds a just-written batch into the in-memory footer of the active segment.
//
// The entry count always grows by the batch's entry count. For REPLICATE
// batches, the footer's replicate index bounds are also updated with every op
// id in the batch, so that the footer is already correct if the log rolls
// over to a new segment.
void SegmentAllocator::UpdateFooterForBatch(const LogEntryBatch& batch) {
  const auto updated_count = footer_.num_entries() + batch.count();
  footer_.set_num_entries(updated_count);

  if (batch.type_ != REPLICATE) {
    return;
  }
  for (const OpId& id : batch.replicate_op_ids_) {
    UpdateFooterForReplicateEntry(id, &footer_);
  }
}
// Shuts down the pool on which segment allocation tasks run.
void SegmentAllocator::StopAllocationThread() {
  auto& pool = *allocation_pool_;
  pool.Shutdown();
}
// Unconditionally starts allocating a new segment and then rolls over to it.
//
// The allocation is submitted under 'allocation_lock_'; the lock is released
// before RollOver() waits for the allocation to complete.
Status SegmentAllocator::AllocateSegmentAndRollOver(
    scoped_refptr<ReadableLogSegment>* finished_segment,
    scoped_refptr<ReadableLogSegment>* new_readable_segment) {
  Status submit_status;
  {
    std::lock_guard<RWMutex> guard(allocation_lock_);
    submit_status = AsyncAllocateSegmentUnlocked();
  }
  RETURN_NOT_OK(submit_status);
  return RollOver(finished_segment, new_readable_segment);
}
// Records the schema and schema version to be written into the header of the
// next segment that is switched to. Both are updated under 'schema_lock_'.
void SegmentAllocator::SetSchemaForNextSegment(Schema schema,
                                               uint32_t version) {
  // Log before 'schema' is moved from below.
  VLOG_WITH_PREFIX(2) << Substitute("Setting schema version $0 for next log segment $1",
                                    version, schema.ToString());
  std::lock_guard<rw_spinlock> guard(schema_lock_);
  schema_version_ = version;
  schema_ = std::move(schema);
}
// Kicks off allocation of the next segment on the allocation pool.
//
// The caller must hold 'allocation_lock_' for writing, and no allocation may
// already be in progress or finished. Returns the status of submitting the
// task, not of the allocation itself (that is delivered through
// 'allocation_status_').
Status SegmentAllocator::AsyncAllocateSegmentUnlocked() {
  allocation_lock_.AssertAcquiredForWriting();
  DCHECK_EQ(kAllocationNotStarted, allocation_state_);

  // Clear any result left over from the previous allocation before marking
  // the new one as in progress.
  allocation_status_.Reset();
  allocation_state_ = kAllocationInProgress;

  auto task = [this]() { AllocationTask(); };
  return allocation_pool_->Submit(task);
}
void SegmentAllocator::AllocationTask() {
allocation_status_.Set(AllocateNewSegment());
}
// Creates the placeholder file for the next segment and, if configured,
// preallocates its space.
//
// Whatever the outcome, the allocation state is set to finished (under
// 'allocation_lock_') when this function returns.
Status SegmentAllocator::AllocateNewSegment() {
  TRACE_EVENT1("log", "AllocateNewSegment", "file", next_segment_path_);
  CHECK_EQ(kAllocationInProgress, allocation_state());

  // Guarantees the state transition on every return path, including errors.
  auto mark_finished = MakeScopedCleanup([&] () {
    std::lock_guard<RWMutex> guard(allocation_lock_);
    allocation_state_ = kAllocationFinished;
  });

  // The new segment file could be created through the file cache, but the
  // rename that happens later makes that tricky. Instead, create it outside
  // the cache now and reopen it via the cache when switching to it.
  const string name_suffix = Substitute("$0$1", kTmpInfix, ".newsegmentXXXXXX");
  const string tmp_template = JoinPathSegments(ctx_->log_dir, name_suffix);
  VLOG_WITH_PREFIX(2) << "Creating temp. file for place holder segment, template: "
                      << tmp_template;

  Env* env = ctx_->fs_manager->env();
  unique_ptr<RWFile> placeholder_file;
  RETURN_NOT_OK_PREPEND(env->NewTempRWFile(
      RWFileOptions(), tmp_template, &next_segment_path_, &placeholder_file),
                        "could not create next WAL segment");
  next_segment_file_.reset(placeholder_file.release());
  VLOG_WITH_PREFIX(1) << "Created next WAL segment, placeholder path: " << next_segment_path_;

  MAYBE_RETURN_FAILURE(FLAGS_log_inject_io_error_on_preallocate_fraction,
      Status::IOError("Injected IOError in SegmentAllocator::AllocateNewSegment()"));

  if (!opts_->preallocate_segments) {
    return Status::OK();
  }

  RETURN_NOT_OK(env_util::VerifySufficientDiskSpace(env,
                                                    next_segment_path_,
                                                    max_segment_size_,
                                                    FLAGS_fs_wal_dir_reserved_bytes));
  // TODO (perf) zero the new segments -- this could result in
  // additional performance improvements.
  RETURN_NOT_OK_PREPEND(next_segment_file_->PreAllocate(
      0, max_segment_size_, RWFile::CHANGE_FILE_SIZE),
                        "could not preallocate next WAL segment");
  return Status::OK();
}
// Promotes the previously allocated placeholder file to be the active segment.
//
// The placeholder is renamed to its final WAL segment file name, a header is
// written to it, and '*new_readable_segment' is set to a readable view of it.
// On success the allocation state is reset to "not started" so that a
// subsequent allocation can be requested.
//
// NOTE(review): the sequence number is incremented before the rename, so a
// failed rename leaves it advanced -- confirm callers treat such a failure as
// fatal for this log.
Status SegmentAllocator::SwitchToAllocatedSegment(
scoped_refptr<ReadableLogSegment>* new_readable_segment) {
// Increment "next" log segment seqno.
active_segment_sequence_number_++;
const auto& tablet_id = ctx_->tablet_id;
string new_segment_path = ctx_->fs_manager->GetWalSegmentFileName(
tablet_id, active_segment_sequence_number_);
Env* env = ctx_->fs_manager->env();
RETURN_NOT_OK_PREPEND(env->RenameFile(next_segment_path_, new_segment_path),
"could not rename next WAL segment");
// When fsync is forced, also sync the log directory after the rename.
if (opts_->force_fsync_all) {
RETURN_NOT_OK(env->SyncDir(ctx_->log_dir));
}
// Reopen the allocated segment file thru the file cache.
if (PREDICT_TRUE(ctx_->file_cache && FLAGS_fs_wal_use_file_cache)) {
RETURN_NOT_OK(ctx_->file_cache->OpenFile<Env::MUST_EXIST>(
new_segment_path, &next_segment_file_));
}
// Create a new segment in memory.
unique_ptr<WritableLogSegment> new_segment(
new WritableLogSegment(new_segment_path, next_segment_file_));
// Set up the new header and footer.
LogSegmentHeaderPB header;
header.set_sequence_number(active_segment_sequence_number_);
header.set_tablet_id(tablet_id);
// The codec is recorded in the header only when one is configured.
if (codec_) {
header.set_compression_codec(codec_->type());
}
// Set up the new footer. This will be maintained as the segment is written.
footer_.Clear();
footer_.set_num_entries(0);
// Set the new segment's schema.
{
shared_lock<rw_spinlock> l(schema_lock_);
RETURN_NOT_OK(SchemaToPB(schema_, header.mutable_schema()));
header.set_schema_version(schema_version_);
}
RETURN_NOT_OK_PREPEND(new_segment->WriteHeader(header), "Failed to write header");
// Open the segment we just created in readable form; it is the caller's
// responsibility to add it to the reader.
{
// 'next_segment_file_' is moved from here; the writable segment above was
// constructed from it before this point.
scoped_refptr<ReadableLogSegment> readable_segment(
new ReadableLogSegment(new_segment_path, std::move(next_segment_file_)));
RETURN_NOT_OK(readable_segment->Init(header, new_segment->first_entry_offset()));
*new_readable_segment = std::move(readable_segment);
}
// Now set 'active_segment_' to the new segment.
active_segment_ = std::move(new_segment);
// Allow the next allocation to be requested.
std::lock_guard<RWMutex> l(allocation_lock_);
allocation_state_ = kAllocationNotStarted;
return Status::OK();
}
// Switches the log over to the segment that was (or is being) allocated.
//
// Blocks until the pending allocation completes and returns its error if it
// failed. If a segment is currently active it is finished first, and
// '*finished_segment' receives a readable view of it so that the caller can
// update its log reader. '*new_readable_segment' receives a readable view of
// the new active segment.
Status SegmentAllocator::RollOver(
    scoped_refptr<ReadableLogSegment>* finished_segment,
    scoped_refptr<ReadableLogSegment>* new_readable_segment) {
  SCOPED_LATENCY_METRIC(ctx_->metrics, roll_latency);

  // Wait for the in-flight allocation, if any, and surface its result.
  RETURN_NOT_OK(allocation_status_.Get());
  DCHECK_EQ(kAllocationFinished, allocation_state());

  // On the very first roll there is no active segment to close.
  const bool has_active_segment = static_cast<bool>(active_segment_);
  if (has_active_segment) {
    RETURN_NOT_OK(FinishCurrentSegment(finished_segment));
  }

  RETURN_NOT_OK(SwitchToAllocatedSegment(new_readable_segment));
  VLOG_WITH_PREFIX(1) << "Rolled over to a new log segment at "
                      << active_segment_->path();
  return Status::OK();
}
// Status returned by AsyncAppend() when the entry queue rejects a batch.
const Status Log::kLogShutdownStatus =
    Status::ServiceUnavailable("WAL is shutting down", "", ESHUTDOWN);
// Sequence number constant for the initial log segment.
const uint64_t Log::kInitialLogSegmentSequenceNumber = 0;
// Creates and initializes a Log for 'tablet_id'.
//
// Ensures the tablet's WAL directory exists, assembles the LogContext (metrics
// are only created when 'metric_entity' is set), constructs the Log, and runs
// Init(). '*log' is only assigned if initialization succeeds.
Status Log::Open(LogOptions options,
                 FsManager* fs_manager,
                 FileCache* file_cache,
                 const string& tablet_id,
                 Schema schema,
                 uint32_t schema_version,
                 const scoped_refptr<MetricEntity>& metric_entity,
                 scoped_refptr<Log>* log) {
  string wal_dir = fs_manager->GetTabletWalDir(tablet_id);
  RETURN_NOT_OK(env_util::CreateDirIfMissing(fs_manager->env(), wal_dir));

  LogContext ctx({ tablet_id, std::move(wal_dir) });
  ctx.fs_manager = fs_manager;
  ctx.file_cache = file_cache;
  ctx.metric_entity = metric_entity;
  if (metric_entity) {
    ctx.metrics.reset(new LogMetrics(metric_entity));
  }

  scoped_refptr<Log> opened(new Log(std::move(options), std::move(ctx), std::move(schema),
                                    schema_version));
  RETURN_NOT_OK(opened->Init());
  log->swap(opened);
  return Status::OK();
}
// Takes ownership of the options and context, sizes the entry batch queue from
// --group_commit_queue_size_bytes, and creates the (not yet started) append
// thread and segment allocator. The allocator is handed pointers to this
// object's own 'options_' and 'ctx_' members.
Log::Log(LogOptions options, LogContext ctx, Schema schema, uint32_t schema_version)
    : options_(std::move(options)),
      ctx_(std::move(ctx)),
      log_state_(kLogInitialized),
      entry_batch_queue_(FLAGS_group_commit_queue_size_bytes),
      append_thread_(new AppendThread(this)),
      segment_allocator_(&options_, &ctx_, std::move(schema), schema_version),
      on_disk_size_(0) {}
// Brings a freshly constructed Log into the writing state.
//
// Opens the index and a reader over any existing segments, starts a brand new
// segment that continues the existing sequence numbering, and starts the
// append thread.
Status Log::Init() {
  CHECK_EQ(kLogInitialized, log_state_);

  // Index over the log's contents.
  log_index_.reset(new LogIndex(ctx_.fs_manager->env(),
                                ctx_.file_cache,
                                ctx_.log_dir));

  // Reader over the segments written by previous incarnations of the log.
  RETURN_NOT_OK(LogReader::Open(ctx_.fs_manager,
                                log_index_,
                                ctx_.tablet_id,
                                ctx_.metric_entity.get(),
                                ctx_.file_cache,
                                &reader_));

  // When continuing an existing log, sequence numbers must pick up where the
  // previous WAL left off.
  uint64_t last_seqno = 0;
  if (reader_->num_segments() != 0) {
    VLOG_WITH_PREFIX(1) << "Using existing " << reader_->num_segments()
                        << " segments from path: " << ctx_.fs_manager->GetWalsRootDir();
    vector<scoped_refptr<ReadableLogSegment>> existing_segments;
    reader_->GetSegmentsSnapshot(&existing_segments);
    last_seqno = existing_segments.back()->header().sequence_number();
  }

  if (!options_.force_fsync_all) {
    KLOG_FIRST_N(INFO, 1) << LogPrefix()
                          << "Log is configured to *not* fsync() on all Append() calls";
  } else {
    KLOG_FIRST_N(INFO, 1) << LogPrefix() << "Log is configured to fsync() on all Append() calls";
  }

  // A new segment is always created when the log starts.
  scoped_refptr<ReadableLogSegment> first_segment;
  RETURN_NOT_OK(segment_allocator_.Init(last_seqno, &first_segment));
  reader_->AppendEmptySegment(std::move(first_segment));

  RETURN_NOT_OK(append_thread_->Init());
  log_state_ = kLogWriting;
  return Status::OK();
}
// Wraps 'entry_batch_pb' in a new LogEntryBatch of the given type, attaching
// 'cb' as its completion callback, and traces the batch's total size.
unique_ptr<LogEntryBatch> Log::CreateBatchFromPB(
    LogEntryTypePB type, const LogEntryBatchPB& entry_batch_pb,
    StatusCallback cb) {
  unique_ptr<LogEntryBatch> batch(
      new LogEntryBatch(type, entry_batch_pb, std::move(cb)));
  TRACE("Serialized $0 byte log entry", batch->total_size_bytes());
  return batch;
}
// Enqueues 'entry_batch' for the append thread and makes sure that thread is
// awake.
//
// Returns kLogShutdownStatus if the queue refuses the batch, and OK otherwise.
Status Log::AsyncAppend(unique_ptr<LogEntryBatch> entry_batch) {
  TRACE_EVENT0("log", "Log::AsyncAppend");

  // Only the address is used, as the identifier of the trace flow; the pointer
  // is never dereferenced after the batch has been handed to the queue.
  const LogEntryBatch* flow_id = entry_batch.get();
  TRACE_EVENT_FLOW_BEGIN0("log", "Batch", flow_id);

  const auto put_status = entry_batch_queue_.BlockingPut(std::move(entry_batch));
  if (PREDICT_FALSE(!put_status.ok())) {
    TRACE_EVENT_FLOW_END0("log", "Batch", flow_id);
    return kLogShutdownStatus;
  }

  append_thread_->Wake();
  return Status::OK();
}
// Builds a REPLICATE batch out of 'replicates' and enqueues it for appending.
//
// The protobuf entries only borrow the replicate messages: each message is
// attached with set_allocated_replicate() while the batch is created and is
// detached again with release_replicate() before 'batch_pb' is destroyed, so
// the protobuf never deletes them.
Status Log::AsyncAppendReplicates(vector<ReplicateRefPtr> replicates,
                                  StatusCallback callback) {
  LogEntryBatchPB batch_pb;
  auto* entries = batch_pb.mutable_entry();
  entries->Reserve(replicates.size());
  for (const auto& replicate : replicates) {
    LogEntryPB* entry = batch_pb.add_entry();
    entry->set_type(REPLICATE);
    entry->set_allocated_replicate(replicate->get());
  }

  unique_ptr<LogEntryBatch> batch =
      CreateBatchFromPB(REPLICATE, batch_pb, std::move(callback));

  // Hand the borrowed messages back before 'batch_pb' goes out of scope.
  for (LogEntryPB& entry : *entries) {
    entry.release_replicate();
  }
  return AsyncAppend(std::move(batch));
}
// Builds a single-entry COMMIT batch from 'commit_msg' and enqueues it for
// appending. 'callback' is attached to the batch as its completion callback.
//
// 'commit_msg' is only borrowed: it is attached to the protobuf entry while
// the batch is created and released again before this function returns, so
// the caller keeps ownership.
//
// Returns the status of enqueueing the batch. In particular, the failure
// reported by AsyncAppend() when the queue rejects the batch is propagated to
// the caller rather than being dropped, matching AsyncAppendReplicates().
Status Log::AsyncAppendCommit(const consensus::CommitMsg& commit_msg,
                              StatusCallback callback) {
  MAYBE_FAULT(FLAGS_fault_crash_before_append_commit);

  LogEntryBatchPB batch_pb;
  LogEntryPB* entry = batch_pb.add_entry();
  entry->set_type(COMMIT);
  // The const_cast is needed only because the protobuf setter takes a mutable
  // pointer; the message is released again below without being modified here.
  entry->unsafe_arena_set_allocated_commit(const_cast<consensus::CommitMsg*>(&commit_msg));
  unique_ptr<LogEntryBatch> entry_batch = CreateBatchFromPB(
      COMMIT, batch_pb, std::move(callback));
  // Detach the borrowed message so that 'batch_pb' does not delete it.
  entry->unsafe_arena_release_commit();

  return AsyncAppend(std::move(entry_batch));
}
// Writes a single entry batch to the active segment.
//
// Rolls over to a new segment first if the allocator decides it is necessary,
// keeping the reader's view of the segments in step. After a successful write,
// the reader's readable offset, the bytes-logged metric, the log index, and
// the segment footer are all updated.
//
// FLUSH_MARKER batches carry nothing to write and return OK immediately.
// A failure to update the index is fatal (CHECK_OK).
Status Log::WriteBatch(LogEntryBatch* entry_batch) {
// If there is no data to write return OK.
if (PREDICT_FALSE(entry_batch->type_ == FLUSH_MARKER)) {
return Status::OK();
}
size_t num_entries = entry_batch->count();
DCHECK_GT(num_entries, 0) << "Cannot call WriteBatch() with zero entries reserved";
MAYBE_RETURN_FAILURE(FLAGS_log_inject_io_error_on_append_fraction,
Status::IOError("Injected IOError in Log::WriteBatch()"));
Slice entry_batch_data = entry_batch->data();
uint32_t entry_batch_bytes = entry_batch->total_size_bytes();
scoped_refptr<ReadableLogSegment> finished_segment;
scoped_refptr<ReadableLogSegment> new_readable_segment;
RETURN_NOT_OK(segment_allocator_.AllocateOrRollOverIfNecessary(
entry_batch_bytes, &finished_segment, &new_readable_segment));
if (finished_segment) {
// Must be done before a new segment is appended.
reader_->ReplaceLastSegment(std::move(finished_segment));
}
if (new_readable_segment) {
reader_->AppendEmptySegment(std::move(new_readable_segment));
}
// Fetch the active segment only after any roll-over above, and capture the
// offset before writing: it is the offset recorded in the index for this
// batch's entries.
auto* active_segment = segment_allocator_.active_segment_.get();
int64_t start_offset = active_segment->written_offset();
LOG_SLOW_EXECUTION(WARNING, 50, Substitute("$0Append to log took a long time", LogPrefix())) {
SCOPED_LATENCY_METRIC(ctx_.metrics, append_latency);
SCOPED_WATCH_STACK(500);
RETURN_NOT_OK(active_segment->WriteEntryBatch(entry_batch_data, segment_allocator_.codec_));
// Update the reader on how far it can read the active segment.
reader_->UpdateLastSegmentOffset(active_segment->written_offset());
if (segment_allocator_.hooks_) {
RETURN_NOT_OK(segment_allocator_.hooks_->PostAppend());
}
}
if (ctx_.metrics) {
ctx_.metrics->bytes_logged->IncrementBy(entry_batch_bytes);
}
CHECK_OK(UpdateIndexForBatch(*entry_batch, start_offset));
segment_allocator_.UpdateFooterForBatch(*entry_batch);
return Status::OK();
}
// Adds one log index entry for every replicate op contained in 'batch'.
//
// Each entry maps the op's id to the sequence number of the currently active
// segment and to 'start_offset', the offset at which the batch begins in that
// segment. Batches of any type other than REPLICATE are not indexed.
//
// Returns the first error reported by the log index, or OK.
Status Log::UpdateIndexForBatch(const LogEntryBatch& batch,
                                int64_t start_offset) {
  if (batch.type_ == REPLICATE) {
    for (const OpId& id : batch.replicate_op_ids_) {
      LogIndexEntry idx;
      idx.op_id = id;
      idx.segment_sequence_number = segment_allocator_.active_segment_sequence_number();
      // All ops of a batch share the batch's starting offset.
      idx.offset_in_segment = start_offset;
      RETURN_NOT_OK(log_index_->AddEntry(idx));
    }
  }
  return Status::OK();
}
// Test-only helper that forces the allocator to roll over to a new segment
// and brings the reader up to date with the result.
//
// Holds 'segment_idle_lock_' for the duration of the call.
Status Log::AllocateSegmentAndRollOverForTests() {
  std::lock_guard<rw_spinlock> idle_guard(segment_idle_lock_);

  scoped_refptr<ReadableLogSegment> closed_segment;
  scoped_refptr<ReadableLogSegment> fresh_segment;
  RETURN_NOT_OK(segment_allocator_.AllocateSegmentAndRollOver(
      &closed_segment, &fresh_segment));

  // The previously active segment (if there was one) replaces the reader's
  // last segment before the new, empty one is appended.
  if (closed_segment) {
    reader_->ReplaceLastSegment(std::move(closed_segment));
  }
  reader_->AppendEmptySegment(std::move(fresh_segment));
  return Status::OK();
}
// Syncs the log by delegating to the segment allocator; returns its status.
Status Log::Sync() {
return segment_allocator_.Sync();
}
// Returns how many segments at the front of 'segments' may be garbage
// collected given 'retention_indexes'.
//
// Segments are examined in order and the scan stops at the first segment that
// must be kept, i.e. when any of the following holds:
//  - removing it would leave --log_min_segments_to_retain or fewer segments;
//  - it has no footer;
//  - its max replicate index is >= retention_indexes.for_durability;
//  - its max replicate index is >= retention_indexes.for_peers and the number
//    of remaining segments does not exceed --log_max_segments_to_retain.
int GetPrefixSizeToGC(RetentionIndexes retention_indexes, const SegmentSequence& segments) {
  const int total_segments = segments.size();
  int gc_count = 0;
  for (const scoped_refptr<ReadableLogSegment>& seg : segments) {
    // Number of segments that would still exist if we stopped right here.
    const int remaining = total_segments - gc_count;
    if (remaining <= FLAGS_log_min_segments_to_retain) {
      break;
    }
    if (!seg->HasFooter()) {
      break;
    }

    const int64_t max_idx = seg->footer().max_replicate_index();

    // Never drop anything still required for durability.
    const bool needed_for_durability = max_idx >= retention_indexes.for_durability;
    if (needed_for_durability) {
      break;
    }

    // Segments that could help a lagging peer catch up are kept as well,
    // unless that would exceed the maximum number of retained segments.
    const bool needed_for_peers = max_idx >= retention_indexes.for_peers;
    if (needed_for_peers && remaining <= FLAGS_log_max_segments_to_retain) {
      break;
    }

    ++gc_count;
  }
  return gc_count;
}
// Fills 'segments_to_gc' with the prefix of the reader's current segments
// that may be garbage collected under 'retention_indexes'.
void Log::GetSegmentsToGCUnlocked(RetentionIndexes retention_indexes,
                                  SegmentSequence* segments_to_gc) const {
  // Start from a snapshot of every segment, then cut it down to the
  // GC-able prefix.
  reader_->GetSegmentsSnapshot(segments_to_gc);
  const int gc_prefix_len = GetPrefixSizeToGC(retention_indexes, *segments_to_gc);
  segments_to_gc->resize(gc_prefix_len);
}
// Synchronously writes the single entry 'entry' to the log and syncs it.
//
// 'entry' is only borrowed: it is placed into a temporary LogEntryBatchPB so
// that a LogEntryBatch can be built from it, and taken back out before the
// protobuf is destroyed.
//
// Returns the error from WriteBatch() if the write fails, otherwise the
// result of Sync().
Status Log::Append(LogEntryPB* entry) {
  LogEntryBatchPB batch_pb;
  batch_pb.mutable_entry()->UnsafeArenaAddAllocated(entry);
  LogEntryBatch batch(entry->type(), batch_pb, &DoNothingStatusCB);
  // Hand the entry back to the caller; passing nullptr means the element is
  // removed from the repeated field without being returned or deleted.
  batch_pb.mutable_entry()->ExtractSubrange(0, 1, nullptr);

  RETURN_NOT_OK(WriteBatch(&batch));
  return Sync();
}
// Blocks until every batch enqueued before this call has been processed.
//
// This works by pushing a FLUSH_MARKER batch through the asynchronous append
// path and waiting for its callback.
//
// Returns the error from AsyncAppend() if the marker could not be enqueued;
// otherwise returns the status delivered to the marker's callback. Returning
// early on an enqueue failure matters because the wait below only completes
// when the callback is invoked.
Status Log::WaitUntilAllFlushed() {
  // In order to make sure we empty the queue we need to use
  // the async api.
  LogEntryBatchPB entry_batch;
  entry_batch.add_entry()->set_type(log::FLUSH_MARKER);
  Synchronizer s;
  unique_ptr<LogEntryBatch> reserved_entry_batch =
      CreateBatchFromPB(FLUSH_MARKER, entry_batch, s.AsStatusCallback());
  RETURN_NOT_OK(AsyncAppend(std::move(reserved_entry_batch)));
  return s.Wait();
}
// Garbage collects log segments that are no longer needed according to
// 'retention_indexes', and then garbage collects the log index accordingly.
//
// On return, '*num_gced' holds the number of segment files deleted so far
// (also when a deletion fails part-way and an error is returned).
//
// The log must be in the kLogWriting state, and
// 'retention_indexes.for_durability' must be non-negative; both are CHECKed.
Status Log::GC(RetentionIndexes retention_indexes, int32_t* num_gced) {
  CHECK_GE(retention_indexes.for_durability, 0);

  VLOG_WITH_PREFIX(1) << "Running Log GC on " << ctx_.log_dir << ": retaining "
      "ops >= " << retention_indexes.for_durability << " for durability, "
      "ops >= " << retention_indexes.for_peers << " for peers";
  VLOG_TIMING(1, Substitute("$0Log GC", LogPrefix())) {
    SegmentSequence doomed_segments;

    {
      // Choosing the segments and trimming them from the reader happens
      // under the exclusive state lock.
      std::lock_guard<percpu_rwlock> state_guard(state_lock_);
      CHECK_EQ(kLogWriting, log_state_);

      GetSegmentsToGCUnlocked(retention_indexes, &doomed_segments);
      if (doomed_segments.empty()) {
        VLOG_WITH_PREFIX(1) << "No segments to delete.";
        *num_gced = 0;
        return Status::OK();
      }

      // Drop the chosen prefix from the reader so that the log no longer
      // references those segments.
      reader_->TrimSegmentsUpToAndIncluding(
          doomed_segments.back()->header().sequence_number());
    }

    // The log no longer references the segments; remove the files.
    *num_gced = 0;
    for (const auto& seg : doomed_segments) {
      // Describe the op range in the log message when the footer has one.
      string op_range;
      if (seg->HasFooter() && seg->footer().has_min_replicate_index()) {
        const auto& footer = seg->footer();
        DCHECK(footer.has_max_replicate_index());
        op_range = Substitute(" (ops $0-$1)",
                              footer.min_replicate_index(),
                              footer.max_replicate_index());
      }
      LOG_WITH_PREFIX(INFO) << "Deleting log segment in path: " << seg->path() << op_range;

      if (PREDICT_TRUE(ctx_.file_cache)) {
        // Note: the segment files will only be deleted from disk when
        // 'doomed_segments' goes out of scope.
        RETURN_NOT_OK(ctx_.file_cache->DeleteFile(seg->path()));
      } else {
        RETURN_NOT_OK(ctx_.fs_manager->env()->DeleteFile(seg->path()));
      }
      ++(*num_gced);
    }

    // The index chunks are GCed based on the smallest replicate index that
    // is still present in the remaining segments.
    const int64_t min_remaining_op_idx = reader_->GetMinReplicateIndex();
    if (min_remaining_op_idx > 0) {
      log_index_->GC(min_remaining_op_idx);
    }
  }
  return Status::OK();
}
// Returns the combined file size of the segments that a GC with
// 'retention_indexes' would currently remove. Nothing is deleted.
//
// The log must be in the kLogWriting state, and
// 'retention_indexes.for_durability' must be non-negative; both are CHECKed.
int64_t Log::GetGCableDataSize(RetentionIndexes retention_indexes) const {
  CHECK_GE(retention_indexes.for_durability, 0);

  SegmentSequence gc_candidates;
  {
    // Only the selection needs the (shared) state lock; sizes are summed
    // after it is released.
    shared_lock<rw_spinlock> state_guard(state_lock_.get_lock());
    CHECK_EQ(kLogWriting, log_state_);
    GetSegmentsToGCUnlocked(retention_indexes, &gc_candidates);
  }

  int64_t gcable_bytes = 0;
  for (const auto& seg : gc_candidates) {
    gcable_bytes += seg->file_size();
  }
  return gcable_bytes;
}
void Log::GetReplaySizeMap(std::map<int64_t, int64_t>* replay_size) const {
replay_size->clear();
SegmentSequence segments;
{
shared_lock<rw_spinlock> l(state_lock_.get_lock());
CHECK_EQ(kLogWriting, log_state_);
reader_->GetSegmentsSnapshot(&segments);
}
int64_t cumulative_size = 0;
for (const auto& segment : boost::adaptors::reverse(segments)) {
if (!segment->HasFooter()) continue;
cumulative_size += segment->file_size();
int64_t max_repl_idx = segment->footer().max_replicate_index();
(*replay_size)[max_repl_idx] = cumulative_size;
}
}
int64_t Log::OnDiskSize() {
SegmentSequence segments;
{
shared_lock<rw_spinlock> l(state_lock_.get_lock());
// If the log is closed, the tablet is either being deleted or tombstoned,
// so we don't count the size of its log anymore as it should be deleted.
if (log_state_ == kLogClosed) {
return on_disk_size_.load();
}
reader_->GetSegmentsSnapshot(&segments);
}
int64_t ret = 0;
for (const auto& segment : segments) {
ret += segment->file_size();
}
on_disk_size_.store(ret, std::memory_order_relaxed);
return ret;
}
// Forwards 'schema' and its 'version' to the segment allocator, which is told
// to use them for the next segment. 'schema' is taken by value and moved on.
void Log::SetSchemaForNextLogSegment(Schema schema,
uint32_t version) {
segment_allocator_.SetSchemaForNextSegment(std::move(schema), version);
}
// Closes the log: stops the allocation and append threads, marks the log as
// closed, finishes the current segment and drops the objects holding file
// descriptors.
//
// Closing an already-closed log returns OK. Returns IllegalState if the log is
// in any state other than kLogWriting or kLogClosed, and propagates a failure
// to finish the current segment.
Status Log::Close() {
  segment_allocator_.StopAllocationThread();
  append_thread_->Shutdown();

  {
    std::lock_guard<percpu_rwlock> state_guard(state_lock_);
    if (log_state_ == kLogClosed) {
      VLOG_WITH_PREFIX(1) << "Log already closed";
      return Status::OK();
    }
    if (log_state_ != kLogWriting) {
      return Status::IllegalState(Substitute(
          "Log not open. State: $0", log_state_));
    }
    log_state_ = kLogClosed;
  }

  RETURN_NOT_OK(segment_allocator_.FinishCurrentSegment(
      /*finished_segment=*/ nullptr));
  VLOG_WITH_PREFIX(1) << "Log closed";

  // Release FDs held by these objects.
  segment_allocator_.active_segment_.reset();
  log_index_.reset();
  reader_.reset();
  return Status::OK();
}
// Returns true if the WAL directory of tablet 'tablet_id' exists according to
// the Env of 'fs_manager'.
bool Log::HasOnDiskData(FsManager* fs_manager, const string& tablet_id) {
  const string tablet_wal_dir(fs_manager->GetTabletWalDir(tablet_id));
  Env* env = fs_manager->env();
  return env->FileExists(tablet_wal_dir);
}
// Recursively deletes the WAL directory of tablet 'tablet_id', if it exists.
//
// Returns OK when the directory is absent or was removed, and the (prefixed)
// deletion error otherwise.
Status Log::DeleteOnDiskData(FsManager* fs_manager, const string& tablet_id) {
  const string tablet_wal_dir = fs_manager->GetTabletWalDir(tablet_id);
  Env* env = fs_manager->env();
  if (env->FileExists(tablet_wal_dir)) {
    LOG(INFO) << Substitute("T $0 P $1: Deleting WAL directory at $2",
                            tablet_id, fs_manager->uuid(), tablet_wal_dir);
    // We don't need to delete through the file cache; we're guaranteed that
    // the log has been closed (though this invariant isn't verifiable here
    // without additional plumbing).
    RETURN_NOT_OK_PREPEND(env->DeleteRecursively(tablet_wal_dir),
                          "Unable to recursively delete WAL dir for tablet " + tablet_id);
  }
  return Status::OK();
}
// Removes the WAL recovery directory of tablet 'tablet_id', if there is one.
//
// The directory is first renamed to "<recovery dir>-<current time in micros>"
// and the renamed directory is then deleted recursively. When
// --skip_remove_old_recovery_dir is set, the renamed directory is left on
// disk.
//
// Returns OK if the directory does not exist or was handled successfully, and
// the (prefixed) rename or delete error otherwise.
Status Log::RemoveRecoveryDirIfExists(FsManager* fs_manager, const string& tablet_id) {
  const string recovery_dir = fs_manager->GetTabletWalRecoveryDir(tablet_id);
  const string log_prefix = Substitute("T $0 P $1: ", tablet_id, fs_manager->uuid());

  // Nothing to do when there is no recovery directory.
  if (!fs_manager->Exists(recovery_dir)) {
    VLOG(1) << log_prefix << "Tablet WAL recovery dir " << recovery_dir <<
               " does not exist.";
    return Status::OK();
  }

  VLOG(1) << log_prefix << "Preparing to delete log recovery files and directory " << recovery_dir;

  // Step 1: move the directory aside under a timestamped name.
  const string renamed_dir = Substitute("$0-$1", recovery_dir, GetCurrentTimeMicros());
  VLOG(1) << log_prefix << "Renaming log recovery dir from " << recovery_dir
          << " to " << renamed_dir;
  RETURN_NOT_OK_PREPEND(fs_manager->env()->RenameFile(recovery_dir, renamed_dir),
                        Substitute("Could not rename old recovery dir from: $0 to: $1",
                                   recovery_dir, renamed_dir));

  // Step 2: delete the renamed directory, unless the flag asks to keep it.
  if (FLAGS_skip_remove_old_recovery_dir) {
    LOG(INFO) << log_prefix << "--skip_remove_old_recovery_dir enabled. NOT deleting " << renamed_dir;
    return Status::OK();
  }

  VLOG(1) << log_prefix << "Deleting all files from renamed log recovery directory " << renamed_dir;
  // We don't need to delete through the file cache; we're guaranteed that
  // the log has been closed (though this invariant isn't verifiable here
  // without additional plumbing).
  RETURN_NOT_OK_PREPEND(fs_manager->env()->DeleteRecursively(renamed_dir),
                        "Could not remove renamed recovery dir " + renamed_dir);
  VLOG(1) << log_prefix << "Completed deletion of old log recovery files and directory "
          << renamed_dir;
  return Status::OK();
}
// Returns the prefix used for this log's log messages, as provided by 'ctx_'.
std::string Log::LogPrefix() const { return ctx_.LogPrefix(); }
// Closes the log on destruction. A failure from Close() cannot be returned
// from a destructor, so it is only logged as a warning.
Log::~Log() {
WARN_NOT_OK(Close(), "Error closing log");
}
// Builds a batch of type 'type' from 'entry_batch_pb'.
//
// The protobuf is serialized into the batch's own buffer here, and for
// REPLICATE batches the op id of every entry is copied out, so the batch keeps
// no reference to 'entry_batch_pb' afterwards. 'cb' is stored as the batch's
// callback.
LogEntryBatch::LogEntryBatch(LogEntryTypePB type,
                             const LogEntryBatchPB& entry_batch_pb,
                             StatusCallback cb)
    : type_(type),
      total_size_bytes_(entry_batch_pb.ByteSizeLong()),
      count_(entry_batch_pb.entry().size()),
      callback_(std::move(cb)) {
  // A protobuf with a serialized size of zero leaves the buffer empty.
  if (total_size_bytes_ != 0) {
    buffer_.reserve(total_size_bytes_);
    pb_util::AppendToString(entry_batch_pb, &buffer_);
  }

  if (type != REPLICATE) {
    return;
  }

  // Remember the op ids of the replicates contained in this batch.
  const auto& entries = entry_batch_pb.entry();
  replicate_op_ids_.reserve(entries.size());
  for (const auto& pb_entry : entries) {
    DCHECK(pb_entry.has_replicate());
    replicate_op_ids_.emplace_back(pb_entry.replicate().id());
  }
}
// Nothing to release explicitly; members clean up after themselves.
LogEntryBatch::~LogEntryBatch() = default;
} // namespace log
} // namespace kudu