// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>
#include <limits>
#include <map>
#include <memory>
#include <string>
#include <vector>
#include <glog/logging.h>
#include <gtest/gtest_prod.h>
#include "kudu/common/schema.h"
#include "kudu/consensus/log.pb.h"
#include "kudu/consensus/log_metrics.h"
#include "kudu/consensus/log_util.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/ref_counted_replicate.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/util/blocking_queue.h"
#include "kudu/util/faststring.h"
#include "kudu/util/locks.h"
#include "kudu/util/metrics.h"
#include "kudu/util/promise.h"
#include "kudu/util/rw_mutex.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/status_callback.h"
#include "kudu/util/threadpool.h"
namespace kudu {
class CompressionCodec;
class FileCache;
class FsManager;
class RWFile;
namespace consensus {
class CommitMsg;
} // namespace consensus
namespace log {
class LogEntryBatch;
class LogFaultHooks;
class LogIndex;
class LogReader;
struct LogEntryBatchLogicalSize;
struct RetentionIndexes;
// Context used by the various classes that operate on the Log.
struct LogContext {
const std::string tablet_id;
const std::string log_dir;
scoped_refptr<MetricEntity> metric_entity;
std::unique_ptr<LogMetrics> metrics;
FsManager* fs_manager;
FileCache* file_cache;
std::string LogPrefix() const;
};
typedef BlockingQueue<std::unique_ptr<LogEntryBatch>, LogEntryBatchLogicalSize>
LogEntryBatchQueue;
// State of segment allocation.
enum SegmentAllocationState {
kAllocationNotStarted, // No segment allocation requested
kAllocationInProgress, // Next segment allocation started
kAllocationFinished // Next segment ready
};
// Encapsulates the logic around allocating log segments.
//
// Methods of this class are not threadsafe, unless otherwise mentioned.
// It is expected that segment allocation is driven through a single thread
// (presumably the append thread, as operations are written).
class SegmentAllocator {
public:
// Creates a new SegmentAllocator. The allocator isn't usable until Init() is
// called.
//
// 'opts' and 'ctx' are options and various context variables that
// are relevant for the Log for which we are allocating segments.
//
// 'schema' and 'schema_version' define the initial schema for the Log.
SegmentAllocator(const LogOptions* opts,
const LogContext* ctx,
Schema schema,
uint32_t schema_version);
// Initializes the SegmentAllocator using 'sequence_number' as the active
// segment's sequence number.
//
// 'new_readable_segment' contains the newly active segment, reopened for reading.
Status Init(uint64_t sequence_number,
scoped_refptr<ReadableLogSegment>* new_readable_segment);
// Checks whether it's time to allocate (e.g. the current segment is full)
// and/or roll over (e.g. a previous pre-allocation has finished), and does
// so as appropriate.
//
// 'write_size_bytes' is the expected size of the next write; if the active
// segment would go above the max segment size, a new segment is allocated.
//
// In the event of a roll over, 'finished_segment' contains a new segment
// reader for the just-finished segment (if there was one), and
// 'new_readable_segment' contains the newly active segment, reopened for reading.
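//
// A minimal sketch of how a caller (e.g. the append thread) might drive
// this; the variable names here are illustrative only:
//
//   scoped_refptr<ReadableLogSegment> finished_segment;
//   scoped_refptr<ReadableLogSegment> new_readable_segment;
//   RETURN_NOT_OK(allocator.AllocateOrRollOverIfNecessary(
//       batch_size_bytes, &finished_segment, &new_readable_segment));
//   // On roll over, the non-null out-parameters are typically handed to
//   // the LogReader so that readers see the finished and new segments.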
Status AllocateOrRollOverIfNecessary(
uint32_t write_size_bytes,
scoped_refptr<ReadableLogSegment>* finished_segment,
scoped_refptr<ReadableLogSegment>* new_readable_segment);
// Fsyncs the currently active segment to disk.
Status Sync();
// Syncs the current segment and writes out the footer.
//
// If 'finished_segment' is not null, it will contain a new ReadableLogSegment
// corresponding to the segment that was just finished.
Status FinishCurrentSegment(scoped_refptr<ReadableLogSegment>* finished_segment);
// Update the footer based on the written 'batch', e.g. to track the
// last-written OpId.
void UpdateFooterForBatch(const LogEntryBatch& batch);
// Shuts down the allocator threadpool. Note that this _doesn't_ close the
// current active segment.
void StopAllocationThread();
std::string LogPrefix() const { return ctx_->LogPrefix(); }
uint64_t active_segment_sequence_number() const {
return active_segment_sequence_number_;
}
private:
friend class Log;
friend class LogTest;
FRIEND_TEST(LogTest, TestAutoStopIdleAppendThread);
FRIEND_TEST(LogTest, TestWriteAndReadToAndFromInProgressSegment);
SegmentAllocationState allocation_state() {
shared_lock<RWMutex> l(allocation_lock_);
return allocation_state_;
}
// This is not thread-safe. It is up to the caller to ensure this does not
// interfere with the append thread's attempts to switch log segments.
//
// If there was a previous active segment, 'finished_segment' contains a new
// segment reader built for that segment.
//
// 'new_readable_segment' contains the newly active segment, reopened for reading.
Status AllocateSegmentAndRollOver(
scoped_refptr<ReadableLogSegment>* finished_segment,
scoped_refptr<ReadableLogSegment>* new_readable_segment);
// Sets the schema and version to be used for the next allocated segment.
void SetSchemaForNextSegment(Schema schema, uint32_t version);
// Schedules a task to allocate a new log segment.
// Must be called when the allocation_lock_ is held.
Status AsyncAllocateSegmentUnlocked();
// Task to be put onto the allocation_pool_ that allocates segments.
void AllocationTask();
// Creates a temporary file, populating 'next_segment_file_' and
// 'next_segment_path_', and pre-allocating 'max_segment_size_' bytes if
// pre-allocation is enabled.
Status AllocateNewSegment();
// Swaps in the next segment file as the new active segment.
//
// 'new_readable_segment' contains the newly active segment, reopened for reading.
Status SwitchToAllocatedSegment(
scoped_refptr<ReadableLogSegment>* new_readable_segment);
// Waits for any on-going allocation to complete and rolls over onto the
// allocated segment, swapping out the previous active segment if it existed.
//
// If there was a previous active segment, 'finished_segment' contains a new
// segment reader built for that segment.
//
// 'new_readable_segment' contains the newly active segment, reopened for reading.
Status RollOver(scoped_refptr<ReadableLogSegment>* finished_segment,
scoped_refptr<ReadableLogSegment>* new_readable_segment);
// Hooks used to inject faults into the allocator.
std::shared_ptr<LogFaultHooks> hooks_;
// Descriptors for the segment file that should be used as the next active
// segment.
std::shared_ptr<RWFile> next_segment_file_;
std::string next_segment_path_;
// Contains state shared by various Log-related classes.
const LogOptions* opts_;
const LogContext* ctx_;
// The maximum segment size, in bytes.
uint64_t max_segment_size_;
// The codec used to compress entries, or nullptr if not configured.
const CompressionCodec* codec_ = nullptr;
// The schema and schema version to be used for the next segment.
mutable rw_spinlock schema_lock_;
Schema schema_;
uint32_t schema_version_;
// Whether fsyncing has been disabled.
// This is used to disable fsync during bootstrap.
bool sync_disabled_;
// A footer being prepared for the current segment.
// When the segment is finished, it will be written.
LogSegmentFooterPB footer_;
// The currently active segment being written.
std::unique_ptr<WritableLogSegment> active_segment_;
// Protects allocation_state_.
mutable RWMutex allocation_lock_;
SegmentAllocationState allocation_state_ = kAllocationNotStarted;
// Single-threaded threadpool on which to allocate segments.
std::unique_ptr<ThreadPool> allocation_pool_;
Promise<Status> allocation_status_;
// The sequence number of the 'active' log segment.
uint64_t active_segment_sequence_number_ = 0;
};
// Log interface, inspired by Raft's (logcabin) Log. Provides durability to
// Kudu as a normal Write Ahead Log and also plays the role of persistent
// storage for the consensus state machine.
//
// Log uses group commit to improve write throughput and latency without
// compromising ordering and durability guarantees. A single background thread
// (in AppendThread) per Log instance is responsible for accumulating pending
// writes and writing them to disk.
//
// A separate background thread (in SegmentAllocator) per Log instance is
// responsible for synchronously allocating or asynchronously pre-allocating
// segment files as written entries fill up segments.
//
// The public interface of this class is thread-safe unless otherwise noted.
//
// Note: The Log must be Close()d before any class that writes to it is
// destroyed; otherwise the Log might still hold references to those classes
// in order to execute the callbacks after each write.
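//
// A minimal usage sketch (hypothetical caller; assumes a valid 'fs_manager',
// 'file_cache', 'schema', and 'metric_entity' are in scope):
//
//   scoped_refptr<Log> log;
//   RETURN_NOT_OK(Log::Open(LogOptions(), fs_manager, file_cache,
//                           tablet_id, schema, /*schema_version=*/0,
//                           metric_entity, &log));
//   // ... append entries via AsyncAppendReplicates()/AsyncAppendCommit() ...
//   RETURN_NOT_OK(log->WaitUntilAllFlushed());
//   RETURN_NOT_OK(log->Close());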
class Log : public RefCountedThreadSafe<Log> {
public:
static const Status kLogShutdownStatus;
static const uint64_t kInitialLogSegmentSequenceNumber;
// Opens or continues a log and sets 'log' to the newly built Log.
// After a successful Open() the Log is ready to receive entries.
static Status Open(LogOptions options,
FsManager* fs_manager,
FileCache* file_cache,
const std::string& tablet_id,
Schema schema,
uint32_t schema_version,
const scoped_refptr<MetricEntity>& metric_entity,
scoped_refptr<Log> *log);
~Log();
// Synchronously append a new entry to the log.
// Log does not take ownership of the passed 'entry'.
// This is not thread-safe.
Status Append(LogEntryPB* entry);
// Append the given set of replicate messages, asynchronously.
// This requires that the replicates have already been assigned OpIds.
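//
// A minimal sketch (hypothetical caller; assumes 'msg' is a
// consensus::ReplicateRefPtr whose ReplicateMsg already carries an OpId,
// and that StatusCallback accepts the callback shown):
//
//   RETURN_NOT_OK(log->AsyncAppendReplicates(
//       { msg },
//       [](const Status& s) { WARN_NOT_OK(s, "failed to append to log"); }));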
Status AsyncAppendReplicates(std::vector<consensus::ReplicateRefPtr> replicates,
StatusCallback callback);
// Append the given commit message, asynchronously.
//
// Returns a bad status if the log is already shut down.
Status AsyncAppendCommit(const consensus::CommitMsg& commit_msg,
StatusCallback callback);
// Blocks the current thread until all the entries in the log queue
// are flushed and fsynced (if fsync of log entries is enabled).
Status WaitUntilAllFlushed();
// Syncs all state and closes the log.
Status Close();
// Return true if there is any on-disk data for the given tablet.
static bool HasOnDiskData(FsManager* fs_manager, const std::string& tablet_id);
// Delete all WAL data from the log associated with this tablet.
// REQUIRES: The Log must be closed.
static Status DeleteOnDiskData(FsManager* fs_manager, const std::string& tablet_id);
// Removes the recovery directory and all files contained therein, if it exists.
// Intended to be invoked after log replay successfully completes.
static Status RemoveRecoveryDirIfExists(FsManager* fs_manager, const std::string& tablet_id);
// Returns a reader that is able to read through the previous segments,
// provided the log is initialized and not yet closed. Once the log has been
// closed, this function returns NULL, but existing reader references remain
// live.
std::shared_ptr<LogReader> reader() const { return reader_; }
void SetMaxSegmentSizeForTests(uint64_t max_segment_size) {
segment_allocator_.max_segment_size_ = max_segment_size;
}
// This is not thread-safe.
void DisableSync() {
segment_allocator_.sync_disabled_ = true;
}
// If DisableSync() was previously called, this restores the default
// behavior and then calls Sync(), which performs the actual syncing if
// required.
// This is not thread-safe.
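//
// For example, a bootstrap-style sequence might look like (sketch only):
//
//   log->DisableSync();
//   // ... replay and append recovered entries without fsync overhead ...
//   RETURN_NOT_OK(log->ReEnableSyncIfRequired());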
Status ReEnableSyncIfRequired() {
segment_allocator_.sync_disabled_ = false;
return segment_allocator_.Sync();
}
// Get ID of tablet.
const std::string& tablet_id() const {
return ctx_.tablet_id;
}
// Runs the garbage collector on the set of previous segments. Segments that
// only refer to in-mem state that has been flushed are candidates for
// garbage collection.
//
// 'retention_indexes' specifies the minimum operation indexes to retain for
// durability and for catching up peers (see RetentionIndexes below).
// If successful, 'num_gced' is set to the number of deleted log segments.
//
// This method is thread-safe.
Status GC(RetentionIndexes retention_indexes, int* num_gced);
// Computes the number of bytes that would be GC'd if Log::GC were called.
int64_t GetGCableDataSize(RetentionIndexes retention_indexes) const;
// Returns a map which can be used to determine the cumulative size of log segments
// containing entries at or above any given log index.
//
// For example, if the current log segments are:
//
// Indexes Size
// ------------------
// [1-100] 20MB
// [101-200] 15MB
// [201-300] 10MB
// [301-???]  <open>  (counts as 0MB)
//
// This function will return:
//
// {100 => 45MB,
// 200 => 25MB,
// 300 => 10MB}
//
// In other words, an anchor on any index <= 100 would retain 45MB of logs,
// and any anchor on 100 < index <= 200 would retain 25MB of logs, etc.
//
// Note that the returned values are in units of bytes, not MB.
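//
// A minimal sketch of consuming the map (hypothetical caller): the first
// entry whose key is >= the anchored index covers that index, so:
//
//   std::map<int64_t, int64_t> replay_size;
//   log->GetReplaySizeMap(&replay_size);
//   auto it = replay_size.lower_bound(anchor_index);
//   int64_t retained_bytes = (it == replay_size.end()) ? 0 : it->second;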
void GetReplaySizeMap(std::map<int64_t, int64_t>* replay_size) const;
// Returns the total size of the current segments, in bytes.
// Returns 0 if the log is shut down.
int64_t OnDiskSize();
// Returns the file system location of the currently active WAL segment.
const std::string& ActiveSegmentPathForTests() const {
return segment_allocator_.active_segment_->path();
}
// Return true if the append thread is currently active.
bool append_thread_active_for_tests() const;
// Forces the Log to allocate a new segment and roll over.
// This can be used to make sure all entries appended up to this point are
// available in closed, readable segments.
//
// This is not thread-safe. Used in test only.
Status AllocateSegmentAndRollOverForTests();
void SetLogFaultHooksForTests(const std::shared_ptr<LogFaultHooks>& hooks) {
segment_allocator_.hooks_ = hooks;
}
// Set the schema for the _next_ log segment.
//
// This method is thread-safe.
void SetSchemaForNextLogSegment(Schema schema, uint32_t version);
private:
friend class LogTest;
friend class LogTestBase;
friend class SegmentAllocator;
FRIEND_TEST(LogTestOptionalCompression, TestMultipleEntriesInABatch);
FRIEND_TEST(LogTestOptionalCompression, TestReadLogWithReplacedReplicates);
FRIEND_TEST(LogTest, TestWriteAndReadToAndFromInProgressSegment);
FRIEND_TEST(LogTest, TestAutoStopIdleAppendThread);
class AppendThread;
// Log state.
enum LogState {
kLogInitialized,
kLogWriting,
kLogClosed
};
Log(LogOptions options, LogContext ctx, Schema schema, uint32_t schema_version);
// Initializes a new log or continues an existing one.
Status Init();
// Marks the current active segment as idle.
void SetActiveSegmentIdle();
// Creates a new LogEntryBatch from 'entry_batch_pb'; all entries must be of
// type 'type'.
//
// After the batch is appended to the log, 'cb' will be invoked with the
// result status of the append.
static std::unique_ptr<LogEntryBatch> CreateBatchFromPB(
LogEntryTypePB type, const LogEntryBatchPB& entry_batch_pb,
StatusCallback cb);
// Asynchronously appends 'entry_batch' to the log.
Status AsyncAppend(std::unique_ptr<LogEntryBatch> entry_batch);
// Writes the serialized contents of 'entry_batch' to the log. This is not thread-safe.
Status WriteBatch(LogEntryBatch* entry_batch);
// Update footer_builder_ to reflect the log indexes seen in 'batch'.
void UpdateFooterForBatch(LogEntryBatch* batch);
// Update the LogIndex to include entries for the replicate messages found in
// 'batch'. The index entry points to the offset 'start_offset' in the current
// log segment.
Status UpdateIndexForBatch(const LogEntryBatch& batch,
int64_t start_offset);
Status Sync();
// Helper method to get the segment sequence to GC based on the provided 'retention' struct.
void GetSegmentsToGCUnlocked(RetentionIndexes retention_indexes,
SegmentSequence* segments_to_gc) const;
LogEntryBatchQueue* entry_queue() {
return &entry_batch_queue_;
}
std::string LogPrefix() const;
LogOptions options_;
LogContext ctx_;
std::string log_dir_;
// Lock to protect mutations to log_state_ and other shared state variables.
// Generally this is used to ensure adding and removing segments from the log
// reader is threadsafe.
mutable percpu_rwlock state_lock_;
LogState log_state_;
// A reader for the previous segments that were not yet GC'd.
//
// Will be NULL after the log is Closed().
std::shared_ptr<LogReader> reader_;
// Index which translates between operation indexes and the position
// of the operation in the log.
scoped_refptr<LogIndex> log_index_;
// The maximum segment size, in bytes.
uint64_t max_segment_size_;
// The queue used to communicate between the threads appending operations to
// the log and the thread that actually writes them to disk.
LogEntryBatchQueue entry_batch_queue_;
// Thread writing to the log.
std::unique_ptr<AppendThread> append_thread_;
SegmentAllocator segment_allocator_;
// Protects the active segment as it is going idle, in case other threads
// attempt to switch segments concurrently. This shouldn't happen in
// production, but may happen if AllocateSegmentAndRollOver() is called.
mutable rw_spinlock segment_idle_lock_;
// The cached on-disk size of the log, used to track its size even if it has been closed.
std::atomic<int64_t> on_disk_size_;
DISALLOW_COPY_AND_ASSIGN(Log);
};
// Indicates which log indexes should be retained for different purposes.
//
// When default-constructed, starts with maximum indexes, indicating that no
// logs need to be retained for either purpose.
struct RetentionIndexes {
explicit RetentionIndexes(int64_t durability = std::numeric_limits<int64_t>::max(),
int64_t peers = std::numeric_limits<int64_t>::max())
: for_durability(durability),
for_peers(peers) {}
// The minimum log entry index which *must* be retained in order to
// preserve durability and the ability to restart the local node
// from its WAL.
int64_t for_durability;
// The minimum log entry index which *should* be retained in order to
// catch up other peers hosting this same tablet. These entries may
// still be GCed in the case that they are from very old log segments
// or the log has become too large.
int64_t for_peers;
};
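// For example, a hypothetical caller that must keep everything from index
// 100 onward for durability, and would also like to keep index 50 onward
// for lagging peers, might run GC as follows (sketch only):
//
//   RetentionIndexes retention(/*durability=*/100, /*peers=*/50);
//   int num_gced = 0;
//   RETURN_NOT_OK(log->GC(retention, &num_gced));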
// This class represents a batch of operations to be written and
// synced to the log. It is opaque to the user and is managed by the
// Log class.
//
// A single batch must have only one type of entries in it (e.g. only
// REPLICATEs or only COMMITs).
//
// The ReplicateMsg sub-elements of each LogEntryPB within the LogEntryBatchPB
// 'entry_batch_pb_' are not owned by the LogEntryPBs, and at LogEntryBatch
// destruction time they are released.
class LogEntryBatch {
public:
~LogEntryBatch();
private:
friend class Log;
friend struct LogEntryBatchLogicalSize;
friend class MultiThreadedLogTest;
friend class SegmentAllocator;
LogEntryBatch(LogEntryTypePB type, const LogEntryBatchPB& entry_batch_pb,
StatusCallback cb);
// Serializes contents of the entry to an internal buffer.
void Serialize();
// Returns a Slice representing the serialized contents of the
// entry.
Slice data() const {
return Slice(buffer_);
}
size_t count() const { return count_; }
// Returns the total size in bytes of the object.
size_t total_size_bytes() const {
return total_size_bytes_;
}
void SetAppendError(const Status& s) {
DCHECK(!s.ok());
if (append_status_.ok()) {
append_status_ = s;
}
}
void RunCallback() {
callback_(append_status_);
}
// The type of entries in this batch.
const LogEntryTypePB type_;
// Total size in bytes of all entries.
const size_t total_size_bytes_;
// Number of entries in the serialized batch.
const size_t count_;
// Used only when the batch type is REPLICATE: the OpIds of the serialized
// ReplicateMsgs.
std::vector<consensus::OpId> replicate_op_ids_;
// Callback to be invoked upon the entries being written and
// synced to disk.
StatusCallback callback_;
// Buffer to which the entries are serialized by a call to 'Serialize()'.
faststring buffer_;
// Tracks whether this batch was successfully appended to the log.
Status append_status_;
DISALLOW_COPY_AND_ASSIGN(LogEntryBatch);
};
// Used by 'Log::entry_batch_queue_' to determine the logical size of a LogEntryBatch.
struct LogEntryBatchLogicalSize {
static size_t logical_size(const std::unique_ptr<LogEntryBatch>& batch) {
return batch->total_size_bytes();
}
};
class LogFaultHooks {
public:
// Executed immediately before returning from Log::Sync() at *ALL*
// times.
virtual Status PostSync() { return Status::OK(); }
// Iff fsync is enabled, executed immediately after the call to fsync().
virtual Status PostSyncIfFsyncEnabled() { return Status::OK(); }
// Emulate a slow disk where the filesystem has decided to synchronously
// flush a full buffer.
virtual Status PostAppend() { return Status::OK(); }
virtual Status PreClose() { return Status::OK(); }
virtual Status PostClose() { return Status::OK(); }
virtual ~LogFaultHooks() {}
};
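// A minimal sketch of a test hook (hypothetical subclass) that injects a
// one-time failure after a sync:
//
//   class FlakySyncHooks : public LogFaultHooks {
//    public:
//     Status PostSync() override {
//       if (!failed_once_) {
//         failed_once_ = true;
//         return Status::IOError("injected sync failure");
//       }
//       return Status::OK();
//     }
//    private:
//     bool failed_once_ = false;
//   };
//
//   log->SetLogFaultHooksForTests(std::make_shared<FlakySyncHooks>());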
} // namespace log
} // namespace kudu