blob: 3cd578d01c0fcc9150f2b4e3788f097f5b51e825 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tablet/tablet_bootstrap.h"
#include <gflags/gflags.h>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "kudu/common/partial_row.h"
#include "kudu/common/row_operations.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/consensus_meta.h"
#include "kudu/consensus/log.h"
#include "kudu/consensus/log_anchor_registry.h"
#include "kudu/consensus/log_reader.h"
#include "kudu/consensus/log_util.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/strcat.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/strings/util.h"
#include "kudu/gutil/walltime.h"
#include "kudu/server/clock.h"
#include "kudu/server/hybrid_clock.h"
#include "kudu/server/metadata.h"
#include "kudu/tablet/lock_manager.h"
#include "kudu/tablet/row_op.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_peer.h"
#include "kudu/tablet/transactions/alter_schema_transaction.h"
#include "kudu/tablet/transactions/write_transaction.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/fault_injection.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/locks.h"
#include "kudu/util/logging.h"
#include "kudu/util/path_util.h"
#include "kudu/util/stopwatch.h"
DEFINE_bool(skip_remove_old_recovery_dir, false,
"Skip removing WAL recovery dir after startup. (useful for debugging)");
TAG_FLAG(skip_remove_old_recovery_dir, hidden);
DEFINE_double(fault_crash_during_log_replay, 0.0,
"Fraction of the time when the tablet will crash immediately "
"after processing a log entry during log replay. "
"(For testing only!)");
TAG_FLAG(fault_crash_during_log_replay, unsafe);
DECLARE_int32(max_clock_sync_error_usec);
namespace kudu {
namespace tablet {
using boost::shared_lock;
using consensus::ALTER_SCHEMA_OP;
using consensus::CHANGE_CONFIG_OP;
using consensus::ChangeConfigRecordPB;
using consensus::CommitMsg;
using consensus::ConsensusBootstrapInfo;
using consensus::ConsensusMetadata;
using consensus::ConsensusRound;
using consensus::MinimumOpId;
using consensus::NO_OP;
using consensus::OperationType;
using consensus::OperationType_Name;
using consensus::OpId;
using consensus::OpIdEquals;
using consensus::OpIdEqualsFunctor;
using consensus::OpIdHashFunctor;
using consensus::OpIdToString;
using consensus::RaftConfigPB;
using consensus::ReplicateMsg;
using consensus::WRITE_OP;
using log::Log;
using log::LogAnchorRegistry;
using log::LogEntryPB;
using log::LogOptions;
using log::LogReader;
using log::ReadableLogSegment;
using server::Clock;
using std::map;
using std::shared_ptr;
using std::string;
using std::unordered_map;
using strings::Substitute;
using tserver::AlterSchemaRequestPB;
using tserver::WriteRequestPB;
struct ReplayState;
// Information from the tablet metadata which indicates which data was
// flushed prior to this restart and which memory stores are still active.
//
// We take a snapshot of this information at the beginning of the bootstrap
// process so that we can allow compactions and flushes to run during bootstrap
// without confusing our tracking of flushed stores.
//
// NOTE: automatic flushes and compactions are not currently scheduled during
// bootstrap. However, flushes may still be triggered due to operations like
// alter-table.
class FlushedStoresSnapshot {
public:
FlushedStoresSnapshot() {}
Status InitFrom(const TabletMetadata& meta);
// Return true if the given memory store is still active (i.e. edits that were
// originally written to this memory store should be replayed during the bootstrap
// process).
//
// NOTE: a store may be inactive for either of two reasons. Either:
// (a) the store was flushed to disk, OR
// (b) the store was in the process of being written by a flush or compaction
// but the process crashed before the associated tablet metadata update
// was committed.
bool IsMemStoreActive(const MemStoreTargetPB& target) const;
private:
int64_t last_durable_mrs_id_;
unordered_map<int64_t, int64_t> flushed_dms_by_drs_id_;
DISALLOW_COPY_AND_ASSIGN(FlushedStoresSnapshot);
};
// Bootstraps an existing tablet by opening the metadata from disk, and rebuilding soft
// state by playing log segments. A bootstrapped tablet can then be added to an existing
// consensus configuration as a LEARNER, which will bring its state up to date with the
// rest of the consensus configuration, or it can start serving the data itself, after it
// has been appointed LEADER of that particular consensus configuration.
//
// NOTE: this does not handle pulling data from other replicas in the cluster. That
// is handled by the 'RemoteBootstrap' classes, which copy blocks and metadata locally
// before invoking this local bootstrap functionality.
//
// TODO Because the table that is being rebuilt is never flushed/compacted, consensus
// is only set on the tablet after bootstrap, when we get to flushes/compactions though
// we need to set it before replay or we won't be able to re-rebuild.
class TabletBootstrap {
public:
TabletBootstrap(const scoped_refptr<TabletMetadata>& meta,
const scoped_refptr<Clock>& clock,
shared_ptr<MemTracker> mem_tracker,
MetricRegistry* metric_registry,
TabletStatusListener* listener,
const scoped_refptr<LogAnchorRegistry>& log_anchor_registry);
// Plays the log segments, rebuilding the portion of the Tablet's soft
// state that is present in the log (additional soft state may be present
// in other replicas).
// A successful call will yield the rebuilt tablet and the rebuilt log.
Status Bootstrap(shared_ptr<Tablet>* rebuilt_tablet,
scoped_refptr<Log>* rebuilt_log,
ConsensusBootstrapInfo* results);
private:
// Opens the tablet.
// Sets '*has_blocks' to true if there was any data on disk for this tablet.
Status OpenTablet(bool* has_blocks);
// Checks if a previous log recovery directory exists. If so, it deletes any
// files in the log dir and sets 'needs_recovery' to true, meaning that the
// previous recovery attempt should be retried from the recovery dir.
//
// Otherwise, if there is a log directory with log files in it, renames that
// log dir to the log recovery dir and creates a new, empty log dir so that
// log replay can proceed. 'needs_recovery' is also returned as true in this
// case.
//
// If no log segments are found, 'needs_recovery' is set to false.
Status PrepareRecoveryDir(bool* needs_recovery);
// Opens the latest log segments for the Tablet that will allow to rebuild
// the tablet's soft state. If there are existing log segments in the tablet's
// log directly they are moved to a "log-recovery" directory which is deleted
// when the replay process is completed (as they have been duplicated in the
// current log directory).
//
// If a "log-recovery" directory is already present, we will continue to replay
// from the "log-recovery" directory. Tablet metadata is updated once replay
// has finished from the "log-recovery" directory.
Status OpenLogReaderInRecoveryDir();
// Opens a new log in the tablet's log directory.
// The directory is expected to be clean.
Status OpenNewLog();
// Finishes bootstrap, setting 'rebuilt_log' and 'rebuilt_tablet'.
Status FinishBootstrap(const string& message,
scoped_refptr<log::Log>* rebuilt_log,
shared_ptr<Tablet>* rebuilt_tablet);
// Plays the log segments into the tablet being built.
// The process of playing the segments generates a new log that can be continued
// later on when then tablet is rebuilt and starts accepting writes from clients.
Status PlaySegments(ConsensusBootstrapInfo* results);
// Append the given commit message to the log.
// Does not support writing a TxResult.
Status AppendCommitMsg(const CommitMsg& commit_msg);
Status PlayWriteRequest(ReplicateMsg* replicate_msg,
const CommitMsg& commit_msg);
Status PlayAlterSchemaRequest(ReplicateMsg* replicate_msg,
const CommitMsg& commit_msg);
Status PlayChangeConfigRequest(ReplicateMsg* replicate_msg,
const CommitMsg& commit_msg);
Status PlayNoOpRequest(ReplicateMsg* replicate_msg,
const CommitMsg& commit_msg);
// Plays operations, skipping those that have already been flushed.
Status PlayRowOperations(WriteTransactionState* tx_state,
const SchemaPB& schema_pb,
const RowOperationsPB& ops_pb,
const TxResultPB& result);
// Pass through all of the decoded operations in tx_state. For
// each op:
// - if it was previously failed, mark as failed
// - if it previously succeeded but was flushed, mark as skipped
// - otherwise, re-apply to the tablet being bootstrapped.
Status FilterAndApplyOperations(WriteTransactionState* tx_state,
const TxResultPB& orig_result);
// Filter a single insert operation, setting it to failed if
// it was already flushed.
Status FilterInsert(WriteTransactionState* tx_state,
RowOp* op,
const OperationResultPB& op_result);
// Filter a single mutate operation, setting it to failed if
// it was already flushed.
Status FilterMutate(WriteTransactionState* tx_state,
RowOp* op,
const OperationResultPB& op_result);
// Returns true if any of the memory stores referenced in 'commit' are still
// active, in which case the operation needs to be replayed.
bool AreAnyStoresActive(const CommitMsg& commit);
void DumpReplayStateToLog(const ReplayState& state);
// Handlers for each type of message seen in the log during replay.
Status HandleEntry(ReplayState* state, LogEntryPB* entry);
Status HandleReplicateMessage(ReplayState* state, LogEntryPB* replicate_entry);
Status HandleCommitMessage(ReplayState* state, LogEntryPB* commit_entry);
Status ApplyCommitMessage(ReplayState* state, LogEntryPB* commit_entry);
Status HandleEntryPair(LogEntryPB* replicate_entry, LogEntryPB* commit_entry);
// Checks that an orphaned commit message is actually irrelevant, i.e that none
// of the data stores it refers to are live.
Status CheckOrphanedCommitDoesntNeedReplay(const CommitMsg& commit);
// Decodes a Timestamp from the provided string and updates the clock
// with it.
Status UpdateClock(uint64_t timestamp);
// Removes the recovery directory and all files contained therein.
// Intended to be invoked after log replay successfully completes.
Status RemoveRecoveryDir();
// Return a log prefix string in the standard "T xxx P yyy" format.
string LogPrefix() const;
scoped_refptr<TabletMetadata> meta_;
scoped_refptr<Clock> clock_;
shared_ptr<MemTracker> mem_tracker_;
MetricRegistry* metric_registry_;
TabletStatusListener* listener_;
gscoped_ptr<tablet::Tablet> tablet_;
const scoped_refptr<log::LogAnchorRegistry> log_anchor_registry_;
scoped_refptr<log::Log> log_;
std::shared_ptr<log::LogReader> log_reader_;
gscoped_ptr<ConsensusMetadata> cmeta_;
// Statistics on the replay of entries in the log.
struct Stats {
Stats()
: ops_read(0),
ops_overwritten(0),
ops_committed(0),
inserts_seen(0),
inserts_ignored(0),
mutations_seen(0),
mutations_ignored(0),
orphaned_commits(0) {
}
string ToString() const {
return Substitute("ops{read=$0 overwritten=$1 applied=$2} "
"inserts{seen=$3 ignored=$4} "
"mutations{seen=$5 ignored=$6} "
"orphaned_commits=$7",
ops_read, ops_overwritten, ops_committed,
inserts_seen, inserts_ignored,
mutations_seen, mutations_ignored,
orphaned_commits);
}
// Number of REPLICATE messages read from the log
int ops_read;
// Number of REPLICATE messages which were overwritten by later entries.
int ops_overwritten;
// Number of REPLICATE messages for which a matching COMMIT was found.
int ops_committed;
// Number inserts/mutations seen and ignored.
int inserts_seen, inserts_ignored;
int mutations_seen, mutations_ignored;
// Number of COMMIT messages for which a corresponding REPLICATE was not found.
int orphaned_commits;
};
Stats stats_;
// Snapshot of which stores were flushed prior to restart.
FlushedStoresSnapshot flushed_stores_;
DISALLOW_COPY_AND_ASSIGN(TabletBootstrap);
};
TabletStatusListener::TabletStatusListener(const scoped_refptr<TabletMetadata>& meta)
: meta_(meta),
last_status_("") {
}
const string TabletStatusListener::tablet_id() const {
return meta_->tablet_id();
}
const string TabletStatusListener::table_name() const {
return meta_->table_name();
}
const Partition& TabletStatusListener::partition() const {
return meta_->partition();
}
const Schema& TabletStatusListener::schema() const {
return meta_->schema();
}
TabletStatusListener::~TabletStatusListener() {
}
void TabletStatusListener::StatusMessage(const string& status) {
LOG(INFO) << "T " << tablet_id() << " P " << meta_->fs_manager()->uuid() << ": "
<< status;
boost::lock_guard<boost::shared_mutex> l(lock_);
last_status_ = status;
}
Status BootstrapTablet(const scoped_refptr<TabletMetadata>& meta,
const scoped_refptr<Clock>& clock,
const shared_ptr<MemTracker>& mem_tracker,
MetricRegistry* metric_registry,
TabletStatusListener* listener,
shared_ptr<tablet::Tablet>* rebuilt_tablet,
scoped_refptr<log::Log>* rebuilt_log,
const scoped_refptr<log::LogAnchorRegistry>& log_anchor_registry,
ConsensusBootstrapInfo* consensus_info) {
TRACE_EVENT1("tablet", "BootstrapTablet",
"tablet_id", meta->tablet_id());
TabletBootstrap bootstrap(meta, clock, mem_tracker,
metric_registry, listener, log_anchor_registry);
RETURN_NOT_OK(bootstrap.Bootstrap(rebuilt_tablet, rebuilt_log, consensus_info));
// This is necessary since OpenNewLog() initially disables sync.
RETURN_NOT_OK((*rebuilt_log)->ReEnableSyncIfRequired());
return Status::OK();
}
static string DebugInfo(const string& tablet_id,
int segment_seqno,
int entry_idx,
const string& segment_path,
const LogEntryPB& entry) {
// Truncate the debug string to a reasonable length for logging.
// Otherwise, glog will truncate for us and we may miss important
// information which came after this long string.
string debug_str = entry.ShortDebugString();
if (debug_str.size() > 500) {
debug_str.resize(500);
debug_str.append("...");
}
return Substitute("Debug Info: Error playing entry $0 of segment $1 of tablet $2. "
"Segment path: $3. Entry: $4", entry_idx, segment_seqno, tablet_id,
segment_path, debug_str);
}
TabletBootstrap::TabletBootstrap(
const scoped_refptr<TabletMetadata>& meta,
const scoped_refptr<Clock>& clock, shared_ptr<MemTracker> mem_tracker,
MetricRegistry* metric_registry, TabletStatusListener* listener,
const scoped_refptr<LogAnchorRegistry>& log_anchor_registry)
: meta_(meta),
clock_(clock),
mem_tracker_(std::move(mem_tracker)),
metric_registry_(metric_registry),
listener_(listener),
log_anchor_registry_(log_anchor_registry) {}
Status TabletBootstrap::Bootstrap(shared_ptr<Tablet>* rebuilt_tablet,
scoped_refptr<Log>* rebuilt_log,
ConsensusBootstrapInfo* consensus_info) {
string tablet_id = meta_->tablet_id();
// Replay requires a valid Consensus metadata file to exist in order to
// compare the committed consensus configuration seqno with the log entries and also to persist
// committed but unpersisted changes.
RETURN_NOT_OK_PREPEND(ConsensusMetadata::Load(meta_->fs_manager(), tablet_id,
meta_->fs_manager()->uuid(), &cmeta_),
"Unable to load Consensus metadata");
// Make sure we don't try to locally bootstrap a tablet that was in the middle
// of a remote bootstrap. It's likely that not all files were copied over
// successfully.
TabletDataState tablet_data_state = meta_->tablet_data_state();
if (tablet_data_state != TABLET_DATA_READY) {
return Status::Corruption("Unable to locally bootstrap tablet " + tablet_id + ": " +
"TabletMetadata bootstrap state is " +
TabletDataState_Name(tablet_data_state));
}
meta_->PinFlush();
listener_->StatusMessage("Bootstrap starting.");
if (VLOG_IS_ON(1)) {
TabletSuperBlockPB super_block;
RETURN_NOT_OK(meta_->ToSuperBlock(&super_block));
VLOG_WITH_PREFIX(1) << "Tablet Metadata: " << super_block.DebugString();
}
RETURN_NOT_OK(flushed_stores_.InitFrom(*meta_.get()));
bool has_blocks;
RETURN_NOT_OK(OpenTablet(&has_blocks));
bool needs_recovery;
RETURN_NOT_OK(PrepareRecoveryDir(&needs_recovery));
if (needs_recovery) {
RETURN_NOT_OK(OpenLogReaderInRecoveryDir());
}
// This is a new tablet, nothing left to do.
if (!has_blocks && !needs_recovery) {
LOG_WITH_PREFIX(INFO) << "No blocks or log segments found. Creating new log.";
RETURN_NOT_OK_PREPEND(OpenNewLog(), "Failed to open new log");
RETURN_NOT_OK(FinishBootstrap("No bootstrap required, opened a new log",
rebuilt_log,
rebuilt_tablet));
consensus_info->last_id = MinimumOpId();
consensus_info->last_committed_id = MinimumOpId();
return Status::OK();
}
// If there were blocks, there must be segments to replay. This is required
// by Raft, since we always need to know the term and index of the last
// logged op in order to vote, know how to respond to AppendEntries(), etc.
if (has_blocks && !needs_recovery) {
return Status::IllegalState(Substitute("Tablet $0: Found rowsets but no log "
"segments could be found.",
tablet_id));
}
// Before playing any segments we set the safe and clean times to 'kMin' so that
// the MvccManager will accept all transactions that we replay as uncommitted.
tablet_->mvcc_manager()->OfflineAdjustSafeTime(Timestamp::kMin);
RETURN_NOT_OK_PREPEND(PlaySegments(consensus_info), "Failed log replay. Reason");
// Flush the consensus metadata once at the end to persist our changes, if any.
cmeta_->Flush();
RETURN_NOT_OK(RemoveRecoveryDir());
RETURN_NOT_OK(FinishBootstrap("Bootstrap complete.", rebuilt_log, rebuilt_tablet));
return Status::OK();
}
Status TabletBootstrap::FinishBootstrap(const string& message,
scoped_refptr<log::Log>* rebuilt_log,
shared_ptr<Tablet>* rebuilt_tablet) {
// Add a callback to TabletMetadata that makes sure that each time we flush the metadata
// we also wait for in-flights to finish and for their wal entry to be fsynced.
// This might be a bit conservative in some situations but it will prevent us from
// ever flushing the metadata referring to tablet data blocks containing data whose
// commit entries are not durable, a pre-requisite for recovery.
meta_->SetPreFlushCallback(
Bind(&FlushInflightsToLogCallback::WaitForInflightsAndFlushLog,
make_scoped_refptr(new FlushInflightsToLogCallback(tablet_.get(),
log_))));
tablet_->MarkFinishedBootstrapping();
RETURN_NOT_OK(tablet_->metadata()->UnPinFlush());
listener_->StatusMessage(message);
rebuilt_tablet->reset(tablet_.release());
rebuilt_log->swap(log_);
return Status::OK();
}
Status TabletBootstrap::OpenTablet(bool* has_blocks) {
gscoped_ptr<Tablet> tablet(new Tablet(meta_,
clock_,
mem_tracker_,
metric_registry_,
log_anchor_registry_));
// doing nothing for now except opening a tablet locally.
LOG_TIMING_PREFIX(INFO, LogPrefix(), "opening tablet") {
RETURN_NOT_OK(tablet->Open());
}
*has_blocks = tablet->num_rowsets() != 0;
tablet_.reset(tablet.release());
return Status::OK();
}
Status TabletBootstrap::PrepareRecoveryDir(bool* needs_recovery) {
*needs_recovery = false;
FsManager* fs_manager = tablet_->metadata()->fs_manager();
string tablet_id = tablet_->metadata()->tablet_id();
string log_dir = fs_manager->GetTabletWalDir(tablet_id);
// If the recovery directory exists, then we crashed mid-recovery.
// Throw away any logs from the previous recovery attempt and restart the log
// replay process from the beginning using the same recovery dir as last time.
string recovery_path = fs_manager->GetTabletWalRecoveryDir(tablet_id);
if (fs_manager->Exists(recovery_path)) {
LOG_WITH_PREFIX(INFO) << "Previous recovery directory found at " << recovery_path << ": "
<< "Replaying log files from this location instead of " << log_dir;
// Since we have a recovery directory, clear out the log_dir by recursively
// deleting it and creating a new one so that we don't end up with remnants
// of old WAL segments or indexes after replay.
if (fs_manager->env()->FileExists(log_dir)) {
LOG_WITH_PREFIX(INFO) << "Deleting old log files from previous recovery attempt in "
<< log_dir;
RETURN_NOT_OK_PREPEND(fs_manager->env()->DeleteRecursively(log_dir),
"Could not recursively delete old log dir " + log_dir);
}
RETURN_NOT_OK_PREPEND(fs_manager->CreateDirIfMissing(log_dir),
"Failed to create log directory " + log_dir);
*needs_recovery = true;
return Status::OK();
}
// If we made it here, there was no pre-existing recovery dir.
// Now we look for log files in log_dir, and if we find any then we rename
// the whole log_dir to a recovery dir and return needs_recovery = true.
RETURN_NOT_OK_PREPEND(fs_manager->CreateDirIfMissing(log_dir),
"Failed to create log dir");
vector<string> children;
RETURN_NOT_OK_PREPEND(fs_manager->ListDir(log_dir, &children),
"Couldn't list log segments.");
for (const string& child : children) {
if (!log::IsLogFileName(child)) {
continue;
}
string source_path = JoinPathSegments(log_dir, child);
string dest_path = JoinPathSegments(recovery_path, child);
LOG_WITH_PREFIX(INFO) << "Will attempt to recover log segment " << source_path
<< " to " << dest_path;
*needs_recovery = true;
}
if (*needs_recovery) {
// Atomically rename the log directory to the recovery directory
// and then re-create the log directory.
LOG_WITH_PREFIX(INFO) << "Moving log directory " << log_dir << " to recovery directory "
<< recovery_path << " in preparation for log replay";
RETURN_NOT_OK_PREPEND(fs_manager->env()->RenameFile(log_dir, recovery_path),
Substitute("Could not move log directory $0 to recovery dir $1",
log_dir, recovery_path));
RETURN_NOT_OK_PREPEND(fs_manager->env()->CreateDir(log_dir),
"Failed to recreate log directory " + log_dir);
}
return Status::OK();
}
Status TabletBootstrap::OpenLogReaderInRecoveryDir() {
VLOG_WITH_PREFIX(1) << "Opening log reader in log recovery dir "
<< meta_->fs_manager()->GetTabletWalRecoveryDir(tablet_->tablet_id());
// Open the reader.
RETURN_NOT_OK_PREPEND(LogReader::OpenFromRecoveryDir(tablet_->metadata()->fs_manager(),
tablet_->metadata()->tablet_id(),
tablet_->GetMetricEntity().get(),
&log_reader_),
"Could not open LogReader. Reason");
return Status::OK();
}
Status TabletBootstrap::RemoveRecoveryDir() {
FsManager* fs_manager = tablet_->metadata()->fs_manager();
string recovery_path = fs_manager->GetTabletWalRecoveryDir(tablet_->metadata()->tablet_id());
CHECK(fs_manager->Exists(recovery_path))
<< "Tablet WAL recovery dir " << recovery_path << " does not exist.";
LOG_WITH_PREFIX(INFO) << "Preparing to delete log recovery files and directory " << recovery_path;
string tmp_path = Substitute("$0-$1", recovery_path, GetCurrentTimeMicros());
LOG_WITH_PREFIX(INFO) << "Renaming log recovery dir from " << recovery_path
<< " to " << tmp_path;
RETURN_NOT_OK_PREPEND(fs_manager->env()->RenameFile(recovery_path, tmp_path),
Substitute("Could not rename old recovery dir from: $0 to: $1",
recovery_path, tmp_path));
if (FLAGS_skip_remove_old_recovery_dir) {
LOG_WITH_PREFIX(INFO) << "--skip_remove_old_recovery_dir enabled. NOT deleting " << tmp_path;
return Status::OK();
}
LOG_WITH_PREFIX(INFO) << "Deleting all files from renamed log recovery directory " << tmp_path;
RETURN_NOT_OK_PREPEND(fs_manager->env()->DeleteRecursively(tmp_path),
"Could not remove renamed recovery dir " + tmp_path);
LOG_WITH_PREFIX(INFO) << "Completed deletion of old log recovery files and directory "
<< tmp_path;
return Status::OK();
}
Status TabletBootstrap::OpenNewLog() {
OpId init;
init.set_term(0);
init.set_index(0);
RETURN_NOT_OK(Log::Open(LogOptions(),
tablet_->metadata()->fs_manager(),
tablet_->tablet_id(),
*tablet_->schema(),
tablet_->metadata()->schema_version(),
tablet_->GetMetricEntity(),
&log_));
// Disable sync temporarily in order to speed up appends during the
// bootstrap process.
log_->DisableSync();
return Status::OK();
}
typedef map<int64_t, LogEntryPB*> OpIndexToEntryMap;
// State kept during replay.
struct ReplayState {
ReplayState()
: prev_op_id(MinimumOpId()),
committed_op_id(MinimumOpId()) {
}
~ReplayState() {
STLDeleteValues(&pending_replicates);
STLDeleteValues(&pending_commits);
}
// Return true if 'b' is allowed to immediately follow 'a' in the log.
static bool IsValidSequence(const OpId& a, const OpId& b) {
if (a.term() == 0 && a.index() == 0) {
// Not initialized - can start with any opid.
return true;
}
// Within the same term, we should never skip entries.
// We can, however go backwards (see KUDU-783 for an example)
if (b.term() == a.term() &&
b.index() > a.index() + 1) {
return false;
}
return true;
}
// Return a Corruption status if 'id' seems to be out-of-sequence in the log.
Status CheckSequentialReplicateId(const ReplicateMsg& msg) {
DCHECK(msg.has_id());
if (PREDICT_FALSE(!IsValidSequence(prev_op_id, msg.id()))) {
string op_desc = Substitute("$0 REPLICATE (Type: $1)",
OpIdToString(msg.id()),
OperationType_Name(msg.op_type()));
return Status::Corruption(
Substitute("Unexpected opid following opid $0. Operation: $1",
OpIdToString(prev_op_id),
op_desc));
}
prev_op_id = msg.id();
return Status::OK();
}
void UpdateCommittedOpId(const OpId& id) {
if (id.index() > committed_op_id.index()) {
committed_op_id = id;
}
}
void AddEntriesToStrings(const OpIndexToEntryMap& entries, vector<string>* strings) const {
for (const OpIndexToEntryMap::value_type& map_entry : entries) {
LogEntryPB* entry = DCHECK_NOTNULL(map_entry.second);
strings->push_back(Substitute(" $0", entry->ShortDebugString()));
}
}
void DumpReplayStateToStrings(vector<string>* strings) const {
strings->push_back(Substitute("ReplayState: Previous OpId: $0, Committed OpId: $1, "
"Pending Replicates: $2, Pending Commits: $3", OpIdToString(prev_op_id),
OpIdToString(committed_op_id), pending_replicates.size(), pending_commits.size()));
if (!pending_replicates.empty()) {
strings->push_back("Dumping REPLICATES: ");
AddEntriesToStrings(pending_replicates, strings);
}
if (!pending_commits.empty()) {
strings->push_back("Dumping COMMITS: ");
AddEntriesToStrings(pending_commits, strings);
}
}
// The last replicate message's ID.
OpId prev_op_id;
// The last operation known to be committed.
// All other operations with lower IDs are also committed.
OpId committed_op_id;
// REPLICATE log entries whose corresponding COMMIT record has
// not yet been seen. Keyed by index.
OpIndexToEntryMap pending_replicates;
// COMMIT log entries which couldn't be applied immediately.
OpIndexToEntryMap pending_commits;
};
// Handle the given log entry. If OK is returned, then takes ownership of 'entry'.
// Otherwise, caller frees.
Status TabletBootstrap::HandleEntry(ReplayState* state, LogEntryPB* entry) {
if (VLOG_IS_ON(1)) {
VLOG_WITH_PREFIX(1) << "Handling entry: " << entry->ShortDebugString();
}
switch (entry->type()) {
case log::REPLICATE:
RETURN_NOT_OK(HandleReplicateMessage(state, entry));
break;
case log::COMMIT:
// check the unpaired ops for the matching replicate msg, abort if not found
RETURN_NOT_OK(HandleCommitMessage(state, entry));
break;
default:
return Status::Corruption(Substitute("Unexpected log entry type: $0", entry->type()));
}
MAYBE_FAULT(FLAGS_fault_crash_during_log_replay);
return Status::OK();
}
// Takes ownership of 'replicate_entry' on OK status.
Status TabletBootstrap::HandleReplicateMessage(ReplayState* state, LogEntryPB* replicate_entry) {
stats_.ops_read++;
const ReplicateMsg& replicate = replicate_entry->replicate();
RETURN_NOT_OK(state->CheckSequentialReplicateId(replicate));
DCHECK(replicate.has_timestamp());
CHECK_OK(UpdateClock(replicate.timestamp()));
// Append the replicate message to the log as is
RETURN_NOT_OK(log_->Append(replicate_entry));
int64_t index = replicate_entry->replicate().id().index();
LogEntryPB** existing_entry_ptr = InsertOrReturnExisting(
&state->pending_replicates, index, replicate_entry);
// If there was a entry with the same index we're overwriting then we need to delete
// that entry and all entries with higher indexes.
if (existing_entry_ptr) {
LogEntryPB* existing_entry = *existing_entry_ptr;
auto iter = state->pending_replicates.lower_bound(index);
DCHECK(OpIdEquals((*iter).second->replicate().id(), existing_entry->replicate().id()));
LogEntryPB* last_entry = (*state->pending_replicates.rbegin()).second;
LOG_WITH_PREFIX(INFO) << "Overwriting operations starting at: "
<< existing_entry->replicate().id()
<< " up to: " << last_entry->replicate().id()
<< " with operation: " << replicate_entry->replicate().id();
while (iter != state->pending_replicates.end()) {
delete (*iter).second;
state->pending_replicates.erase(iter++);
stats_.ops_overwritten++;
}
InsertOrDie(&state->pending_replicates, index, replicate_entry);
}
return Status::OK();
}
// Takes ownership of 'commit_entry' on OK status.
Status TabletBootstrap::HandleCommitMessage(ReplayState* state, LogEntryPB* commit_entry) {
DCHECK(commit_entry->has_commit()) << "Not a commit message: " << commit_entry->DebugString();
// Match up the COMMIT record with the original entry that it's applied to.
const OpId& committed_op_id = commit_entry->commit().commited_op_id();
state->UpdateCommittedOpId(committed_op_id);
// If there are no pending replicates, or if this commit's index is lower than the
// the first pending replicate on record this is likely an orphaned commit.
if (state->pending_replicates.empty() ||
(*state->pending_replicates.begin()).first > committed_op_id.index()) {
VLOG_WITH_PREFIX(2) << "Found orphaned commit for " << committed_op_id;
RETURN_NOT_OK(CheckOrphanedCommitDoesntNeedReplay(commit_entry->commit()));
stats_.orphaned_commits++;
delete commit_entry;
return Status::OK();
}
// If this commit does not correspond to the first replicate message in the pending
// replicates set we keep it to apply later...
if ((*state->pending_replicates.begin()).first != committed_op_id.index()) {
if (!ContainsKey(state->pending_replicates, committed_op_id.index())) {
return Status::Corruption(Substitute("Could not find replicate for commit: $0",
commit_entry->ShortDebugString()));
}
VLOG_WITH_PREFIX(2) << "Adding pending commit for " << committed_op_id;
InsertOrDie(&state->pending_commits, committed_op_id.index(), commit_entry);
return Status::OK();
}
// ... if it does, we apply it and all the commits that immediately follow in the sequence.
OpId last_applied = commit_entry->commit().commited_op_id();
RETURN_NOT_OK(ApplyCommitMessage(state, commit_entry));
delete commit_entry;
auto iter = state->pending_commits.begin();
while (iter != state->pending_commits.end()) {
if ((*iter).first == last_applied.index() + 1) {
gscoped_ptr<LogEntryPB> buffered_commit_entry((*iter).second);
state->pending_commits.erase(iter++);
last_applied = buffered_commit_entry->commit().commited_op_id();
RETURN_NOT_OK(ApplyCommitMessage(state, buffered_commit_entry.get()));
continue;
}
break;
}
return Status::OK();
}
bool TabletBootstrap::AreAnyStoresActive(const CommitMsg& commit) {
for (const OperationResultPB& op_result : commit.result().ops()) {
for (const MemStoreTargetPB& mutated_store : op_result.mutated_stores()) {
if (flushed_stores_.IsMemStoreActive(mutated_store)) {
return true;
}
}
}
return false;
}
Status TabletBootstrap::CheckOrphanedCommitDoesntNeedReplay(const CommitMsg& commit) {
if (AreAnyStoresActive(commit)) {
TabletSuperBlockPB super;
WARN_NOT_OK(meta_->ToSuperBlock(&super), LogPrefix() + "Couldn't build TabletSuperBlockPB");
return Status::Corruption(Substitute("CommitMsg was orphaned but it referred to "
"stores which need replay. Commit: $0. TabletMetadata: $1", commit.ShortDebugString(),
super.ShortDebugString()));
}
return Status::OK();
}
Status TabletBootstrap::ApplyCommitMessage(ReplayState* state, LogEntryPB* commit_entry) {
const OpId& committed_op_id = commit_entry->commit().commited_op_id();
VLOG_WITH_PREFIX(2) << "Applying commit for " << committed_op_id;
gscoped_ptr<LogEntryPB> pending_replicate_entry;
// They should also have an associated replicate index (it may have been in a
// deleted log segment though).
pending_replicate_entry.reset(EraseKeyReturnValuePtr(&state->pending_replicates,
committed_op_id.index()));
if (pending_replicate_entry != nullptr) {
// We found a replicate with the same index, make sure it also has the same
// term.
if (!OpIdEquals(committed_op_id, pending_replicate_entry->replicate().id())) {
string error_msg = Substitute("Committed operation's OpId: $0 didn't match the"
"commit message's committed OpId: $1. Pending operation: $2, Commit message: $3",
pending_replicate_entry->replicate().id().ShortDebugString(),
committed_op_id.ShortDebugString(),
pending_replicate_entry->replicate().ShortDebugString(),
commit_entry->commit().ShortDebugString());
LOG_WITH_PREFIX(DFATAL) << error_msg;
return Status::Corruption(error_msg);
}
RETURN_NOT_OK(HandleEntryPair(pending_replicate_entry.get(), commit_entry));
stats_.ops_committed++;
} else {
stats_.orphaned_commits++;
RETURN_NOT_OK(CheckOrphanedCommitDoesntNeedReplay(commit_entry->commit()));
}
return Status::OK();
}
// Never deletes 'replicate_entry' or 'commit_entry'.
Status TabletBootstrap::HandleEntryPair(LogEntryPB* replicate_entry, LogEntryPB* commit_entry) {
const char* error_fmt = "Failed to play $0 request. ReplicateMsg: { $1 }, CommitMsg: { $2 }";
#define RETURN_NOT_OK_REPLAY(ReplayMethodName, replicate, commit) \
RETURN_NOT_OK_PREPEND(ReplayMethodName(replicate, commit), \
Substitute(error_fmt, OperationType_Name(op_type), \
replicate->ShortDebugString(), commit.ShortDebugString()))
ReplicateMsg* replicate = replicate_entry->mutable_replicate();
const CommitMsg& commit = commit_entry->commit();
OperationType op_type = commit.op_type();
switch (op_type) {
case WRITE_OP:
RETURN_NOT_OK_REPLAY(PlayWriteRequest, replicate, commit);
break;
case ALTER_SCHEMA_OP:
RETURN_NOT_OK_REPLAY(PlayAlterSchemaRequest, replicate, commit);
break;
case CHANGE_CONFIG_OP:
RETURN_NOT_OK_REPLAY(PlayChangeConfigRequest, replicate, commit);
break;
case NO_OP:
RETURN_NOT_OK_REPLAY(PlayNoOpRequest, replicate, commit);
break;
default:
return Status::IllegalState(Substitute("Unsupported commit entry type: $0",
commit.op_type()));
}
#undef RETURN_NOT_OK_REPLAY
// Non-tablet operations should not advance the safe time, because they are
// not started serially and so may have timestamps that are out of order.
if (op_type == NO_OP || op_type == CHANGE_CONFIG_OP) {
return Status::OK();
}
// Handle safe time advancement:
//
// If this operation has an external consistency mode other than COMMIT_WAIT, we know that no
// future transaction will have a timestamp that is lower than it, so we can just advance the
// safe timestamp to this operation's timestamp.
//
// If the hybrid clock is disabled, all transactions will fall into this category.
Timestamp safe_time;
if (replicate->write_request().external_consistency_mode() != COMMIT_WAIT) {
safe_time = Timestamp(replicate->timestamp());
// ... else we set the safe timestamp to be the transaction's timestamp minus the maximum clock
// error. This opens the door for problems if the flags changed across reboots, but this is
// unlikely and the problem would manifest itself immediately and clearly (mvcc would complain
// the operation is already committed, with a CHECK failure).
} else {
DCHECK(clock_->SupportsExternalConsistencyMode(COMMIT_WAIT)) << "The provided clock does not"
"support COMMIT_WAIT external consistency mode.";
safe_time = server::HybridClock::AddPhysicalTimeToTimestamp(
Timestamp(replicate->timestamp()),
MonoDelta::FromMicroseconds(-FLAGS_max_clock_sync_error_usec));
}
tablet_->mvcc_manager()->OfflineAdjustSafeTime(safe_time);
return Status::OK();
}
void TabletBootstrap::DumpReplayStateToLog(const ReplayState& state) {
// Dump the replay state, this will log the pending replicates as well as the pending commits,
// which might be useful for debugging.
vector<string> state_dump;
state.DumpReplayStateToStrings(&state_dump);
for (const string& string : state_dump) {
LOG_WITH_PREFIX(INFO) << string;
}
}
Status TabletBootstrap::PlaySegments(ConsensusBootstrapInfo* consensus_info) {
ReplayState state;
log::SegmentSequence segments;
RETURN_NOT_OK(log_reader_->GetSegmentsSnapshot(&segments));
// The first thing to do is to rewind the tablet's schema back to the schema
// as of the point in time where the logs begin. We must replay the writes
// in the logs with the correct point-in-time schema.
if (!segments.empty()) {
const scoped_refptr<ReadableLogSegment>& segment = segments[0];
// Set the point-in-time schema for the tablet based on the log header.
Schema pit_schema;
RETURN_NOT_OK_PREPEND(SchemaFromPB(segment->header().schema(), &pit_schema),
"Couldn't decode log segment schema");
RETURN_NOT_OK_PREPEND(tablet_->RewindSchemaForBootstrap(
pit_schema, segment->header().schema_version()),
"couldn't set point-in-time schema");
}
// We defer opening the log until here, so that we properly reproduce the
// point-in-time schema from the log we're reading into the log we're
// writing.
RETURN_NOT_OK_PREPEND(OpenNewLog(), "Failed to open new log");
int segment_count = 0;
for (const scoped_refptr<ReadableLogSegment>& segment : segments) {
vector<LogEntryPB*> entries;
ElementDeleter deleter(&entries);
// TODO: Optimize this to not read the whole thing into memory?
Status read_status = segment->ReadEntries(&entries);
for (int entry_idx = 0; entry_idx < entries.size(); ++entry_idx) {
LogEntryPB* entry = entries[entry_idx];
Status s = HandleEntry(&state, entry);
if (!s.ok()) {
DumpReplayStateToLog(state);
RETURN_NOT_OK_PREPEND(s, DebugInfo(tablet_->tablet_id(),
segment->header().sequence_number(),
entry_idx, segment->path(),
*entry));
}
// If HandleEntry returns OK, then it has taken ownership of the entry.
// So, we have to remove it from the entries vector to avoid it getting
// freed by ElementDeleter.
entries[entry_idx] = nullptr;
}
// If the LogReader failed to read for some reason, we'll still try to
// replay as many entries as possible, and then fail with Corruption.
// TODO: this is sort of scary -- why doesn't LogReader expose an
// entry-by-entry iterator-like API instead? Seems better to avoid
// exposing the idea of segments to callers.
if (PREDICT_FALSE(!read_status.ok())) {
return Status::Corruption(Substitute("Error reading Log Segment of tablet $0: $1 "
"(Read up to entry $2 of segment $3, in path $4)",
tablet_->tablet_id(),
read_status.ToString(),
entries.size(),
segment->header().sequence_number(),
segment->path()));
}
// TODO: could be more granular here and log during the segments as well,
// plus give info about number of MB processed, but this is better than
// nothing.
listener_->StatusMessage(Substitute("Bootstrap replayed $0/$1 log segments. "
"Stats: $2. Pending: $3 replicates",
segment_count + 1, log_reader_->num_segments(),
stats_.ToString(),
state.pending_replicates.size()));
segment_count++;
}
// If we have non-applied commits they all must belong to pending operations and
// they should only pertain to stores which are still active.
if (!state.pending_commits.empty()) {
for (const OpIndexToEntryMap::value_type& entry : state.pending_commits) {
if (!ContainsKey(state.pending_replicates, entry.first)) {
DumpReplayStateToLog(state);
return Status::Corruption("Had orphaned commits at the end of replay.");
}
if (entry.second->commit().op_type() == WRITE_OP &&
!AreAnyStoresActive(entry.second->commit())) {
DumpReplayStateToLog(state);
TabletSuperBlockPB super;
WARN_NOT_OK(meta_->ToSuperBlock(&super), "Couldn't build TabletSuperBlockPB.");
return Status::Corruption(Substitute("CommitMsg was pending but it did not refer "
"to any active memory stores. Commit: $0. TabletMetadata: $1",
entry.second->commit().ShortDebugString(), super.ShortDebugString()));
}
}
}
// Note that we don't pass the information contained in the pending commits along with
// ConsensusBootstrapInfo. We know that this is safe as they must refer to active
// stores (we make doubly sure above).
//
// Example/Explanation:
// Say we have two different operations that touch the same row, one insert and one
// mutate. Since we use Early Lock Release the commit for the second (mutate) operation
// might end up in the log before the insert's commit. This wouldn't matter since
// we replay in order, but a corner case here is that we might crash before we
// write the commit for the insert, meaning it might not be present at all.
//
// One possible log for this situation would be:
// - Replicate 10.10 (insert)
// - Replicate 10.11 (mutate)
// - Commit 10.11 (mutate)
// ~CRASH while Commit 10.10 is in-flight~
//
// We can't replay 10.10 during bootstrap because we haven't seen its commit, but
// since we can't replay out-of-order we won't replay 10.11 either, in fact we'll
// pass them both as "pending" to consensus to be applied again.
//
// The reason why it is safe to simply disregard 10.11's commit is that we know that
// it must refer only to active stores. We know this because one important flush/compact
// pre-condition is:
// - No flush will become visible on reboot (meaning we won't durably update the tablet
// metadata), unless the snapshot under which the flush/compact was performed has no
// in-flight transactions and all the messages that are in-flight to the log are durable.
//
// In our example this means that if we had flushed/compacted after 10.10 was applied
// (meaning losing the commit message would lead to corruption as we might re-apply it)
// then the commit for 10.10 would be durable. Since it isn't then no flush/compaction
// occurred after 10.10 was applied and thus we can disregard the commit message for
// 10.11 and simply apply both 10.10 and 10.11 as if we hadn't applied them before.
//
// This generalizes to:
// - If a committed replicate message with index Y is missing a commit message,
// no later committed replicate message (with index > Y) is visible across reboots
// in the tablet data.
DumpReplayStateToLog(state);
// Set up the ConsensusBootstrapInfo structure for the caller.
for (OpIndexToEntryMap::value_type& e : state.pending_replicates) {
consensus_info->orphaned_replicates.push_back(e.second->release_replicate());
}
consensus_info->last_id = state.prev_op_id;
consensus_info->last_committed_id = state.committed_op_id;
return Status::OK();
}
Status TabletBootstrap::AppendCommitMsg(const CommitMsg& commit_msg) {
LogEntryPB commit_entry;
commit_entry.set_type(log::COMMIT);
CommitMsg* commit = commit_entry.mutable_commit();
commit->CopyFrom(commit_msg);
return log_->Append(&commit_entry);
}
Status TabletBootstrap::PlayWriteRequest(ReplicateMsg* replicate_msg,
const CommitMsg& commit_msg) {
DCHECK(replicate_msg->has_timestamp());
WriteRequestPB* write = replicate_msg->mutable_write_request();
WriteTransactionState tx_state(nullptr, write, nullptr);
tx_state.mutable_op_id()->CopyFrom(replicate_msg->id());
tx_state.set_timestamp(Timestamp(replicate_msg->timestamp()));
tablet_->StartTransaction(&tx_state);
tablet_->StartApplying(&tx_state);
// Use committed OpId for mem store anchoring.
tx_state.mutable_op_id()->CopyFrom(replicate_msg->id());
if (write->has_row_operations()) {
// TODO: get rid of redundant params below - they can be gotten from the Request
RETURN_NOT_OK(PlayRowOperations(&tx_state,
write->schema(),
write->row_operations(),
commit_msg.result()));
}
// Append the commit msg to the log but replace the result with the new one.
LogEntryPB commit_entry;
commit_entry.set_type(log::COMMIT);
CommitMsg* commit = commit_entry.mutable_commit();
commit->CopyFrom(commit_msg);
tx_state.ReleaseTxResultPB(commit->mutable_result());
RETURN_NOT_OK(log_->Append(&commit_entry));
return Status::OK();
}
Status TabletBootstrap::PlayAlterSchemaRequest(ReplicateMsg* replicate_msg,
const CommitMsg& commit_msg) {
AlterSchemaRequestPB* alter_schema = replicate_msg->mutable_alter_schema_request();
// Decode schema
Schema schema;
RETURN_NOT_OK(SchemaFromPB(alter_schema->schema(), &schema));
AlterSchemaTransactionState tx_state(nullptr, alter_schema, nullptr);
// TODO(KUDU-860): we should somehow distinguish if an alter table failed on its original
// attempt (e.g due to being an invalid request, or a request with a too-early
// schema version).
RETURN_NOT_OK(tablet_->CreatePreparedAlterSchema(&tx_state, &schema));
// Apply the alter schema to the tablet
RETURN_NOT_OK_PREPEND(tablet_->AlterSchema(&tx_state), "Failed to AlterSchema:");
// Also update the log information. Normally, the AlterSchema() call above
// takes care of this, but our new log isn't hooked up to the tablet yet.
log_->SetSchemaForNextLogSegment(schema, tx_state.schema_version());
return AppendCommitMsg(commit_msg);
}
Status TabletBootstrap::PlayChangeConfigRequest(ReplicateMsg* replicate_msg,
const CommitMsg& commit_msg) {
ChangeConfigRecordPB* change_config = replicate_msg->mutable_change_config_record();
RaftConfigPB config = change_config->new_config();
int64_t cmeta_opid_index = cmeta_->committed_config().opid_index();
if (replicate_msg->id().index() > cmeta_opid_index) {
DCHECK(!config.has_opid_index());
config.set_opid_index(replicate_msg->id().index());
VLOG_WITH_PREFIX(1) << "WAL replay found Raft configuration with log index "
<< config.opid_index()
<< " that is greater than the committed config's index "
<< cmeta_opid_index
<< ". Applying this configuration change.";
cmeta_->set_committed_config(config);
// We flush once at the end of bootstrap.
} else {
VLOG_WITH_PREFIX(1) << "WAL replay found Raft configuration with log index "
<< replicate_msg->id().index()
<< ", which is less than or equal to the committed "
<< "config's index " << cmeta_opid_index << ". "
<< "Skipping application of this config change.";
}
return AppendCommitMsg(commit_msg);
}
Status TabletBootstrap::PlayNoOpRequest(ReplicateMsg* replicate_msg, const CommitMsg& commit_msg) {
return AppendCommitMsg(commit_msg);
}
Status TabletBootstrap::PlayRowOperations(WriteTransactionState* tx_state,
const SchemaPB& schema_pb,
const RowOperationsPB& ops_pb,
const TxResultPB& result) {
Schema inserts_schema;
RETURN_NOT_OK_PREPEND(SchemaFromPB(schema_pb, &inserts_schema),
"Couldn't decode client schema");
RETURN_NOT_OK_PREPEND(tablet_->DecodeWriteOperations(&inserts_schema, tx_state),
Substitute("Could not decode row operations: $0",
ops_pb.ShortDebugString()));
CHECK_EQ(tx_state->row_ops().size(), result.ops_size());
// Run AcquireRowLocks, Apply, etc!
RETURN_NOT_OK_PREPEND(tablet_->AcquireRowLocks(tx_state),
"Failed to acquire row locks");
RETURN_NOT_OK(FilterAndApplyOperations(tx_state, result));
return Status::OK();
}
Status TabletBootstrap::FilterAndApplyOperations(WriteTransactionState* tx_state,
const TxResultPB& orig_result) {
int32_t op_idx = 0;
for (RowOp* op : tx_state->row_ops()) {
const OperationResultPB& orig_op_result = orig_result.ops(op_idx++);
op->set_original_result_from_log(&orig_op_result);
// check if the operation failed in the original transaction
if (PREDICT_FALSE(orig_op_result.has_failed_status())) {
Status status = StatusFromPB(orig_op_result.failed_status());
if (VLOG_IS_ON(1)) {
VLOG_WITH_PREFIX(1) << "Skipping operation that originally resulted in error. OpId: "
<< tx_state->op_id().DebugString() << " op index: "
<< op_idx - 1 << " original error: "
<< status.ToString();
}
op->SetFailed(status);
continue;
}
// Check if it should be filtered out because it's already flushed.
switch (op->decoded_op.type) {
case RowOperationsPB::INSERT:
stats_.inserts_seen++;
if (!orig_op_result.flushed()) {
RETURN_NOT_OK(FilterInsert(tx_state, op, orig_op_result));
} else {
op->SetAlreadyFlushed();
stats_.inserts_ignored++;
continue;
}
break;
case RowOperationsPB::UPDATE:
case RowOperationsPB::DELETE:
stats_.mutations_seen++;
if (!orig_op_result.flushed()) {
RETURN_NOT_OK(FilterMutate(tx_state, op, orig_op_result));
} else {
op->SetAlreadyFlushed();
stats_.mutations_ignored++;
continue;
}
break;
default:
LOG_WITH_PREFIX(FATAL) << "Bad op type: " << op->decoded_op.type;
break;
}
if (op->result != nullptr) {
continue;
}
// Actually apply it.
ProbeStats stats; // we don't use this, but tablet internals require non-NULL.
tablet_->ApplyRowOperation(tx_state, op, &stats);
DCHECK(op->result != nullptr);
// We expect that the above Apply() will always succeed, because we're
// applying an operation that we know succeeded before the server
// restarted. If it doesn't succeed, something is wrong and we are
// diverging from our prior state, so bail.
if (op->result->has_failed_status()) {
return Status::Corruption("Operation which previously succeeded failed "
"during log replay",
Substitute("Op: $0\nFailure: $1",
op->ToString(*tablet_->schema()),
op->result->failed_status().ShortDebugString()));
}
}
return Status::OK();
}
Status TabletBootstrap::FilterInsert(WriteTransactionState* tx_state,
RowOp* op,
const OperationResultPB& op_result) {
DCHECK_EQ(op->decoded_op.type, RowOperationsPB::INSERT);
// INSERTs are never duplicated.
if (PREDICT_FALSE(op_result.mutated_stores_size() != 1 ||
!op_result.mutated_stores(0).has_mrs_id())) {
return Status::Corruption(Substitute("Insert operation result must have an mrs_id: $0",
op_result.ShortDebugString()));
}
// check if the insert is already flushed
if (!flushed_stores_.IsMemStoreActive(op_result.mutated_stores(0))) {
if (VLOG_IS_ON(1)) {
VLOG_WITH_PREFIX(1) << "Skipping insert that was already flushed. OpId: "
<< tx_state->op_id().DebugString()
<< " flushed to: " << op_result.mutated_stores(0).mrs_id()
<< " latest durable mrs id: "
<< tablet_->metadata()->last_durable_mrs_id();
}
op->SetAlreadyFlushed();
stats_.inserts_ignored++;
}
return Status::OK();
}
Status TabletBootstrap::FilterMutate(WriteTransactionState* tx_state,
RowOp* op,
const OperationResultPB& op_result) {
DCHECK(op->decoded_op.type == RowOperationsPB::UPDATE ||
op->decoded_op.type == RowOperationsPB::DELETE)
<< RowOperationsPB::Type_Name(op->decoded_op.type);
int num_mutated_stores = op_result.mutated_stores_size();
if (PREDICT_FALSE(num_mutated_stores == 0 || num_mutated_stores > 2)) {
return Status::Corruption(Substitute("Mutations must have one or two mutated_stores: $0",
op_result.ShortDebugString()));
}
// The mutation may have been duplicated, so we'll check whether any of the
// output targets was active.
int num_active_stores = 0;
for (const MemStoreTargetPB& mutated_store : op_result.mutated_stores()) {
if (flushed_stores_.IsMemStoreActive(mutated_store)) {
num_active_stores++;
} else {
if (VLOG_IS_ON(1)) {
string mutation = op->decoded_op.changelist.ToString(*tablet_->schema());
VLOG_WITH_PREFIX(1) << "Skipping mutation to " << mutated_store.ShortDebugString()
<< " that was not active. OpId: " << tx_state->op_id().DebugString();
}
}
}
if (num_active_stores == 0) {
// The mutation was fully flushed.
op->SetFailed(Status::AlreadyPresent("Update was already flushed."));
stats_.mutations_ignored++;
return Status::OK();
}
if (PREDICT_FALSE(num_active_stores == 2)) {
// It's not possible for a duplicated mutation to refer to two stores which are still
// active. Either the mutation arrived before the metadata was flushed, in which case
// the 'first' store is live, or it arrived just after it was flushed, in which case
// the 'second' store was live. But at no time should the metadata refer to both the
// 'input' and 'output' stores of a compaction.
return Status::Corruption("Mutation was duplicated to two stores that are considered live",
op_result.ShortDebugString());
}
return Status::OK();
}
Status TabletBootstrap::UpdateClock(uint64_t timestamp) {
Timestamp ts;
RETURN_NOT_OK(ts.FromUint64(timestamp));
RETURN_NOT_OK(clock_->Update(ts));
return Status::OK();
}
string TabletBootstrap::LogPrefix() const {
return Substitute("T $0 P $1: ", meta_->tablet_id(), meta_->fs_manager()->uuid());
}
Status FlushedStoresSnapshot::InitFrom(const TabletMetadata& meta) {
CHECK(flushed_dms_by_drs_id_.empty()) << "already initted";
last_durable_mrs_id_ = meta.last_durable_mrs_id();
for (const shared_ptr<RowSetMetadata>& rsmd : meta.rowsets()) {
if (!InsertIfNotPresent(&flushed_dms_by_drs_id_, rsmd->id(),
rsmd->last_durable_redo_dms_id())) {
return Status::Corruption(Substitute(
"Duplicate DRS ID $0 in tablet metadata. "
"Found DRS $0 with last durable redo DMS ID $1 while trying to "
"initialize DRS $0 with last durable redo DMS ID $2",
rsmd->id(),
flushed_dms_by_drs_id_[rsmd->id()],
rsmd->last_durable_redo_dms_id()));
}
}
return Status::OK();
}
bool FlushedStoresSnapshot::IsMemStoreActive(const MemStoreTargetPB& target) const {
if (target.has_mrs_id()) {
DCHECK(!target.has_rs_id());
DCHECK(!target.has_dms_id());
// The original mutation went to the MRS. If this MRS has not yet been made
// durable, it needs to be replayed.
return target.mrs_id() > last_durable_mrs_id_;
} else {
// The original mutation went to a DRS's delta store.
DCHECK(target.has_rs_id());
int64_t last_durable_dms_id;
if (!FindCopy(flushed_dms_by_drs_id_, target.rs_id(), &last_durable_dms_id)) {
// If we have no data about this DRS, then there are two cases:
//
// 1) The DRS has already been flushed, but then later got removed because
// it got compacted away. Since it was flushed, we don't need to replay it.
//
// 2) The DRS was in the process of being written, but haven't yet flushed the
// TabletMetadata update that includes it. We only write to an in-progress DRS like
// this when we are in the 'duplicating' phase of a compaction. In that case,
// the other duplicated 'target' should still be present in the metadata, and we
// can base our decision based on that one.
return false;
}
// If the original rowset that we applied the edit to exists, check whether
// the edit was in a flushed DMS or a live one.
return target.dms_id() > last_durable_dms_id;
}
}
} // namespace tablet
} // namespace kudu