blob: 9f6008580eaaaad7e3bdfe482a68349b840125a1 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tablet/tablet_bootstrap.h"
#include <gflags/gflags.h>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "kudu/common/partial_row.h"
#include "kudu/common/row_operations.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/consensus_meta.h"
#include "kudu/consensus/log.h"
#include "kudu/consensus/log_anchor_registry.h"
#include "kudu/consensus/log_reader.h"
#include "kudu/consensus/log_util.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/strcat.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/strings/util.h"
#include "kudu/gutil/walltime.h"
#include "kudu/server/clock.h"
#include "kudu/server/hybrid_clock.h"
#include "kudu/tablet/lock_manager.h"
#include "kudu/tablet/row_op.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_peer.h"
#include "kudu/tablet/transactions/alter_schema_transaction.h"
#include "kudu/tablet/transactions/write_transaction.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/fault_injection.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/locks.h"
#include "kudu/util/logging.h"
#include "kudu/util/path_util.h"
#include "kudu/util/stopwatch.h"
// Debugging aid: when true, RemoveRecoveryDir() still renames the WAL recovery
// directory but returns before deleting it. Defaults to false; tagged 'hidden'.
DEFINE_bool(skip_remove_old_recovery_dir, false,
"Skip removing WAL recovery dir after startup. (useful for debugging)");
TAG_FLAG(skip_remove_old_recovery_dir, hidden);
// Fault-injection knob for tests. Defaults to 0.0 and is tagged 'unsafe'.
// NOTE(review): the code that consumes this flag is not visible in this
// portion of the file.
DEFINE_double(fault_crash_during_log_replay, 0.0,
"Fraction of the time when the tablet will crash immediately "
"after processing a log entry during log replay. "
"(For testing only!)");
TAG_FLAG(fault_crash_during_log_replay, unsafe);
namespace kudu {
namespace tablet {
using boost::shared_lock;
using consensus::ALTER_SCHEMA_OP;
using consensus::CHANGE_CONFIG_OP;
using consensus::ChangeConfigRecordPB;
using consensus::CommitMsg;
using consensus::ConsensusBootstrapInfo;
using consensus::ConsensusMetadata;
using consensus::ConsensusRound;
using consensus::MinimumOpId;
using consensus::NO_OP;
using consensus::OperationType;
using consensus::OperationType_Name;
using consensus::OpId;
using consensus::OpIdEquals;
using consensus::OpIdEqualsFunctor;
using consensus::OpIdHashFunctor;
using consensus::OpIdToString;
using consensus::RaftConfigPB;
using consensus::ReplicateMsg;
using consensus::WRITE_OP;
using log::Log;
using log::LogAnchorRegistry;
using log::LogEntryPB;
using log::LogOptions;
using log::LogReader;
using log::ReadableLogSegment;
using server::Clock;
using std::map;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::unordered_map;
using strings::Substitute;
using tserver::AlterSchemaRequestPB;
using tserver::WriteRequestPB;
struct ReplayState;
// Information from the tablet metadata which indicates which data was
// flushed prior to this restart and which memory stores are still active.
// We take a snapshot of this information at the beginning of the bootstrap
// process so that we can allow compactions and flushes to run during bootstrap
// without confusing our tracking of flushed stores.
// NOTE: automatic flushes and compactions are not currently scheduled during
// bootstrap. However, flushes may still be triggered due to operations like
// alter-table.
// Holds the flushed-store information captured from a TabletMetadata via
// InitFrom(), and answers whether a given memory store is still active.
// NOTE(review): the access specifiers and the closing brace of this class are
// not visible in this copy of the file; declarations are reproduced as found.
class FlushedStoresSnapshot {
FlushedStoresSnapshot() {}
// Initializes the snapshot from 'meta'. Returns a non-OK Status on failure.
Status InitFrom(const TabletMetadata& meta);
// Return true if the given memory store is still active (i.e. edits that were
// originally written to this memory store should be replayed during the bootstrap
// process).
// NOTE: a store may be inactive for either of two reasons. Either:
// (a) the store was flushed to disk, OR
// (b) the store was in the process of being written by a flush or compaction
// but the process crashed before the associated tablet metadata update
// was committed.
bool IsMemStoreActive(const MemStoreTargetPB& target) const;
// presumably the ID of the last durably flushed MemRowSet -- TODO confirm
// against InitFrom(), whose definition is not visible here.
int64_t last_durable_mrs_id_;
// presumably maps a DiskRowSet ID to the ID of its last flushed DeltaMemStore
// -- TODO confirm against InitFrom().
unordered_map<int64_t, int64_t> flushed_dms_by_drs_id_;
// Bootstraps an existing tablet by opening the metadata from disk, and rebuilding soft
// state by playing log segments. A bootstrapped tablet can then be added to an existing
// consensus configuration as a LEARNER, which will bring its state up to date with the
// rest of the consensus configuration, or it can start serving the data itself, after it
// has been appointed LEADER of that particular consensus configuration.
// NOTE: this does not handle pulling data from other replicas in the cluster. That
// is handled by the 'RemoteBootstrap' classes, which copy blocks and metadata locally
// before invoking this local bootstrap functionality.
// TODO: Because the tablet that is being rebuilt is never flushed/compacted, consensus
// is only set on the tablet after bootstrap. When we get to flushes/compactions, though,
// we will need to set it before replay or we won't be able to re-rebuild.
// NOTE(review): the access specifiers, the ActiveStores enumerators, parts of
// struct Stats and the closing brace of this class are not visible in this copy
// of the file; everything below is reproduced as found.
class TabletBootstrap {
TabletBootstrap(const scoped_refptr<TabletMetadata>& meta,
const scoped_refptr<Clock>& clock,
shared_ptr<MemTracker> mem_tracker,
MetricRegistry* metric_registry,
TabletStatusListener* listener,
const scoped_refptr<LogAnchorRegistry>& log_anchor_registry);
// Plays the log segments, rebuilding the portion of the Tablet's soft
// state that is present in the log (additional soft state may be present
// in other replicas).
// A successful call will yield the rebuilt tablet and the rebuilt log.
Status Bootstrap(shared_ptr<Tablet>* rebuilt_tablet,
scoped_refptr<Log>* rebuilt_log,
ConsensusBootstrapInfo* results);
// Opens the tablet.
// Sets '*has_blocks' to true if there was any data on disk for this tablet.
Status OpenTablet(bool* has_blocks);
// Checks if a previous log recovery directory exists. If so, it deletes any
// files in the log dir and sets 'needs_recovery' to true, meaning that the
// previous recovery attempt should be retried from the recovery dir.
// Otherwise, if there is a log directory with log files in it, renames that
// log dir to the log recovery dir and creates a new, empty log dir so that
// log replay can proceed. 'needs_recovery' is also returned as true in this
// case.
// If no log segments are found, 'needs_recovery' is set to false.
Status PrepareRecoveryDir(bool* needs_recovery);
// Opens the latest log segments for the Tablet that will allow to rebuild
// the tablet's soft state. If there are existing log segments in the tablet's
// log directory they are moved to a "log-recovery" directory which is deleted
// when the replay process is completed (as they have been duplicated in the
// current log directory).
// If a "log-recovery" directory is already present, we will continue to replay
// from the "log-recovery" directory. Tablet metadata is updated once replay
// has finished from the "log-recovery" directory.
Status OpenLogReaderInRecoveryDir();
// Opens a new log in the tablet's log directory.
// The directory is expected to be clean.
Status OpenNewLog();
// Finishes bootstrap, setting 'rebuilt_log' and 'rebuilt_tablet'.
Status FinishBootstrap(const string& message,
scoped_refptr<log::Log>* rebuilt_log,
shared_ptr<Tablet>* rebuilt_tablet);
// Plays the log segments into the tablet being built.
// The process of playing the segments generates a new log that can be continued
// later on when the tablet is rebuilt and starts accepting writes from clients.
Status PlaySegments(ConsensusBootstrapInfo* results);
// Append the given commit message to the log.
// Does not support writing a TxResult.
Status AppendCommitMsg(const CommitMsg& commit_msg);
// Replay handlers, one per operation type. HandleEntryPair() dispatches to
// these with the REPLICATE message and its matching COMMIT message.
Status PlayWriteRequest(ReplicateMsg* replicate_msg,
const CommitMsg& commit_msg);
Status PlayAlterSchemaRequest(ReplicateMsg* replicate_msg,
const CommitMsg& commit_msg);
Status PlayChangeConfigRequest(ReplicateMsg* replicate_msg,
const CommitMsg& commit_msg);
Status PlayNoOpRequest(ReplicateMsg* replicate_msg,
const CommitMsg& commit_msg);
// Plays operations, skipping those that have already been flushed,
// as indicated in the 'already_flushed' vector.
Status PlayRowOperations(WriteTransactionState* tx_state,
const SchemaPB& schema_pb,
const RowOperationsPB& ops_pb,
const TxResultPB& result,
const vector<bool>& already_flushed);
// Determine which of the operations from 'result' correspond to already-flushed
// stores.
Status DetermineFlushedOps(const TxResultPB& result,
vector<bool>* flushed_by_op);
// Pass through all of the decoded operations in tx_state. For
// each op:
// - if it was previously failed, mark as failed
// - if it previously succeeded but was flushed, skip it.
// - otherwise, re-apply to the tablet being bootstrapped.
Status ApplyOperations(WriteTransactionState* tx_state,
const TxResultPB& orig_result);
// Filter a row operation, setting '*already_flushed' to indicate if
// it was already flushed.
Status FilterOperation(const OperationResultPB& op_result,
bool* already_flushed);
// Classification of the memory stores referenced by a commit message.
// NOTE(review): the enumerator names are not visible in this copy of the file;
// only their descriptions remain. AnalyzeActiveStores() is seen returning
// SOME_STORES_ACTIVE and NO_STORES_ACTIVE elsewhere in this file.
enum ActiveStores {
// The OperationResultPBs in the commit message do not reference any stores.
// This can happen in the case that the operations did not result in any mutations
// (e.g. because they were updates for not-found row keys).
// At least one operation resulted in a mutation to a store, but none of the
// mutated stores are still active. Therefore the operation does not need to
// be replayed.
// At least one operation resulted in a mutation to a store, and at least
// one of those mutated stores is still active. This implies that the operation
// needs to be replayed.
// For the given commit message, analyze which memory stores were mutated
// by the operation, returning one of the enum values above.
ActiveStores AnalyzeActiveStores(const CommitMsg& commit);
// Writes a dump of the given replay state to the log, for debugging.
void DumpReplayStateToLog(const ReplayState& state);
// Handlers for each type of message seen in the log during replay.
Status HandleEntry(ReplayState* state, LogEntryPB* entry);
Status HandleReplicateMessage(ReplayState* state, LogEntryPB* replicate_entry);
Status HandleCommitMessage(ReplayState* state, LogEntryPB* commit_entry);
Status ApplyCommitMessage(ReplayState* state, LogEntryPB* commit_entry);
Status HandleEntryPair(LogEntryPB* replicate_entry, LogEntryPB* commit_entry);
// Checks that an orphaned commit message is actually irrelevant, i.e that none
// of the data stores it refers to are live.
Status CheckOrphanedCommitDoesntNeedReplay(const CommitMsg& commit);
// Updates the clock with the Timestamp built from the provided integer value.
Status UpdateClock(uint64_t timestamp);
// Removes the recovery directory and all files contained therein.
// Intended to be invoked after log replay successfully completes.
Status RemoveRecoveryDir();
// Return a log prefix string in the standard "T xxx P yyy" format.
string LogPrefix() const;
// Collaborators supplied to the constructor, plus the tablet, log, log reader
// and consensus metadata that are built up over the course of bootstrap.
scoped_refptr<TabletMetadata> meta_;
scoped_refptr<Clock> clock_;
shared_ptr<MemTracker> mem_tracker_;
MetricRegistry* metric_registry_;
TabletStatusListener* listener_;
gscoped_ptr<tablet::Tablet> tablet_;
const scoped_refptr<log::LogAnchorRegistry> log_anchor_registry_;
scoped_refptr<log::Log> log_;
std::shared_ptr<log::LogReader> log_reader_;
gscoped_ptr<ConsensusMetadata> cmeta_;
// Statistics on the replay of entries in the log.
// NOTE(review): the constructor head, most of its initializer list, and the
// tail of the ToString() format string are not visible in this copy.
struct Stats {
: ops_read(0),
orphaned_commits(0) {
string ToString() const {
return Substitute("ops{read=$0 overwritten=$1 applied=$2 ignored=$3} "
"inserts{seen=$4 ignored=$5} "
"mutations{seen=$6 ignored=$7} "
ops_read, ops_overwritten, ops_committed, ops_ignored,
inserts_seen, inserts_ignored,
mutations_seen, mutations_ignored,
// Number of REPLICATE messages read from the log
int ops_read;
// Number of REPLICATE messages which were overwritten by later entries.
int ops_overwritten;
// Number of REPLICATE messages which were able to be completely ignored
// because the COMMIT message indicated that all of the contained operations
// were already flushed.
int ops_ignored;
// Number of REPLICATE messages for which a matching COMMIT was found.
int ops_committed;
// Number inserts/mutations seen and ignored.
int inserts_seen, inserts_ignored;
int mutations_seen, mutations_ignored;
// Number of COMMIT messages for which a corresponding REPLICATE was not found.
int orphaned_commits;
Stats stats_;
// Snapshot of which stores were flushed prior to restart.
FlushedStoresSnapshot flushed_stores_;
// Creates a listener bound to the tablet described by 'meta'. The last
// recorded status message starts out empty.
TabletStatusListener::TabletStatusListener(const scoped_refptr<TabletMetadata>& meta)
    : meta_(meta),
      last_status_("") {
}
// Returns the tablet ID reported by the underlying tablet metadata.
const string TabletStatusListener::tablet_id() const {
  return meta_->tablet_id();
}
// Returns the table name reported by the underlying tablet metadata.
const string TabletStatusListener::table_name() const {
  return meta_->table_name();
}
// Returns a reference to the partition held by the underlying tablet metadata.
const Partition& TabletStatusListener::partition() const {
  return meta_->partition();
}
// Returns a reference to the schema held by the underlying tablet metadata.
const Schema& TabletStatusListener::schema() const {
  return meta_->schema();
}
// Nothing to release explicitly; members clean up after themselves.
TabletStatusListener::~TabletStatusListener() {
}
// Logs 'status' at INFO level, prefixed with "T <tablet id> P <fs uuid>: ",
// and then records it as the most recent status while holding lock_.
void TabletStatusListener::StatusMessage(const string& status) {
  LOG(INFO) << "T " << tablet_id() << " P " << meta_->fs_manager()->uuid() << ": "
            << status;
  // The log line is emitted before the lock is taken; only the assignment
  // to last_status_ is guarded.
  boost::lock_guard<boost::shared_mutex> l(lock_);
  last_status_ = status;
}
// Free-function entry point for local tablet bootstrap. Builds a
// TabletBootstrap from the supplied collaborators and runs Bootstrap(), which
// receives 'rebuilt_tablet', 'rebuilt_log' and 'consensus_info' as outputs.
// Returns the non-OK Status from Bootstrap() if it fails, otherwise OK.
// NOTE(review): the statement that the "disables sync" comment below refers
// to, and the closing brace, are not visible in this copy of the file.
Status BootstrapTablet(const scoped_refptr<TabletMetadata>& meta,
const scoped_refptr<Clock>& clock,
const shared_ptr<MemTracker>& mem_tracker,
MetricRegistry* metric_registry,
TabletStatusListener* listener,
shared_ptr<tablet::Tablet>* rebuilt_tablet,
scoped_refptr<log::Log>* rebuilt_log,
const scoped_refptr<log::LogAnchorRegistry>& log_anchor_registry,
ConsensusBootstrapInfo* consensus_info) {
TRACE_EVENT1("tablet", "BootstrapTablet",
"tablet_id", meta->tablet_id());
TabletBootstrap bootstrap(meta, clock, mem_tracker,
metric_registry, listener, log_anchor_registry);
RETURN_NOT_OK(bootstrap.Bootstrap(rebuilt_tablet, rebuilt_log, consensus_info));
// This is necessary since OpenNewLog() initially disables sync.
return Status::OK();
// Builds a one-line description of a log entry that failed to replay. The
// result names the entry index, the segment sequence number, the tablet ID,
// the segment path, and the entry's ShortDebugString().
// NOTE(review): the body of the size check (what happens to strings longer
// than 500 characters) and the closing braces are not visible in this copy.
static string DebugInfo(const string& tablet_id,
int segment_seqno,
int entry_idx,
const string& segment_path,
const LogEntryPB& entry) {
// Truncate the debug string to a reasonable length for logging.
// Otherwise, glog will truncate for us and we may miss important
// information which came after this long string.
string debug_str = entry.ShortDebugString();
if (debug_str.size() > 500) {
return Substitute("Debug Info: Error playing entry $0 of segment $1 of tablet $2. "
"Segment path: $3. Entry: $4", entry_idx, segment_seqno, tablet_id,
segment_path, debug_str);
// Constructor for TabletBootstrap: stores the supplied collaborators in the
// corresponding members.
// NOTE(review): the line naming the constructor and several member
// initializers are not visible in this copy of the file; only the
// initializers for meta_ and log_anchor_registry_ remain.
const scoped_refptr<TabletMetadata>& meta,
const scoped_refptr<Clock>& clock, shared_ptr<MemTracker> mem_tracker,
MetricRegistry* metric_registry, TabletStatusListener* listener,
const scoped_refptr<LogAnchorRegistry>& log_anchor_registry)
: meta_(meta),
log_anchor_registry_(log_anchor_registry) {}
// Top-level driver for local bootstrap. Visible steps, in order:
//  1. Load the consensus metadata for this tablet into cmeta_.
//  2. Return Corruption unless the tablet data state is TABLET_DATA_READY.
//  3. If there are neither blocks nor log segments, open a new log, finish
//     bootstrap, and report MinimumOpId() as both the last and the last
//     committed op ID in 'consensus_info'.
//  4. If there are blocks but nothing to recover, return IllegalState.
//  5. Otherwise replay via PlaySegments() and finish via FinishBootstrap().
// NOTE(review): several statements are not visible in this copy of the file,
// including the calls that assign 'has_blocks' and 'needs_recovery', the tail
// of some multi-line expressions, and every closing brace.
Status TabletBootstrap::Bootstrap(shared_ptr<Tablet>* rebuilt_tablet,
scoped_refptr<Log>* rebuilt_log,
ConsensusBootstrapInfo* consensus_info) {
string tablet_id = meta_->tablet_id();
// Replay requires a valid Consensus metadata file to exist in order to
// compare the committed consensus configuration seqno with the log entries and also to persist
// committed but unpersisted changes.
RETURN_NOT_OK_PREPEND(ConsensusMetadata::Load(meta_->fs_manager(), tablet_id,
meta_->fs_manager()->uuid(), &cmeta_),
"Unable to load Consensus metadata");
// Make sure we don't try to locally bootstrap a tablet that was in the middle
// of a remote bootstrap. It's likely that not all files were copied over
// successfully.
TabletDataState tablet_data_state = meta_->tablet_data_state();
if (tablet_data_state != TABLET_DATA_READY) {
return Status::Corruption("Unable to locally bootstrap tablet " + tablet_id + ": " +
"TabletMetadata bootstrap state is " +
listener_->StatusMessage("Bootstrap starting.");
if (VLOG_IS_ON(1)) {
TabletSuperBlockPB super_block;
VLOG_WITH_PREFIX(1) << "Tablet Metadata: " << super_block.DebugString();
bool has_blocks;
bool needs_recovery;
if (needs_recovery) {
// This is a new tablet, nothing left to do.
if (!has_blocks && !needs_recovery) {
LOG_WITH_PREFIX(INFO) << "No blocks or log segments found. Creating new log.";
RETURN_NOT_OK_PREPEND(OpenNewLog(), "Failed to open new log");
RETURN_NOT_OK(FinishBootstrap("No bootstrap required, opened a new log",
consensus_info->last_id = MinimumOpId();
consensus_info->last_committed_id = MinimumOpId();
return Status::OK();
// If there were blocks, there must be segments to replay. This is required
// by Raft, since we always need to know the term and index of the last
// logged op in order to vote, know how to respond to AppendEntries(), etc.
if (has_blocks && !needs_recovery) {
return Status::IllegalState(Substitute("Tablet $0: Found rowsets but no log "
"segments could be found.",
// Before playing any segments we set the safe and clean times to 'kMin' so that
// the MvccManager will accept all transactions that we replay as uncommitted.
RETURN_NOT_OK_PREPEND(PlaySegments(consensus_info), "Failed log replay. Reason");
// Flush the consensus metadata once at the end to persist our changes, if any.
RETURN_NOT_OK(FinishBootstrap("Bootstrap complete.", rebuilt_log, rebuilt_tablet));
return Status::OK();
// Completes bootstrap; per its declaration it is responsible for setting
// 'rebuilt_log' and 'rebuilt_tablet'. The only statement fragment visible
// here constructs a FlushInflightsToLogCallback around tablet_.
// NOTE(review): most of this body (including any use of 'message' and the
// assignment of the output parameters) is not visible in this copy.
Status TabletBootstrap::FinishBootstrap(const string& message,
scoped_refptr<log::Log>* rebuilt_log,
shared_ptr<Tablet>* rebuilt_tablet) {
// Add a callback to TabletMetadata that makes sure that each time we flush the metadata
// we also wait for in-flights to finish and for their wal entry to be fsynced.
// This might be a bit conservative in some situations but it will prevent us from
// ever flushing the metadata referring to tablet data blocks containing data whose
// commit entries are not durable, a pre-requisite for recovery.
make_scoped_refptr(new FlushInflightsToLogCallback(tablet_.get(),
return Status::OK();
// Creates a Tablet object from meta_ and reports through '*has_blocks'
// whether the tablet has at least one rowset.
// NOTE(review): the remaining constructor arguments, the body of the timed
// "opening tablet" section and the closing braces are not visible in this copy.
Status TabletBootstrap::OpenTablet(bool* has_blocks) {
gscoped_ptr<Tablet> tablet(new Tablet(meta_,
// doing nothing for now except opening a tablet locally.
LOG_TIMING_PREFIX(INFO, LogPrefix(), "opening tablet") {
*has_blocks = tablet->num_rowsets() != 0;
return Status::OK();
// Decides whether log replay is needed and arranges the WAL directories for it.
// '*needs_recovery' starts out false and is set to true when either
//  (a) a recovery directory already exists, i.e. a previous recovery attempt
//      did not finish, or
//  (b) the log directory contains at least one log file, in which case the
//      whole log directory is renamed to the recovery directory.
// NOTE(review): several calls are not visible in this copy of the file; only
// the error-message halves of the delete/create-directory statements remain,
// along with no closing braces. The exact operations should be confirmed
// against the upstream source.
Status TabletBootstrap::PrepareRecoveryDir(bool* needs_recovery) {
*needs_recovery = false;
FsManager* fs_manager = tablet_->metadata()->fs_manager();
string tablet_id = tablet_->metadata()->tablet_id();
string log_dir = fs_manager->GetTabletWalDir(tablet_id);
// If the recovery directory exists, then we crashed mid-recovery.
// Throw away any logs from the previous recovery attempt and restart the log
// replay process from the beginning using the same recovery dir as last time.
string recovery_path = fs_manager->GetTabletWalRecoveryDir(tablet_id);
if (fs_manager->Exists(recovery_path)) {
LOG_WITH_PREFIX(INFO) << "Previous recovery directory found at " << recovery_path << ": "
<< "Replaying log files from this location instead of " << log_dir;
// Since we have a recovery directory, clear out the log_dir by recursively
// deleting it and creating a new one so that we don't end up with remnants
// of old WAL segments or indexes after replay.
if (fs_manager->env()->FileExists(log_dir)) {
LOG_WITH_PREFIX(INFO) << "Deleting old log files from previous recovery attempt in "
<< log_dir;
"Could not recursively delete old log dir " + log_dir);
"Failed to create log directory " + log_dir);
*needs_recovery = true;
return Status::OK();
// If we made it here, there was no pre-existing recovery dir.
// Now we look for log files in log_dir, and if we find any then we rename
// the whole log_dir to a recovery dir and return needs_recovery = true.
"Failed to create log dir");
vector<string> children;
RETURN_NOT_OK_PREPEND(fs_manager->ListDir(log_dir, &children),
"Couldn't list log segments.");
for (const string& child : children) {
if (!log::IsLogFileName(child)) {
string source_path = JoinPathSegments(log_dir, child);
string dest_path = JoinPathSegments(recovery_path, child);
LOG_WITH_PREFIX(INFO) << "Will attempt to recover log segment " << source_path
<< " to " << dest_path;
*needs_recovery = true;
if (*needs_recovery) {
// Atomically rename the log directory to the recovery directory
// and then re-create the log directory.
LOG_WITH_PREFIX(INFO) << "Moving log directory " << log_dir << " to recovery directory "
<< recovery_path << " in preparation for log replay";
RETURN_NOT_OK_PREPEND(fs_manager->env()->RenameFile(log_dir, recovery_path),
Substitute("Could not move log directory $0 to recovery dir $1",
log_dir, recovery_path));
"Failed to recreate log directory " + log_dir);
return Status::OK();
// Opens a log reader over the tablet's WAL recovery directory; the path is
// logged at verbosity level 1.
// NOTE(review): the call that actually opens the reader is not visible in
// this copy of the file; only its error-message argument remains.
Status TabletBootstrap::OpenLogReaderInRecoveryDir() {
VLOG_WITH_PREFIX(1) << "Opening log reader in log recovery dir "
<< meta_->fs_manager()->GetTabletWalRecoveryDir(tablet_->tablet_id());
// Open the reader.
"Could not open LogReader. Reason");
return Status::OK();
// Disposes of the WAL recovery directory in two phases: it is first renamed
// to "<recovery_path>-<current time in micros>", and the renamed directory is
// then removed. When --skip_remove_old_recovery_dir is set, the function
// returns OK after the rename and leaves the renamed directory on disk.
// NOTE(review): the existence check at the top and the delete call near the
// bottom are only partially visible in this copy of the file (their message
// fragments remain), and the closing braces are missing.
Status TabletBootstrap::RemoveRecoveryDir() {
FsManager* fs_manager = tablet_->metadata()->fs_manager();
string recovery_path = fs_manager->GetTabletWalRecoveryDir(tablet_->metadata()->tablet_id());
<< "Tablet WAL recovery dir " << recovery_path << " does not exist.";
LOG_WITH_PREFIX(INFO) << "Preparing to delete log recovery files and directory " << recovery_path;
// The timestamp suffix gives the renamed directory a distinct name.
string tmp_path = Substitute("$0-$1", recovery_path, GetCurrentTimeMicros());
LOG_WITH_PREFIX(INFO) << "Renaming log recovery dir from " << recovery_path
<< " to " << tmp_path;
RETURN_NOT_OK_PREPEND(fs_manager->env()->RenameFile(recovery_path, tmp_path),
Substitute("Could not rename old recovery dir from: $0 to: $1",
recovery_path, tmp_path));
if (FLAGS_skip_remove_old_recovery_dir) {
LOG_WITH_PREFIX(INFO) << "--skip_remove_old_recovery_dir enabled. NOT deleting " << tmp_path;
return Status::OK();
LOG_WITH_PREFIX(INFO) << "Deleting all files from renamed log recovery directory " << tmp_path;
"Could not remove renamed recovery dir " + tmp_path);
LOG_WITH_PREFIX(INFO) << "Completed deletion of old log recovery files and directory "
<< tmp_path;
return Status::OK();
// Opens a new log in the tablet's log directory (see the declaration, which
// expects the directory to be clean).
// NOTE(review): the calls that open the log and disable sync are not visible
// in this copy of the file; only the local 'init' and the comment remain.
Status TabletBootstrap::OpenNewLog() {
OpId init;
// Disable sync temporarily in order to speed up appends during the
// bootstrap process.
return Status::OK();
// Ordered map from an operation's index to the log entry carrying it.
typedef map<int64_t, LogEntryPB*> OpIndexToEntryMap;
// State kept during replay.
// Tracks the ID of the last REPLICATE seen, the highest committed op ID, and
// the REPLICATE/COMMIT entries that are still waiting to be paired up.
// NOTE(review): the constructor head, the destructor body, some `msg.id()`
// style arguments in CheckSequentialReplicateId() and all closing braces are
// not visible in this copy of the file.
struct ReplayState {
: prev_op_id(MinimumOpId()),
committed_op_id(MinimumOpId()) {
~ReplayState() {
// Return true if 'b' is allowed to immediately follow 'a' in the log.
static bool IsValidSequence(const OpId& a, const OpId& b) {
if (a.term() == 0 && a.index() == 0) {
// Not initialized - can start with any opid.
return true;
// Within the same term, we should never skip entries.
// We can, however go backwards (see KUDU-783 for an example)
if (b.term() == a.term() &&
b.index() > a.index() + 1) {
return false;
return true;
// Return a Corruption status if 'id' seems to be out-of-sequence in the log.
// On success, the message's ID becomes the new 'prev_op_id'.
Status CheckSequentialReplicateId(const ReplicateMsg& msg) {
if (PREDICT_FALSE(!IsValidSequence(prev_op_id, {
string op_desc = Substitute("$0 REPLICATE (Type: $1)",
return Status::Corruption(
Substitute("Unexpected opid following opid $0. Operation: $1",
prev_op_id =;
return Status::OK();
// Advances 'committed_op_id' to 'id' when 'id' has a strictly greater index;
// otherwise leaves it unchanged. Only the index is compared, not the term.
void UpdateCommittedOpId(const OpId& id) {
if (id.index() > committed_op_id.index()) {
committed_op_id = id;
// Appends one line per entry in 'entries' to 'strings', each holding the
// entry's ShortDebugString().
void AddEntriesToStrings(const OpIndexToEntryMap& entries, vector<string>* strings) const {
for (const OpIndexToEntryMap::value_type& map_entry : entries) {
LogEntryPB* entry = DCHECK_NOTNULL(map_entry.second);
strings->push_back(Substitute(" $0", entry->ShortDebugString()));
// Appends a summary line (previous and committed op IDs, pending counts) to
// 'strings', followed by the pending REPLICATE and COMMIT entries, if any.
void DumpReplayStateToStrings(vector<string>* strings) const {
strings->push_back(Substitute("ReplayState: Previous OpId: $0, Committed OpId: $1, "
"Pending Replicates: $2, Pending Commits: $3", OpIdToString(prev_op_id),
OpIdToString(committed_op_id), pending_replicates.size(), pending_commits.size()));
if (!pending_replicates.empty()) {
strings->push_back("Dumping REPLICATES: ");
AddEntriesToStrings(pending_replicates, strings);
if (!pending_commits.empty()) {
strings->push_back("Dumping COMMITS: ");
AddEntriesToStrings(pending_commits, strings);
// The last replicate message's ID.
OpId prev_op_id;
// The last operation known to be committed.
// All other operations with lower IDs are also committed.
OpId committed_op_id;
// REPLICATE log entries whose corresponding COMMIT record has
// not yet been seen. Keyed by index.
OpIndexToEntryMap pending_replicates;
// COMMIT log entries which couldn't be applied immediately.
// Keyed by the index of the committed operation.
OpIndexToEntryMap pending_commits;
// Handle the given log entry. If OK is returned, then takes ownership of 'entry'.
// Otherwise, caller frees.
// Dispatches on the entry type: REPLICATE entries go to
// HandleReplicateMessage(), COMMIT entries go to HandleCommitMessage(), and
// an unexpected type yields a Corruption status.
// NOTE(review): the `break`/`default` lines of the switch and the closing
// braces are not visible in this copy of the file.
Status TabletBootstrap::HandleEntry(ReplayState* state, LogEntryPB* entry) {
if (VLOG_IS_ON(1)) {
VLOG_WITH_PREFIX(1) << "Handling entry: " << entry->ShortDebugString();
switch (entry->type()) {
case log::REPLICATE:
RETURN_NOT_OK(HandleReplicateMessage(state, entry));
case log::COMMIT:
// check the unpaired ops for the matching replicate msg, abort if not found
RETURN_NOT_OK(HandleCommitMessage(state, entry));
return Status::Corruption(Substitute("Unexpected log entry type: $0", entry->type()));
return Status::OK();
// Takes ownership of 'replicate_entry' on OK status.
// Records the REPLICATE entry in state->pending_replicates, keyed by its op
// index. If an entry with the same index is already pending, that entry and
// every pending entry with a higher index are treated as overwritten: they
// are deleted and the new entry is inserted in their place.
// NOTE(review): some statements are not visible in this copy of the file
// (for example the one the "Append the replicate message" comment refers to,
// and whatever advances/erases 'iter' inside the while loop), along with the
// closing braces.
Status TabletBootstrap::HandleReplicateMessage(ReplayState* state, LogEntryPB* replicate_entry) {
const ReplicateMsg& replicate = replicate_entry->replicate();
// Append the replicate message to the log as is
int64_t index = replicate_entry->replicate().id().index();
LogEntryPB** existing_entry_ptr = InsertOrReturnExisting(
&state->pending_replicates, index, replicate_entry);
// If there was a entry with the same index we're overwriting then we need to delete
// that entry and all entries with higher indexes.
if (existing_entry_ptr) {
LogEntryPB* existing_entry = *existing_entry_ptr;
auto iter = state->pending_replicates.lower_bound(index);
DCHECK(OpIdEquals((*iter).second->replicate().id(), existing_entry->replicate().id()));
LogEntryPB* last_entry = (*state->pending_replicates.rbegin()).second;
LOG_WITH_PREFIX(INFO) << "Overwriting operations starting at: "
<< existing_entry->replicate().id()
<< " up to: " << last_entry->replicate().id()
<< " with operation: " << replicate_entry->replicate().id();
while (iter != state->pending_replicates.end()) {
delete (*iter).second;
InsertOrDie(&state->pending_replicates, index, replicate_entry);
return Status::OK();
// Takes ownership of 'commit_entry' on OK status.
// Pairs a COMMIT entry with its pending REPLICATE. Three cases are visible:
//  - No pending replicates, or the commit's index precedes the first pending
//    replicate: the commit is treated as orphaned and deleted.
//  - The commit is for a pending replicate other than the first one: it is
//    parked in state->pending_commits (Corruption if no such replicate).
//  - The commit is for the first pending replicate: it is applied, followed
//    by any buffered commits whose indexes follow on consecutively.
// NOTE(review): several lines are not visible in this copy of the file,
// including the tail of the Corruption message, the loop's erase/advance/exit
// logic and the closing braces.
Status TabletBootstrap::HandleCommitMessage(ReplayState* state, LogEntryPB* commit_entry) {
DCHECK(commit_entry->has_commit()) << "Not a commit message: " << commit_entry->DebugString();
// Match up the COMMIT record with the original entry that it's applied to.
const OpId& committed_op_id = commit_entry->commit().commited_op_id();
// If there are no pending replicates, or if this commit's index is lower than the
// the first pending replicate on record this is likely an orphaned commit.
if (state->pending_replicates.empty() ||
(*state->pending_replicates.begin()).first > committed_op_id.index()) {
VLOG_WITH_PREFIX(2) << "Found orphaned commit for " << committed_op_id;
delete commit_entry;
return Status::OK();
// If this commit does not correspond to the first replicate message in the pending
// replicates set we keep it to apply later...
if ((*state->pending_replicates.begin()).first != committed_op_id.index()) {
if (!ContainsKey(state->pending_replicates, committed_op_id.index())) {
return Status::Corruption(Substitute("Could not find replicate for commit: $0",
VLOG_WITH_PREFIX(2) << "Adding pending commit for " << committed_op_id;
InsertOrDie(&state->pending_commits, committed_op_id.index(), commit_entry);
return Status::OK();
// ... if it does, we apply it and all the commits that immediately follow in the sequence.
OpId last_applied = commit_entry->commit().commited_op_id();
RETURN_NOT_OK(ApplyCommitMessage(state, commit_entry));
delete commit_entry;
auto iter = state->pending_commits.begin();
while (iter != state->pending_commits.end()) {
if ((*iter).first == last_applied.index() + 1) {
gscoped_ptr<LogEntryPB> buffered_commit_entry((*iter).second);
last_applied = buffered_commit_entry->commit().commited_op_id();
RETURN_NOT_OK(ApplyCommitMessage(state, buffered_commit_entry.get()));
return Status::OK();
// Scans every mutated store referenced by the operation results in 'commit'
// and classifies the commit: whether any store was mutated at all, and if so
// whether at least one of those stores is still active according to
// flushed_stores_.
// NOTE(review): the return statement for the "no mutated stores" case and the
// closing braces are not visible in this copy of the file.
TabletBootstrap::ActiveStores TabletBootstrap::AnalyzeActiveStores(const CommitMsg& commit) {
bool has_mutated_stores = false;
bool has_active_stores = false;
for (const OperationResultPB& op_result : commit.result().ops()) {
for (const MemStoreTargetPB& mutated_store : op_result.mutated_stores()) {
has_mutated_stores = true;
if (flushed_stores_.IsMemStoreActive(mutated_store)) {
has_active_stores = true;
if (!has_mutated_stores) {
return has_active_stores ? SOME_STORES_ACTIVE : NO_STORES_ACTIVE;
// Verifies that an orphaned commit can safely be ignored. Returns Corruption
// when AnalyzeActiveStores() reports SOME_STORES_ACTIVE for 'commit', i.e. the
// commit refers to a store that still needs replay; otherwise returns OK.
// Failure to build the superblock for the error message is only logged as a
// warning.
// NOTE(review): the final argument of the Corruption message and the closing
// braces are not visible in this copy of the file.
Status TabletBootstrap::CheckOrphanedCommitDoesntNeedReplay(const CommitMsg& commit) {
if (AnalyzeActiveStores(commit) == SOME_STORES_ACTIVE) {
TabletSuperBlockPB super;
WARN_NOT_OK(meta_->ToSuperBlock(&super), LogPrefix() + "Couldn't build TabletSuperBlockPB");
return Status::Corruption(Substitute("CommitMsg was orphaned but it referred to "
"stores which need replay. Commit: $0. TabletMetadata: $1", commit.ShortDebugString(),
return Status::OK();
// Applies a single COMMIT entry. When a pending REPLICATE with the same index
// is found, its full OpId must equal the commit's committed OpId (otherwise
// Corruption is returned) and the pair is replayed via HandleEntryPair().
// NOTE(review): the lookup that populates 'pending_replicate_entry', the
// arguments of the error message, the body of the else branch and the closing
// braces are not visible in this copy of the file.
Status TabletBootstrap::ApplyCommitMessage(ReplayState* state, LogEntryPB* commit_entry) {
const OpId& committed_op_id = commit_entry->commit().commited_op_id();
VLOG_WITH_PREFIX(2) << "Applying commit for " << committed_op_id;
gscoped_ptr<LogEntryPB> pending_replicate_entry;
// They should also have an associated replicate index (it may have been in a
// deleted log segment though).
if (pending_replicate_entry != nullptr) {
// We found a replicate with the same index, make sure it also has the same
// term.
if (!OpIdEquals(committed_op_id, pending_replicate_entry->replicate().id())) {
string error_msg = Substitute("Committed operation's OpId: $0 didn't match the"
"commit message's committed OpId: $1. Pending operation: $2, Commit message: $3",
return Status::Corruption(error_msg);
RETURN_NOT_OK(HandleEntryPair(pending_replicate_entry.get(), commit_entry));
} else {
return Status::OK();
// Never deletes 'replicate_entry' or 'commit_entry'.
// Replays one REPLICATE/COMMIT pair by dispatching on the commit's operation
// type to the matching Play*Request() method, then handles safe-time
// advancement for tablet operations. NO_OP and CHANGE_CONFIG_OP return before
// the safe-time step.
// NOTE(review): some `case` labels, the `break`/`default` lines, the tail of
// the safe-time computation and the closing braces are not visible in this
// copy of the file.
Status TabletBootstrap::HandleEntryPair(LogEntryPB* replicate_entry, LogEntryPB* commit_entry) {
const char* error_fmt = "Failed to play $0 request. ReplicateMsg: { $1 }, CommitMsg: { $2 }";
// Helper macro: runs the given replay method and, on failure, prepends an
// error message built from 'error_fmt', the op type name and both messages.
#define RETURN_NOT_OK_REPLAY(ReplayMethodName, replicate, commit) \
RETURN_NOT_OK_PREPEND(ReplayMethodName(replicate, commit), \
Substitute(error_fmt, OperationType_Name(op_type), \
replicate->ShortDebugString(), commit.ShortDebugString()))
ReplicateMsg* replicate = replicate_entry->mutable_replicate();
const CommitMsg& commit = commit_entry->commit();
OperationType op_type = commit.op_type();
switch (op_type) {
case WRITE_OP:
RETURN_NOT_OK_REPLAY(PlayWriteRequest, replicate, commit);
RETURN_NOT_OK_REPLAY(PlayAlterSchemaRequest, replicate, commit);
RETURN_NOT_OK_REPLAY(PlayChangeConfigRequest, replicate, commit);
case NO_OP:
RETURN_NOT_OK_REPLAY(PlayNoOpRequest, replicate, commit);
return Status::IllegalState(Substitute("Unsupported commit entry type: $0",
// Non-tablet operations should not advance the safe time, because they are
// not started serially and so may have timestamps that are out of order.
if (op_type == NO_OP || op_type == CHANGE_CONFIG_OP) {
return Status::OK();
// Handle safe time advancement:
// If this operation has an external consistency mode other than COMMIT_WAIT, we know that no
// future transaction will have a timestamp that is lower than it, so we can just advance the
// safe timestamp to this operation's timestamp.
// If the hybrid clock is disabled, all transactions will fall into this category.
Timestamp safe_time;
if (replicate->write_request().external_consistency_mode() != COMMIT_WAIT) {
safe_time = Timestamp(replicate->timestamp());
// ... else we set the safe timestamp to be the transaction's timestamp minus the maximum clock
// error. This opens the door for problems if the flags changed across reboots, but this is
// unlikely and the problem would manifest itself immediately and clearly (mvcc would complain
// the operation is already committed, with a CHECK failure).
} else {
DCHECK(clock_->SupportsExternalConsistencyMode(COMMIT_WAIT)) << "The provided clock does not"
"support COMMIT_WAIT external consistency mode.";
safe_time = server::HybridClock::AddPhysicalTimeToTimestamp(
return Status::OK();
// Writes a dump of the replay state in 'state' to the log, as a debugging aid.
void TabletBootstrap::DumpReplayStateToLog(const ReplayState& state) {
// Dump the replay state, this will log the pending replicates as well as the pending commits,
// which might be useful for debugging.
vector<string> state_dump;
// NOTE(review): the loop variable is called 'string', which hides the 'string'
// type name for the rest of the loop; a distinct name would be clearer.
for (const string& string : state_dump) {
// Replays the tablet's write-ahead log segments.
//
// Steps, in order:
//  1. If there is at least one segment, decode the schema stored in the first
//     segment's header and use it, together with the header's schema version,
//     as the tablet's point-in-time schema.
//  2. Open the new log.
//  3. Read the entries of each segment in order and hand each one to
//     HandleEntry(), reporting progress through listener_.
//  4. Validate the commits that are still pending when replay ends.
//  5. Fill in 'consensus_info' for the caller: last_id, last_committed_id and,
//     presumably, the pending replicates -- confirm.
//
// Returns Corruption for a log read failure (end-of-file is checked for
// separately), for a pending commit that has no matching pending replicate,
// and for a pending WRITE_OP commit that refers to no active memory stores.
// Errors from HandleEntry() are returned with debug information prepended.
Status TabletBootstrap::PlaySegments(ConsensusBootstrapInfo* consensus_info) {
ReplayState state;
log::SegmentSequence segments;
// The first thing to do is to rewind the tablet's schema back to the schema
// as of the point in time where the logs begin. We must replay the writes
// in the logs with the correct point-in-time schema.
if (!segments.empty()) {
const scoped_refptr<ReadableLogSegment>& segment = segments[0];
// Set the point-in-time schema for the tablet based on the log header.
Schema pit_schema;
RETURN_NOT_OK_PREPEND(SchemaFromPB(segment->header().schema(), &pit_schema),
"Couldn't decode log segment schema");
pit_schema, segment->header().schema_version()),
"couldn't set point-in-time schema");
// We defer opening the log until here, so that we properly reproduce the
// point-in-time schema from the log we're reading into the log we're
// writing.
RETURN_NOT_OK_PREPEND(OpenNewLog(), "Failed to open new log");
// 'segment_count' and 'entry_count' are positions used in the error and
// progress messages below.
int segment_count = 0;
for (const scoped_refptr<ReadableLogSegment>& segment : segments) {
log::LogEntryReader reader(segment.get());
int entry_count = 0;
while (true) {
// A fresh entry is allocated for every read; see the ownership note after
// the HandleEntry() call.
unique_ptr<LogEntryPB> entry(new LogEntryPB);
Status s = reader.ReadNextEntry(entry.get());
if (PREDICT_FALSE(!s.ok())) {
if (s.IsEndOfFile()) {
return Status::Corruption(Substitute("Error reading Log Segment of tablet $0: $1 "
"(Read up to entry $2 of segment $3, in path $4)",
s = HandleEntry(&state, entry.get());
if (!s.ok()) {
RETURN_NOT_OK_PREPEND(s, DebugInfo(tablet_->tablet_id(),
entry_count, segment->path(),
// If HandleEntry returns OK, then it has taken ownership of the entry.
// TODO: could be more granular here and log during the segments as well,
// plus give info about number of MB processed, but this is better than
// nothing.
listener_->StatusMessage(Substitute("Bootstrap replayed $0/$1 log segments. "
"Stats: $2. Pending: $3 replicates",
segment_count + 1, log_reader_->num_segments(),
// If we have non-applied commits they all must belong to pending operations and
// they should only pertain to stores which are still active.
if (!state.pending_commits.empty()) {
for (const OpIndexToEntryMap::value_type& entry : state.pending_commits) {
// A pending commit whose index has no pending replicate is an orphan.
if (!ContainsKey(state.pending_replicates, entry.first)) {
return Status::Corruption("Had orphaned commits at the end of replay.");
if (entry.second->commit().op_type() == WRITE_OP &&
AnalyzeActiveStores(entry.second->commit()) == NO_STORES_ACTIVE) {
// The superblock is only built to enrich the error message; a failure to
// build it is logged as a warning and does not change the returned status.
TabletSuperBlockPB super;
WARN_NOT_OK(meta_->ToSuperBlock(&super), "Couldn't build TabletSuperBlockPB.");
return Status::Corruption(Substitute("CommitMsg was pending but it did not refer "
"to any active memory stores. Commit: $0. TabletMetadata: $1",
entry.second->commit().ShortDebugString(), super.ShortDebugString()));
// Note that we don't pass the information contained in the pending commits along with
// ConsensusBootstrapInfo. We know that this is safe as they must refer to active
// stores (we make doubly sure above).
// Example/Explanation:
// Say we have two different operations that touch the same row, one insert and one
// mutate. Since we use Early Lock Release the commit for the second (mutate) operation
// might end up in the log before the insert's commit. This wouldn't matter since
// we replay in order, but a corner case here is that we might crash before we
// write the commit for the insert, meaning it might not be present at all.
// One possible log for this situation would be:
// - Replicate 10.10 (insert)
// - Replicate 10.11 (mutate)
// - Commit 10.11 (mutate)
// ~CRASH while Commit 10.10 is in-flight~
// We can't replay 10.10 during bootstrap because we haven't seen its commit, but
// since we can't replay out-of-order we won't replay 10.11 either, in fact we'll
// pass them both as "pending" to consensus to be applied again.
// The reason why it is safe to simply disregard 10.11's commit is that we know that
// it must refer only to active stores. We know this because one important flush/compact
// pre-condition is:
// - No flush will become visible on reboot (meaning we won't durably update the tablet
// metadata), unless the snapshot under which the flush/compact was performed has no
// in-flight transactions and all the messages that are in-flight to the log are durable.
// In our example this means that if we had flushed/compacted after 10.10 was applied
// (meaning losing the commit message would lead to corruption as we might re-apply it)
// then the commit for 10.10 would be durable. Since it isn't then no flush/compaction
// occurred after 10.10 was applied and thus we can disregard the commit message for
// 10.11 and simply apply both 10.10 and 10.11 as if we hadn't applied them before.
// This generalizes to:
// - If a committed replicate message with index Y is missing a commit message,
// no later committed replicate message (with index > Y) is visible across reboots
// in the tablet data.
// Set up the ConsensusBootstrapInfo structure for the caller.
for (OpIndexToEntryMap::value_type& e : state.pending_replicates) {
consensus_info->last_id = state.prev_op_id;
consensus_info->last_committed_id = state.committed_op_id;
return Status::OK();
// Appends a commit entry to 'log_' and returns the status of that append.
// The entry is a local LogEntryPB whose commit field is, presumably, filled in
// from 'commit_msg' -- TODO confirm.
Status TabletBootstrap::AppendCommitMsg(const CommitMsg& commit_msg) {
LogEntryPB commit_entry;
CommitMsg* commit = commit_entry.mutable_commit();
return log_->Append(&commit_entry);
// Runs FilterOperation() on every per-operation result in 'result' and stores
// each verdict at the same index of '*flushed_by_op'.
//
// Returns the first non-OK status produced by FilterOperation(), otherwise OK.
Status TabletBootstrap::DetermineFlushedOps(const TxResultPB& result,
vector<bool>* flushed_by_op) {
int num_ops = result.ops_size();
for (int i = 0; i < num_ops; i++) {
const auto& orig_op_result = result.ops(i);
// vector<bool> hands out proxy references, so the verdict is taken in a
// plain bool first and then assigned by index.
bool f;
RETURN_NOT_OK(FilterOperation(orig_op_result, &f));
// NOTE(review): assignment is by index, so '*flushed_by_op' must already
// hold at least 'num_ops' elements at this point.
(*flushed_by_op)[i] = f;
return Status::OK();
// Replays a write operation from the log.
//
// 'replicate_msg' carries the original write request and 'commit_msg' the
// result recorded for it. The per-operation results in 'commit_msg' are run
// through DetermineFlushedOps() first. When every operation is reported as
// already flushed, the row operations are not decoded; otherwise the row
// operations, if the request has any, are presumably replayed through
// PlayRowOperations() -- confirm.
Status TabletBootstrap::PlayWriteRequest(ReplicateMsg* replicate_msg,
const CommitMsg& commit_msg) {
// Prepare the commit entry for the rewritten log.
LogEntryPB commit_entry;
CommitMsg* commit = commit_entry.mutable_commit();
// Set up the new transaction.
// Even if we're going to ignore the transaction, it's important to
// do this so that MVCC advances.
WriteRequestPB* write = replicate_msg->mutable_write_request();
WriteTransactionState tx_state(nullptr, write, nullptr);
// Determine which of the operations are already flushed to persistent
// storage and don't need to be re-applied. We can do this even before
// we decode any row operations, so we can short-circuit that decoding
// in the case that the entire op has been already flushed.
vector<bool> already_flushed;
RETURN_NOT_OK(DetermineFlushedOps(commit_msg.result(), &already_flushed));
bool all_already_flushed = std::all_of(already_flushed.begin(),
[](bool f) { return f; });
if (all_already_flushed) {
for (auto& op : *commit->mutable_result()->mutable_ops()) {
} else {
if (write->has_row_operations()) {
// TODO: get rid of redundant params below - they can be gotten from the Request
// Replace the original commit message's result with the new one from
// the replayed operation.
return Status::OK();
// Replays an alter-schema operation from the log.
//
// Decodes the schema carried by the request in 'replicate_msg', prepares and
// applies the alter on the tablet, sets the schema for the next segment of the
// new log, and finally appends 'commit_msg' to that log. Returns the first
// error hit along the way.
Status TabletBootstrap::PlayAlterSchemaRequest(ReplicateMsg* replicate_msg,
const CommitMsg& commit_msg) {
AlterSchemaRequestPB* alter_schema = replicate_msg->mutable_alter_schema_request();
// Decode schema
Schema schema;
RETURN_NOT_OK(SchemaFromPB(alter_schema->schema(), &schema));
AlterSchemaTransactionState tx_state(nullptr, alter_schema, nullptr);
// TODO(KUDU-860): we should somehow distinguish if an alter table failed on its original
// attempt (e.g due to being an invalid request, or a request with a too-early
// schema version).
RETURN_NOT_OK(tablet_->CreatePreparedAlterSchema(&tx_state, &schema));
// Apply the alter schema to the tablet
RETURN_NOT_OK_PREPEND(tablet_->AlterSchema(&tx_state), "Failed to AlterSchema:");
// Also update the log information. Normally, the AlterSchema() call above
// takes care of this, but our new log isn't hooked up to the tablet yet.
log_->SetSchemaForNextLogSegment(schema, tx_state.schema_version());
return AppendCommitMsg(commit_msg);
// Replays a Raft configuration change from the log.
//
// The log index of 'replicate_msg' is compared with the opid_index of the
// committed config held in cmeta_. A change with a greater index is logged as
// being applied; one with a lower or equal index is logged as skipped. In both
// cases 'commit_msg' is appended to the new log and the status of that append
// is returned.
Status TabletBootstrap::PlayChangeConfigRequest(ReplicateMsg* replicate_msg,
const CommitMsg& commit_msg) {
ChangeConfigRecordPB* change_config = replicate_msg->mutable_change_config_record();
// Local copy of the new config carried by the change record.
RaftConfigPB config = change_config->new_config();
int64_t cmeta_opid_index = cmeta_->committed_config().opid_index();
if (replicate_msg->id().index() > cmeta_opid_index) {
VLOG_WITH_PREFIX(1) << "WAL replay found Raft configuration with log index "
<< config.opid_index()
<< " that is greater than the committed config's index "
<< cmeta_opid_index
<< ". Applying this configuration change.";
// We flush once at the end of bootstrap.
} else {
VLOG_WITH_PREFIX(1) << "WAL replay found Raft configuration with log index "
<< replicate_msg->id().index()
<< ", which is less than or equal to the committed "
<< "config's index " << cmeta_opid_index << ". "
<< "Skipping application of this config change.";
return AppendCommitMsg(commit_msg);
// Replays a no-op: the only action is to append 'commit_msg' to the new log.
// 'replicate_msg' is not used.
Status TabletBootstrap::PlayNoOpRequest(ReplicateMsg* replicate_msg, const CommitMsg& commit_msg) {
return AppendCommitMsg(commit_msg);
// Decodes the row operations of a write and applies the ones that still need
// to be applied.
//
// 'schema_pb' is the client schema used to decode the operations, 'result' is
// the result recorded for the original transaction (passed on to
// ApplyOperations()), and 'already_flushed' carries one flag per decoded
// operation. 'ops_pb' presumably holds the encoded row operations -- confirm.
//
// Returns an error with context prepended if the schema or the operations
// cannot be decoded or the row locks cannot be acquired; otherwise returns the
// outcome of ApplyOperations().
Status TabletBootstrap::PlayRowOperations(WriteTransactionState* tx_state,
const SchemaPB& schema_pb,
const RowOperationsPB& ops_pb,
const TxResultPB& result,
const vector<bool>& already_flushed) {
Schema inserts_schema;
RETURN_NOT_OK_PREPEND(SchemaFromPB(schema_pb, &inserts_schema),
"Couldn't decode client schema");
RETURN_NOT_OK_PREPEND(tablet_->DecodeWriteOperations(&inserts_schema, tx_state),
Substitute("Could not decode row operations: $0",
// The flags are matched to the decoded operations by position, so both
// sequences must have the same length.
DCHECK_EQ(tx_state->row_ops().size(), already_flushed.size());
// Propagate the 'already_flushed' information into the decoded operations.
// This signals to ApplyOperations() below that it doesn't need to actually
// apply these ops again.
// NOTE(review): 'i' is a signed int compared against an unsigned size().
for (int i = 0; i < already_flushed.size(); i++) {
if (already_flushed[i]) {
// Run AcquireRowLocks, Apply, etc!
"Failed to acquire row locks");
RETURN_NOT_OK(ApplyOperations(tx_state, result));
return Status::OK();
// Applies the decoded row operations held by 'tx_state' to the tablet.
//
// 'orig_result' holds the per-operation results recorded when the transaction
// originally ran; they are consumed in the same order as
// tx_state->row_ops(). Operations that already carry a result, and operations
// that failed originally, are not applied again. Returns Corruption if an
// operation that succeeded originally fails during replay.
Status TabletBootstrap::ApplyOperations(WriteTransactionState* tx_state,
const TxResultPB& orig_result) {
int32_t op_idx = 0;
for (RowOp* op : tx_state->row_ops()) {
// NOTE(review): indexes 'orig_result' by position; no check that it has as
// many ops as tx_state->row_ops() is visible in this function.
const OperationResultPB& orig_op_result = orig_result.ops(op_idx++);
// Increment the seen/ignored stats.
switch (op->decoded_op.type) {
case RowOperationsPB::INSERT:
case RowOperationsPB::UPSERT: {
// TODO: should we have a separate counter for upserts?
if (op->has_result()) {
case RowOperationsPB::UPDATE:
case RowOperationsPB::DELETE: {
if (op->has_result()) {
LOG_WITH_PREFIX(FATAL) << "Bad op type: " << op->decoded_op.type;
// If the op is already flushed, no need to replay it.
if (op->has_result()) {
// check if the operation failed in the original transaction
if (PREDICT_FALSE(orig_op_result.has_failed_status())) {
Status status = StatusFromPB(orig_op_result.failed_status());
if (VLOG_IS_ON(1)) {
// 'op_idx' was already advanced above, hence 'op_idx - 1' for the index of
// the current operation.
VLOG_WITH_PREFIX(1) << "Skipping operation that originally resulted in error. OpId: "
<< tx_state->op_id().DebugString() << " op index: "
<< op_idx - 1 << " original error: "
<< status.ToString();
// Actually apply it.
ProbeStats stats; // we don't use this, but tablet internals require non-NULL.
tablet_->ApplyRowOperation(tx_state, op, &stats);
DCHECK(op->result != nullptr);
// We expect that the above Apply() will always succeed, because we're
// applying an operation that we know succeeded before the server
// restarted. If it doesn't succeed, something is wrong and we are
// diverging from our prior state, so bail.
if (op->result->has_failed_status()) {
return Status::Corruption("Operation which previously succeeded failed "
"during log replay",
Substitute("Op: $0\nFailure: $1",
return Status::OK();
// Decides, from the result recorded for one operation of the original
// transaction, whether that operation still needs to be replayed.
//
// Sets '*already_flushed' to true when the operation has a failed status, is
// marked as flushed, or none of its mutated stores is still active; sets it to
// false otherwise. Returns Corruption if the result lists zero or more than
// two mutated stores, or if two of them are still active.
Status TabletBootstrap::FilterOperation(const OperationResultPB& op_result,
bool* already_flushed) {
// If the operation failed or was skipped, originally, no need to re-apply it.
if (op_result.has_failed_status() || op_result.flushed()) {
*already_flushed = true;
return Status::OK();
int num_mutated_stores = op_result.mutated_stores_size();
if (PREDICT_FALSE(num_mutated_stores == 0 || num_mutated_stores > 2)) {
return Status::Corruption(Substitute("All operations must have one or two mutated_stores: $0",
// The mutation may have been duplicated, so we'll check whether any of the
// output targets was active.
int num_active_stores = 0;
for (const MemStoreTargetPB& mutated_store : op_result.mutated_stores()) {
if (flushed_stores_.IsMemStoreActive(mutated_store)) {
if (num_active_stores == 0) {
// The mutation was fully flushed.
*already_flushed = true;
return Status::OK();
if (PREDICT_FALSE(num_active_stores == 2)) {
// It's not possible for a duplicated mutation to refer to two stores which are still
// active. Either the mutation arrived before the metadata was flushed, in which case
// the 'first' store is live, or it arrived just after it was flushed, in which case
// the 'second' store was live. But at no time should the metadata refer to both the
// 'input' and 'output' stores of a compaction.
return Status::Corruption("Mutation was duplicated to two stores that are considered live",
// Exactly one store is still active: the operation has to be replayed.
*already_flushed = false;
return Status::OK();
// NOTE(review): presumably brings clock_ up to date with 'timestamp', the raw
// value of a replayed operation's timestamp -- confirm against the callers.
Status TabletBootstrap::UpdateClock(uint64_t timestamp) {
Timestamp ts;
return Status::OK();
// Returns the prefix put in front of this object's log messages, of the form
// "T <tablet id> P <fs manager uuid>: ".
string TabletBootstrap::LogPrefix() const {
return Substitute("T $0 P $1: ", meta_->tablet_id(), meta_->fs_manager()->uuid());
// Initializes this snapshot from the tablet metadata 'meta'.
//
// Records the last durable MRS id and, for every rowset listed in the
// metadata, that rowset's last durable redo DMS id keyed by the rowset id.
// CHECK-fails if the per-rowset map is not empty on entry. Returns Corruption
// if two rowsets in the metadata share the same id.
Status FlushedStoresSnapshot::InitFrom(const TabletMetadata& meta) {
CHECK(flushed_dms_by_drs_id_.empty()) << "already initted";
last_durable_mrs_id_ = meta.last_durable_mrs_id();
for (const shared_ptr<RowSetMetadata>& rsmd : meta.rowsets()) {
// InsertIfNotPresent() reports false when the rowset id is already in the
// map, which is how a duplicate id is detected.
if (!InsertIfNotPresent(&flushed_dms_by_drs_id_, rsmd->id(),
rsmd->last_durable_redo_dms_id())) {
return Status::Corruption(Substitute(
"Duplicate DRS ID $0 in tablet metadata. "
"Found DRS $0 with last durable redo DMS ID $1 while trying to "
"initialize DRS $0 with last durable redo DMS ID $2",
return Status::OK();
// Returns true if the memory store identified by 'target' had not been made
// durable as of this snapshot, i.e. a mutation that went to it needs to be
// replayed.
//
// For an MRS target, the store is active when its id is greater than the last
// durable MRS id. For a DRS delta-store target, it is active when the DMS id
// is greater than the last durable redo DMS id recorded for that rowset; a
// rowset that is unknown to the snapshot is reported as not active.
bool FlushedStoresSnapshot::IsMemStoreActive(const MemStoreTargetPB& target) const {
if (target.has_mrs_id()) {
// The original mutation went to the MRS. If this MRS has not yet been made
// durable, it needs to be replayed.
return target.mrs_id() > last_durable_mrs_id_;
} else {
// The original mutation went to a DRS's delta store.
int64_t last_durable_dms_id;
if (!FindCopy(flushed_dms_by_drs_id_, target.rs_id(), &last_durable_dms_id)) {
// If we have no data about this DRS, then there are two cases:
// 1) The DRS has already been flushed, but then later got removed because
// it got compacted away. Since it was flushed, we don't need to replay it.
// 2) The DRS was in the process of being written, but haven't yet flushed the
// TabletMetadata update that includes it. We only write to an in-progress DRS like
// this when we are in the 'duplicating' phase of a compaction. In that case,
// the other duplicated 'target' should still be present in the metadata, and we
// can base our decision based on that one.
return false;
// If the original rowset that we applied the edit to exists, check whether
// the edit was in a flushed DMS or a live one.
return target.dms_id() > last_durable_dms_id;
} // namespace tablet
} // namespace kudu