blob: 298c9626deeae33dda24be8fff525c92df584e9e [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tablet/tablet_bootstrap.h"
#include <cstdint>
#include <functional>
#include <iterator>
#include <map>
#include <memory>
#include <ostream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include <boost/optional/optional.hpp>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <google/protobuf/arena.h>
#include "kudu/clock/clock.h"
#include "kudu/clock/hybrid_clock.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/row_operations.h"
#include "kudu/common/schema.h"
#include "kudu/common/timestamp.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/log.h"
#include "kudu/consensus/log.pb.h"
#include "kudu/consensus/log_anchor_registry.h"
#include "kudu/consensus/log_index.h"
#include "kudu/consensus/log_reader.h"
#include "kudu/consensus/log_util.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/consensus/raft_consensus.h"
#include "kudu/fs/data_dirs.h"
#include "kudu/fs/fs.pb.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/fs/io_context.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/human_readable.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/result_tracker.h"
#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/tablet/metadata.pb.h"
#include "kudu/tablet/mvcc.h"
#include "kudu/tablet/ops/alter_schema_op.h"
#include "kudu/tablet/ops/op.h"
#include "kudu/tablet/ops/participant_op.h"
#include "kudu/tablet/ops/write_op.h"
#include "kudu/tablet/row_op.h"
#include "kudu/tablet/rowset.h"
#include "kudu/tablet/rowset_metadata.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet.pb.h"
#include "kudu/tablet/tablet_metadata.h"
#include "kudu/tablet/tablet_replica.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/tserver/tserver_admin.pb.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/env.h"
#include "kudu/util/env_util.h"
#include "kudu/util/fault_injection.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/logging.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/path_util.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/stopwatch.h"
// Declared in the log subsystem; used below in CheckAndRepairOpIdOverflow() as
// a generous sanity bound on how far a log index could have overflowed (KUDU-1933).
DECLARE_int32(group_commit_queue_size_bytes);

// Fault-injection knob for testing crash-recovery: with the given probability,
// abort the process right after replaying each log entry (see MAYBE_FAULT below).
DEFINE_double(fault_crash_during_log_replay, 0.0,
              "Fraction of the time when the tablet will crash immediately "
              "after processing a log entry during log replay. "
              "(For testing only!)");
TAG_FLAG(fault_crash_during_log_replay, unsafe);

// Declared in the clock subsystem; presumably consulted later in this file when
// updating the clock from replayed timestamps (usage not visible in this chunk).
DECLARE_int32(max_clock_sync_error_usec);
using kudu::clock::Clock;
using kudu::consensus::ALTER_SCHEMA_OP;
using kudu::consensus::CHANGE_CONFIG_OP;
using kudu::consensus::CommitMsg;
using kudu::consensus::ConsensusBootstrapInfo;
using kudu::consensus::MinimumOpId;
using kudu::consensus::NO_OP;
using kudu::consensus::OpId;
using kudu::consensus::OpIdEquals;
using kudu::consensus::OpIdToString;
using kudu::consensus::OperationType;
using kudu::consensus::OperationType_Name;
using kudu::consensus::PARTICIPANT_OP;
using kudu::consensus::RaftConfigPB;
using kudu::consensus::ReplicateMsg;
using kudu::consensus::WRITE_OP;
using kudu::fs::IOContext;
using kudu::log::Log;
using kudu::log::LogAnchorRegistry;
using kudu::log::LogEntryPB;
using kudu::log::LogIndex;
using kudu::log::LogOptions;
using kudu::log::LogReader;
using kudu::log::ReadableLogSegment;
using kudu::pb_util::SecureDebugString;
using kudu::pb_util::SecureShortDebugString;
using kudu::rpc::ResultTracker;
using kudu::tserver::AlterSchemaRequestPB;
using kudu::tserver::WriteRequestPB;
using kudu::tserver::WriteResponsePB;
using std::map;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::unordered_map;
using std::vector;
using strings::Substitute;
namespace kudu {
namespace tablet {
struct ReplayState;
// Information from the tablet metadata which indicates which data was
// flushed prior to this restart and which memory stores are still active.
//
// We take a snapshot of this information at the beginning of the bootstrap
// process so that we can allow compactions and flushes to run during bootstrap
// without confusing our tracking of flushed stores.
//
// NOTE: automatic flushes and compactions are not currently scheduled during
// bootstrap. However, flushes may still be triggered due to operations like
// alter-table.
class FlushedStoresSnapshot {
 public:
  FlushedStoresSnapshot() {}

  // Populate this snapshot from the given tablet metadata. Must be called
  // before IsMemStoreActive().
  Status InitFrom(const TabletMetadata& tablet_meta);

  // Return true if the given memory store is still active (i.e. edits that were
  // originally written to this memory store should be replayed during the bootstrap
  // process).
  //
  // NOTE: a store may be inactive for either of two reasons. Either:
  // (a) the store was flushed to disk, OR
  // (b) the store was in the process of being written by a flush or compaction
  //     but the process crashed before the associated tablet metadata update
  //     was committed.
  bool IsMemStoreActive(const MemStoreTargetPB& target) const;

 private:
  // Id of the last MemRowSet that was durably flushed to disk before this
  // restart.
  int64_t last_durable_mrs_id_;
  // Per-DiskRowSet flushed-DeltaMemStore state, keyed by DRS id
  // (presumably mapping to the last flushed DMS id -- confirm in InitFrom's
  // definition, which is not in this chunk).
  unordered_map<int64_t, int64_t> flushed_dms_by_drs_id_;

  DISALLOW_COPY_AND_ASSIGN(FlushedStoresSnapshot);
};
// Bootstraps an existing tablet by opening the metadata from disk, and rebuilding soft
// state by playing log segments. A bootstrapped tablet can then be added to an existing
// consensus configuration as a LEARNER, which will bring its state up to date with the
// rest of the consensus configuration, or it can start serving the data itself, after it
// has been appointed LEADER of that particular consensus configuration.
//
// The high-level steps to replay the WAL are as follows:
// for each segment in the WAL:
// for each entry in the segment:
// - If the entry is a replicate message, keep track of it and write it to
// the new WAL, since we may find a corresponding commit message later.
// - If the entry is a commit message, determine whether or not we have a
// corresponding replicate message in the WAL, and if so, replay the op,
// skipping operations whose mem-stores have been persisted to disk. If
// no replicate message is found, we can skip replaying the op since its
// mutation has been persisted to disk.
// for each commit message with no corresponding replicate message:
// - Validate that the mutated stores are non-active.
// for each replicate message with no corresponding commit message:
// - Return the replicate message as an "orphaned replicate".
// - When the RaftConsensus instance starts up, orphaned replicates are
// initialized as follower ops.
//
// NOTE: this does not handle pulling data from other replicas in the cluster. That
// is handled by the 'TabletCopy' classes, which copy blocks and metadata locally
// before invoking this local bootstrap functionality to start the tablet.
//
// TODO Because the table that is being rebuilt is never flushed/compacted, consensus
// is only set on the tablet after bootstrap, when we get to flushes/compactions though
// we need to set it before replay or we won't be able to re-rebuild.
class TabletBootstrap {
 public:
  // All ref-counted/shared arguments are taken by value and moved into members.
  // Raw pointers (clock, metric registry, file cache) are stored but not owned.
  TabletBootstrap(scoped_refptr<TabletMetadata> tablet_meta,
                  RaftConfigPB committed_raft_config,
                  Clock* clock,
                  shared_ptr<MemTracker> mem_tracker,
                  scoped_refptr<ResultTracker> result_tracker,
                  MetricRegistry* metric_registry,
                  FileCache* file_cache,
                  scoped_refptr<TabletReplica> tablet_replica,
                  scoped_refptr<LogAnchorRegistry> log_anchor_registry);

  // Plays the log segments, rebuilding the portion of the Tablet's soft
  // state that is present in the log (additional soft state may be present
  // in other replicas).
  // A successful call will yield the rebuilt tablet and the rebuilt log.
  Status Bootstrap(shared_ptr<Tablet>* rebuilt_tablet,
                   scoped_refptr<Log>* rebuilt_log,
                   ConsensusBootstrapInfo* consensus_info);

 private:
  // The method that does the actual work of tablet bootstrap. Bootstrap() is
  // actually a wrapper method that is responsible for pinning and unpinning
  // the tablet metadata flush.
  Status RunBootstrap(shared_ptr<Tablet>* rebuilt_tablet,
                      scoped_refptr<Log>* rebuilt_log,
                      ConsensusBootstrapInfo* consensus_info);

  // Opens the tablet.
  // Sets '*has_blocks' to true if there was any data on disk for this tablet.
  Status OpenTablet(bool* has_blocks);

  // Checks if a previous log recovery directory exists. If so, it deletes any
  // files in the log dir and sets 'needs_recovery' to true, meaning that the
  // previous recovery attempt should be retried from the recovery dir.
  //
  // Otherwise, if there is a log directory with log files in it, renames that
  // log dir to the log recovery dir and creates a new, empty log dir so that
  // log replay can proceed. 'needs_recovery' is also returned as true in this
  // case.
  //
  // If no log segments are found, 'needs_recovery' is set to false.
  Status PrepareRecoveryDir(bool* needs_recovery);

  // Opens the latest log segments for the Tablet that will allow to rebuild
  // the tablet's soft state. If there are existing log segments in the tablet's
  // log directly they are moved to a "log-recovery" directory which is deleted
  // when the replay process is completed (as they have been duplicated in the
  // current log directory).
  //
  // If a "log-recovery" directory is already present, we will continue to replay
  // from the "log-recovery" directory. Tablet metadata is updated once replay
  // has finished from the "log-recovery" directory.
  Status OpenLogReaderInRecoveryDir();

  // Opens a new log in the tablet's log directory.
  // The directory is expected to be clean.
  Status OpenNewLog();

  // Finishes bootstrap, setting 'rebuilt_log' and 'rebuilt_tablet'.
  Status FinishBootstrap(const string& message,
                         scoped_refptr<log::Log>* rebuilt_log,
                         shared_ptr<Tablet>* rebuilt_tablet);

  // Plays the log segments into the tablet being built.
  // The process of playing the segments generates a new log that can be continued
  // later on when then tablet is rebuilt and starts accepting writes from clients.
  Status PlaySegments(const IOContext* io_context, ConsensusBootstrapInfo* consensus_info);

  // Append the given commit message to the log.
  // Does not support writing a TxResult.
  Status AppendCommitMsg(const CommitMsg& commit_msg);

  // Per-operation-type replay handlers. Each replays one REPLICATE/COMMIT pair
  // of the corresponding consensus OperationType.
  Status PlayWriteRequest(const IOContext* io_context, ReplicateMsg* replicate_msg,
                          const CommitMsg& commit_msg);

  Status PlayAlterSchemaRequest(const IOContext* io_context, ReplicateMsg* replicate_msg,
                                const CommitMsg& commit_msg);

  Status PlayChangeConfigRequest(const IOContext* io_context, ReplicateMsg* replicate_msg,
                                 const CommitMsg& commit_msg);

  Status PlayTxnParticipantOpRequest(const IOContext* io_context, ReplicateMsg* replicate_msg,
                                     const CommitMsg& commit_msg);

  Status PlayNoOpRequest(const IOContext* io_context, ReplicateMsg* replicate_msg,
                         const CommitMsg& commit_msg);

  // Plays operations, skipping those that have already been flushed or have previously failed.
  // See ApplyRowOperations() for more details on how the decision of whether an operation
  // is applied or skipped is made.
  Status PlayRowOperations(const IOContext* io_context,
                           WriteOpState* op_state,
                           const TxResultPB& orig_result,
                           TxResultPB* new_result);

  // Determine which of the operations from 'orig_result' must be skipped.
  // At the same time this builds the WriteResponsePB that we'll store on the ResultTracker.
  // 'new_result' store the results of the operations that were skipped, 'response' stores
  // any error that might have previously happened so that we can send them back to clients,
  // if needed.
  // Finally 'all_skipped' indicates whether all of the original operations were skipped.
  Status DetermineSkippedOpsAndBuildResponse(const TxResultPB& orig_result,
                                             TxResultPB* new_result,
                                             WriteResponsePB* response,
                                             bool* all_skipped);

  // Pass through all of the decoded operations in op_state. For each op:
  // - if it was previously failed, mark as failed
  // - if it previously succeeded but was flushed, skip it.
  // - otherwise, re-apply to the tablet being bootstrapped.
  Status ApplyOperations(const IOContext* io_context,
                         WriteOpState* op_state,
                         const TxResultPB& orig_result,
                         TxResultPB* new_result);

  // Verdict produced by FilterOperation() for a single row operation.
  enum OpAction {
    // The operation was never applied or was applied to an unflushed memory store and thus
    // needs to be applied again.
    NEEDS_REPLAY,

    // The operation was already applied to a memory store that was flushed.
    SKIP_PREVIOUSLY_FLUSHED,

    // The operation was never applied due to an error.
    SKIP_PREVIOUSLY_FAILED
  };

  // Filter a row operation, setting 'action' to indicate what needs to be done
  // to the operation, i.e. whether it must applied or skipped.
  Status FilterOperation(const OperationResultPB& op_result,
                         OpAction* action);

  // Classification of a COMMIT message's mutated stores, produced by
  // AnalyzeActiveStores().
  enum ActiveStores {
    // The OperationResultPBs in the commit message do not reference any stores.
    // This can happen in the case that the operations did not result in any mutations
    // (e.g. because they were updates for not-found row keys).
    NO_MUTATED_STORES,

    // At least one operation resulted in a mutation to a store, but none of the
    // mutated stores are still active. Therefore the operation does not need to
    // be replayed.
    NO_STORES_ACTIVE,

    // At least one operation resulted in a mutation to a store, and at least
    // one of those mutated stores is still active. This implies that the operation
    // needs to be replayed.
    SOME_STORES_ACTIVE
  };

  // For the given commit message, analyze which memory stores were mutated
  // by the operation, returning one of the enum values above.
  ActiveStores AnalyzeActiveStores(const CommitMsg& commit);

  // Log the full contents of 'state' (pending replicates/commits) for debugging.
  void DumpReplayStateToLog(const ReplayState& state);

  // Dispatch a single replayed log entry to the appropriate handler below.
  // On error, 'entry_debug_info' is filled with a printable form of the entry.
  Status HandleEntry(const IOContext* io_context,
                     ReplayState* state,
                     unique_ptr<LogEntryPB> entry,
                     string* entry_debug_info);

  // Handlers for each type of message seen in the log during replay.
  Status HandleReplicateMessage(ReplayState* state,
                                unique_ptr<LogEntryPB> entry,
                                string* entry_debug_info);
  Status HandleCommitMessage(const IOContext* io_context, ReplayState* state,
                             unique_ptr<LogEntryPB> entry,
                             string* entry_debug_info);
  Status ApplyCommitMessage(const IOContext* io_context, ReplayState* state, LogEntryPB* entry);
  Status HandleEntryPair(const IOContext* io_context, LogEntryPB* replicate_entry,
                         LogEntryPB* commit_entry);

  // Checks that an orphaned commit message is actually irrelevant, i.e that none
  // of the data stores it refers to are live.
  Status CheckOrphanedCommitDoesntNeedReplay(const CommitMsg& commit);

  // Decodes a Timestamp from the provided string and updates the clock
  // with it.
  Status UpdateClock(uint64_t timestamp);

  // Return a log prefix string in the standard "T xxx P yyy" format.
  string LogPrefix() const;

  // Log a status message and set the TabletReplica's status as well.
  void SetStatusMessage(const string& status);

  // Metadata of the tablet being bootstrapped; flushes are pinned during
  // bootstrap (see Bootstrap()).
  const scoped_refptr<TabletMetadata> tablet_meta_;
  // The committed Raft config the tablet was last known to be a member of.
  const RaftConfigPB committed_raft_config_;
  // Server-wide clock; raw pointer, not owned by this class.
  Clock* clock_;
  shared_ptr<MemTracker> mem_tracker_;
  scoped_refptr<rpc::ResultTracker> result_tracker_;
  MetricRegistry* metric_registry_;
  FileCache* file_cache_;
  // May be null; when set, bootstrap progress messages are mirrored onto it
  // (see SetStatusMessage()).
  scoped_refptr<TabletReplica> tablet_replica_;
  // The tablet under (re)construction; released to the caller in
  // FinishBootstrap().
  unique_ptr<tablet::Tablet> tablet_;
  const scoped_refptr<log::LogAnchorRegistry> log_anchor_registry_;
  // The new log being written during replay; ownership transferred to the
  // caller in FinishBootstrap().
  scoped_refptr<log::Log> log_;
  // Reader over the segments in the recovery dir being replayed.
  std::shared_ptr<log::LogReader> log_reader_;

  // Statistics on the replay of entries in the log.
  struct Stats {
    Stats()
      : ops_read(0),
        ops_overwritten(0),
        ops_ignored(0),
        ops_committed(0),
        inserts_seen(0),
        inserts_ignored(0),
        mutations_seen(0),
        mutations_ignored(0),
        orphaned_commits(0) {
    }

    // Human-readable one-line summary of all counters, for logging.
    string ToString() const {
      return Substitute("ops{read=$0 overwritten=$1 applied=$2 ignored=$3} "
                        "inserts{seen=$4 ignored=$5} "
                        "mutations{seen=$6 ignored=$7} "
                        "orphaned_commits=$8",
                        ops_read, ops_overwritten, ops_committed, ops_ignored,
                        inserts_seen, inserts_ignored,
                        mutations_seen, mutations_ignored,
                        orphaned_commits);
    }

    // Number of REPLICATE messages read from the log
    int ops_read;

    // Number of REPLICATE messages which were overwritten by later entries.
    int ops_overwritten;

    // Number of REPLICATE messages which were able to be completely ignored
    // because the COMMIT message indicated that all of the contained operations
    // were already flushed.
    int ops_ignored;

    // Number of REPLICATE messages for which a matching COMMIT was found.
    int ops_committed;

    // Number inserts/mutations seen and ignored. Note inserts_ignored does not refer
    // to the INSERT_IGNORE operation. It refers to inserts ignored during log replay.
    int inserts_seen, inserts_ignored;
    int mutations_seen, mutations_ignored;

    // Number of COMMIT messages for which a corresponding REPLICATE was not found.
    int orphaned_commits;
  };
  Stats stats_;

  // Snapshot of which stores were flushed prior to restart.
  FlushedStoresSnapshot flushed_stores_;

  DISALLOW_COPY_AND_ASSIGN(TabletBootstrap);
};
// Record a bootstrap progress message in the server log and, when a replica
// object is attached, mirror it onto the TabletReplica as well.
void TabletBootstrap::SetStatusMessage(const string& status) {
  LOG_WITH_PREFIX(INFO) << status;
  if (tablet_replica_ != nullptr) {
    tablet_replica_->SetStatusMessage(status);
  }
}
// Entry point for tablet bootstrap: constructs a TabletBootstrap, runs it,
// and re-enables log fsync (OpenNewLog() disables sync to speed up replay).
Status BootstrapTablet(scoped_refptr<TabletMetadata> tablet_meta,
                       RaftConfigPB committed_raft_config,
                       Clock* clock,
                       shared_ptr<MemTracker> mem_tracker,
                       scoped_refptr<ResultTracker> result_tracker,
                       MetricRegistry* metric_registry,
                       FileCache* file_cache,
                       scoped_refptr<TabletReplica> tablet_replica,
                       scoped_refptr<log::LogAnchorRegistry> log_anchor_registry,
                       shared_ptr<tablet::Tablet>* rebuilt_tablet,
                       scoped_refptr<log::Log>* rebuilt_log,
                       ConsensusBootstrapInfo* consensus_info) {
  // NOTE: the trace event must read the tablet id before 'tablet_meta' is
  // moved into the bootstrapper below.
  TRACE_EVENT1("tablet", "BootstrapTablet",
               "tablet_id", tablet_meta->tablet_id());
  TabletBootstrap tablet_bootstrap(std::move(tablet_meta),
                                   std::move(committed_raft_config),
                                   clock,
                                   std::move(mem_tracker),
                                   std::move(result_tracker),
                                   metric_registry,
                                   file_cache,
                                   std::move(tablet_replica),
                                   std::move(log_anchor_registry));
  RETURN_NOT_OK(tablet_bootstrap.Bootstrap(rebuilt_tablet, rebuilt_log, consensus_info));
  // This is necessary since OpenNewLog() initially disables sync.
  RETURN_NOT_OK((*rebuilt_log)->ReEnableSyncIfRequired());
  return Status::OK();
}
// Build a human-readable description of a replay failure, identifying the
// entry index, segment, tablet, and segment path, plus a (truncated) dump of
// the offending entry itself.
static string DebugInfo(const string& tablet_id,
                        int segment_seqno,
                        int entry_idx,
                        const string& segment_path,
                        const string& entry_debug_info) {
  // Cap the entry dump at a reasonable length for logging. Otherwise, glog
  // would truncate the whole message for us and we might lose important
  // information that comes after this long string.
  constexpr size_t kMaxEntryDumpLen = 500;
  string truncated = entry_debug_info;
  if (truncated.size() > kMaxEntryDumpLen) {
    truncated.resize(kMaxEntryDumpLen);
    truncated.append("...");
  }
  const string entry_suffix =
      truncated.empty() ? truncated : Substitute(" Entry: $0", truncated);
  return Substitute("Debug Info: Error playing entry $0 of segment $1 of tablet $2. "
                    "Segment path: $3.$4", entry_idx, segment_seqno, tablet_id,
                    segment_path, entry_suffix);
}
// All ref-counted/shared arguments are moved into members; raw pointers
// (clock, metric registry, file cache) are simply stored.
TabletBootstrap::TabletBootstrap(
    scoped_refptr<TabletMetadata> tablet_meta,
    RaftConfigPB committed_raft_config,
    Clock* clock,
    shared_ptr<MemTracker> mem_tracker,
    scoped_refptr<ResultTracker> result_tracker,
    MetricRegistry* metric_registry,
    FileCache* file_cache,
    scoped_refptr<TabletReplica> tablet_replica,
    scoped_refptr<LogAnchorRegistry> log_anchor_registry)
    : tablet_meta_(std::move(tablet_meta)),
      committed_raft_config_(std::move(committed_raft_config)),
      clock_(clock),
      mem_tracker_(std::move(mem_tracker)),
      result_tracker_(std::move(result_tracker)),
      metric_registry_(metric_registry),
      file_cache_(file_cache),
      tablet_replica_(std::move(tablet_replica)),
      log_anchor_registry_(std::move(log_anchor_registry)) {}
// Wrapper around RunBootstrap() responsible for pinning/unpinning metadata
// flushes and for installing the pre-flush callback on success.
Status TabletBootstrap::Bootstrap(shared_ptr<Tablet>* rebuilt_tablet,
                                  scoped_refptr<Log>* rebuilt_log,
                                  ConsensusBootstrapInfo* consensus_info) {
  // We pin (prevent) metadata flush at the beginning of the bootstrap process
  // and always unpin it at the end.
  tablet_meta_->PinFlush();

  // Now run the actual bootstrap process.
  Status bootstrap_status = RunBootstrap(rebuilt_tablet, rebuilt_log, consensus_info);

  // On success both outputs must have been populated; on failure either may be
  // missing.
  CHECK((*rebuilt_tablet && *rebuilt_log) || !bootstrap_status.ok())
      << "Tablet and Log not initialized";

  if (bootstrap_status.ok()) {
    // Add a callback to TabletMetadata that makes sure that each time we flush the metadata
    // we also wait for in-flights to finish and for their wal entry to be fsynced.
    // This might be a bit conservative in some situations but it will prevent us from
    // ever flushing the metadata referring to tablet data blocks containing data whose
    // commit entries are not durable, a pre-requisite for recovery.
    auto cb = make_scoped_refptr(new FlushInflightsToLogCallback(
        rebuilt_tablet->get(), *rebuilt_log));
    tablet_meta_->SetPreFlushCallback(
        [cb]() { return cb->WaitForInflightsAndFlushLog(); });
  }

  // This will cause any pending TabletMetadata flush to be executed.
  Status unpin_status = tablet_meta_->UnPinFlush();

  constexpr char kFailedUnpinMsg[] = "Failed to flush after unpinning";
  // If both steps failed, the bootstrap error takes precedence: log the unpin
  // failure and return the bootstrap status.
  if (PREDICT_FALSE(!bootstrap_status.ok() && !unpin_status.ok())) {
    LOG_WITH_PREFIX(WARNING) << kFailedUnpinMsg << ": " << unpin_status.ToString();
    return bootstrap_status;
  }
  RETURN_NOT_OK(bootstrap_status);
  RETURN_NOT_OK_PREPEND(unpin_status, Substitute("$0$1", LogPrefix(), kFailedUnpinMsg));
  return Status::OK();
}
// The actual bootstrap sequence: validate metadata state and data dirs, open
// the tablet, prepare/replay the WAL recovery directory, and finish by handing
// the rebuilt tablet and log back to the caller.
Status TabletBootstrap::RunBootstrap(shared_ptr<Tablet>* rebuilt_tablet,
                                     scoped_refptr<Log>* rebuilt_log,
                                     ConsensusBootstrapInfo* consensus_info) {
  string tablet_id = tablet_meta_->tablet_id();

  // Make sure we don't try to locally bootstrap a tablet that was in the middle
  // of a tablet copy. It's likely that not all files were copied over
  // successfully.
  TabletDataState tablet_data_state = tablet_meta_->tablet_data_state();
  if (tablet_data_state != TABLET_DATA_READY) {
    return Status::Corruption("Unable to locally bootstrap tablet " + tablet_id + ": " +
                              "TabletMetadata bootstrap state is " +
                              TabletDataState_Name(tablet_data_state));
  }

  SetStatusMessage("Bootstrap starting.");

  if (VLOG_IS_ON(1)) {
    TabletSuperBlockPB super_block;
    RETURN_NOT_OK(tablet_meta_->ToSuperBlock(&super_block));
    VLOG_WITH_PREFIX(1) << "Tablet Metadata: " << SecureDebugString(super_block);
  }

  // Ensure the tablet's data dirs are present and healthy before it is opened.
  DataDirGroupPB data_dir_group;
  RETURN_NOT_OK_PREPEND(
      tablet_meta_->fs_manager()->dd_manager()->GetDataDirGroupPB(tablet_id, &data_dir_group),
      "error retrieving tablet data dir group (one or more data dirs may have been removed)");
  if (tablet_meta_->fs_manager()->dd_manager()->IsTabletInFailedDir(tablet_id)) {
    return Status::IOError("some tablet data is in a failed directory");
  }

  // Snapshot which stores were flushed before opening the tablet, so that
  // flushes triggered during bootstrap don't confuse replay decisions.
  RETURN_NOT_OK(flushed_stores_.InitFrom(*tablet_meta_.get()));

  bool has_blocks;
  RETURN_NOT_OK(OpenTablet(&has_blocks));

  bool needs_recovery;
  RETURN_NOT_OK(PrepareRecoveryDir(&needs_recovery));
  if (needs_recovery) {
    RETURN_NOT_OK(OpenLogReaderInRecoveryDir());
  }

  // This is a new tablet, nothing left to do.
  if (!has_blocks && !needs_recovery) {
    LOG_WITH_PREFIX(INFO) << "No blocks or log segments found. Creating new log.";
    RETURN_NOT_OK_PREPEND(OpenNewLog(), "Failed to open new log");
    RETURN_NOT_OK(FinishBootstrap("No bootstrap required, opened a new log",
                                  rebuilt_log, rebuilt_tablet));
    consensus_info->last_id = MinimumOpId();
    consensus_info->last_committed_id = MinimumOpId();
    return Status::OK();
  }

  // If there were blocks, there must be segments to replay. This is required
  // by Raft, since we always need to know the term and index of the last
  // logged op in order to vote, know how to respond to AppendEntries(), etc.
  if (has_blocks && !needs_recovery) {
    return Status::IllegalState(Substitute("Tablet $0: Found rowsets but no log "
                                           "segments could be found.",
                                           tablet_id));
  }

  IOContext io_context({ tablet_meta_->table_id() });
  RETURN_NOT_OK_PREPEND(PlaySegments(&io_context, consensus_info), "Failed log replay. Reason");

  // Replay succeeded: the recovery dir's contents have been duplicated into
  // the new log, so it can be removed.
  RETURN_NOT_OK(Log::RemoveRecoveryDirIfExists(tablet_->metadata()->fs_manager(),
                                               tablet_->metadata()->tablet_id()));
  RETURN_NOT_OK(FinishBootstrap("Bootstrap complete.", rebuilt_log, rebuilt_tablet));
  return Status::OK();
}
// Conclude bootstrap: flip the tablet out of its bootstrapping state, record
// the final status message, and transfer ownership of the rebuilt tablet and
// log to the caller's out-parameters.
Status TabletBootstrap::FinishBootstrap(const string& message,
                                        scoped_refptr<log::Log>* rebuilt_log,
                                        shared_ptr<Tablet>* rebuilt_tablet) {
  RETURN_NOT_OK(tablet_->MarkFinishedBootstrapping());
  SetStatusMessage(message);
  *rebuilt_tablet = shared_ptr<Tablet>(tablet_.release());
  rebuilt_log->swap(log_);
  return Status::OK();
}
// Construct and open the Tablet from its on-disk metadata, reporting via
// '*has_blocks' whether any rowset data exists on disk. Consensus is not
// attached at this point; we are only opening the tablet locally.
Status TabletBootstrap::OpenTablet(bool* has_blocks) {
  auto new_tablet = std::make_unique<Tablet>(tablet_meta_,
                                             clock_,
                                             mem_tracker_,
                                             metric_registry_,
                                             log_anchor_registry_);
  {
    // Log a warning if opening the tablet takes longer than 100ms.
    SCOPED_LOG_SLOW_EXECUTION_PREFIX(INFO, 100, LogPrefix(), "opening tablet");
    RETURN_NOT_OK(new_tablet->Open());
  }
  *has_blocks = new_tablet->num_rowsets() != 0;
  tablet_ = std::move(new_tablet);
  return Status::OK();
}
// Set up the WAL recovery directory for replay. The order of filesystem
// operations here is the crash-safety invariant: the recovery dir, once it
// exists, is the single source of truth for replay, and the live log dir is
// always recreated empty before replay begins.
Status TabletBootstrap::PrepareRecoveryDir(bool* needs_recovery) {
  *needs_recovery = false;

  FsManager* fs_manager = tablet_->metadata()->fs_manager();
  string tablet_id = tablet_->metadata()->tablet_id();
  string log_dir = fs_manager->GetTabletWalDir(tablet_id);

  // If the recovery directory exists, then we crashed mid-recovery.
  // Throw away any logs from the previous recovery attempt and restart the log
  // replay process from the beginning using the same recovery dir as last time.
  string recovery_path = fs_manager->GetTabletWalRecoveryDir(tablet_id);
  if (fs_manager->Exists(recovery_path)) {
    LOG_WITH_PREFIX(INFO) << "Previous recovery directory found at " << recovery_path << ": "
                          << "Replaying log files from this location instead of " << log_dir;

    // Since we have a recovery directory, clear out the log_dir by recursively
    // deleting it and creating a new one so that we don't end up with remnants
    // of old WAL segments or indexes after replay.
    if (fs_manager->Exists(log_dir)) {
      LOG_WITH_PREFIX(INFO) << "Deleting old log files from previous recovery attempt in "
                            << log_dir;
      RETURN_NOT_OK_PREPEND(fs_manager->env()->DeleteRecursively(log_dir),
                            "Could not recursively delete old log dir " + log_dir);
    }

    RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(fs_manager->env(), log_dir),
                          "Failed to create log directory " + log_dir);

    *needs_recovery = true;
    return Status::OK();
  }

  // If we made it here, there was no pre-existing recovery dir.
  // Now we look for log files in log_dir, and if we find any then we rename
  // the whole log_dir to a recovery dir and return needs_recovery = true.
  RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(fs_manager->env(), log_dir),
                        "Failed to create log dir");

  vector<string> children;
  RETURN_NOT_OK_PREPEND(fs_manager->ListDir(log_dir, &children),
                        "Couldn't list log segments.");
  // Any file matching the log segment naming convention triggers recovery;
  // non-segment files (e.g. indexes) are ignored here.
  for (const string& child : children) {
    if (!log::IsLogFileName(child)) {
      continue;
    }

    string source_path = JoinPathSegments(log_dir, child);
    string dest_path = JoinPathSegments(recovery_path, child);
    VLOG_WITH_PREFIX(1) << "Will attempt to recover log segment " << source_path
                        << " to " << dest_path;
    *needs_recovery = true;
  }

  if (*needs_recovery) {
    // Atomically rename the log directory to the recovery directory
    // and then re-create the log directory.
    VLOG_WITH_PREFIX(1) << "Moving log directory " << log_dir << " to recovery directory "
                        << recovery_path << " in preparation for log replay";
    RETURN_NOT_OK_PREPEND(fs_manager->env()->RenameFile(log_dir, recovery_path),
                          Substitute("Could not move log directory $0 to recovery dir $1",
                                     log_dir, recovery_path));
    RETURN_NOT_OK_PREPEND(fs_manager->env()->CreateDir(log_dir),
                          "Failed to recreate log directory " + log_dir);
  }
  return Status::OK();
}
// Open a LogReader over the segments in the tablet's WAL recovery directory.
// Populates log_reader_ on success.
Status TabletBootstrap::OpenLogReaderInRecoveryDir() {
  const string& tablet_id = tablet_->tablet_id();
  FsManager* fs_manager = tablet_meta_->fs_manager();
  // Compute the recovery dir path once; the original computed it separately
  // for the log message and for the LogReader::Open() call.
  const string recovery_dir = fs_manager->GetTabletWalRecoveryDir(tablet_id);
  VLOG_WITH_PREFIX(1) << "Opening log reader in log recovery dir " << recovery_dir;
  // Open the reader.
  // Since we're recovering, we don't want to have any log index -- since it
  // isn't fsynced() during writing, its contents are useless to us.
  scoped_refptr<LogIndex> log_index(nullptr);
  RETURN_NOT_OK_PREPEND(LogReader::Open(fs_manager->env(), recovery_dir, log_index, tablet_id,
                                        tablet_->GetMetricEntity().get(),
                                        file_cache_,
                                        &log_reader_),
                        "Could not open LogReader. Reason");
  return Status::OK();
}
// Open a fresh log in the tablet's (clean) log directory, into which replayed
// entries will be re-written. Populates log_.
Status TabletBootstrap::OpenNewLog() {
  RETURN_NOT_OK(Log::Open(LogOptions(),
                          tablet_->metadata()->fs_manager(),
                          file_cache_,
                          tablet_->tablet_id(),
                          *tablet_->schema(),
                          tablet_->metadata()->schema_version(),
                          tablet_->GetMetricEntity(),
                          &log_));
  // Disable sync temporarily in order to speed up appends during the
  // bootstrap process. Sync is re-enabled by the caller of Bootstrap()
  // via ReEnableSyncIfRequired() (see BootstrapTablet()).
  log_->DisableSync();
  return Status::OK();
}
// Log entries keyed by the operation's log index.
typedef map<int64_t, unique_ptr<LogEntryPB>> OpIndexToEntryMap;

// State kept during replay.
struct ReplayState {
  ReplayState()
    : prev_op_id(MinimumOpId()),
      committed_op_id(MinimumOpId()) {
  }

  // Return true if 'b' is allowed to immediately follow 'a' in the log.
  static bool IsValidSequence(const OpId& a, const OpId& b) {
    if (a.term() == 0 && a.index() == 0) {
      // Not initialized - can start with any opid.
      return true;
    }

    // Within the same term, we should never skip entries.
    // We can, however go backwards (see KUDU-783 for an example).
    // Any transition to a different term is also accepted here.
    if (b.term() == a.term() &&
        b.index() > a.index() + 1) {
      return false;
    }
    return true;
  }

  // Return a Corruption status if 'id' seems to be out-of-sequence in the log.
  // On success, advances prev_op_id to msg's id.
  Status CheckSequentialReplicateId(const ReplicateMsg& msg) {
    DCHECK(msg.has_id());
    if (PREDICT_FALSE(!IsValidSequence(prev_op_id, msg.id()))) {
      string op_desc = Substitute("$0 REPLICATE (Type: $1)",
                                  OpIdToString(msg.id()),
                                  OperationType_Name(msg.op_type()));
      return Status::Corruption(
        Substitute("Unexpected opid following opid $0. Operation: $1",
                   OpIdToString(prev_op_id),
                   op_desc));
    }

    prev_op_id = msg.id();
    return Status::OK();
  }

  // Advance committed_op_id if 'id' has a higher index; never moves backwards.
  void UpdateCommittedOpId(const OpId& id) {
    if (id.index() > committed_op_id.index()) {
      committed_op_id = id;
    }
  }

  // Append a debug-string line for each entry in 'entries' to 'strings'.
  void AddEntriesToStrings(const OpIndexToEntryMap& entries, vector<string>* strings) const {
    for (const auto& map_entry : entries) {
      const LogEntryPB* entry = DCHECK_NOTNULL(map_entry.second.get());
      strings->push_back(Substitute("  $0", SecureShortDebugString(*entry)));
    }
  }

  // Dump a summary line plus all pending replicates/commits into 'strings',
  // for diagnostic logging.
  void DumpReplayStateToStrings(vector<string>* strings) const {
    strings->push_back(Substitute("ReplayState: Previous OpId: $0, Committed OpId: $1, "
                                  "Pending Replicates: $2, Pending Commits: $3", OpIdToString(prev_op_id),
                                  OpIdToString(committed_op_id), pending_replicates.size(), pending_commits.size()));
    if (!pending_replicates.empty()) {
      strings->push_back("Dumping REPLICATES: ");
      AddEntriesToStrings(pending_replicates, strings);
    }
    if (!pending_commits.empty()) {
      strings->push_back("Dumping COMMITS: ");
      AddEntriesToStrings(pending_commits, strings);
    }
  }

  // The last replicate message's ID.
  OpId prev_op_id;

  // The last operation known to be committed.
  // All other operations with lower IDs are also committed.
  OpId committed_op_id;

  // REPLICATE log entries whose corresponding COMMIT record has
  // not yet been seen. Keyed by index.
  OpIndexToEntryMap pending_replicates;

  // COMMIT log entries which couldn't be applied immediately.
  OpIndexToEntryMap pending_commits;
};
// Dispatch a single log entry to the appropriate handler based on its type.
// Any entry type other than REPLICATE or COMMIT is treated as corruption.
Status TabletBootstrap::HandleEntry(const IOContext* io_context,
                                    ReplayState* state,
                                    unique_ptr<LogEntryPB> entry,
                                    string* entry_debug_info) {
  DCHECK(entry);
  VLOG_WITH_PREFIX(1) << "Handling entry: " << SecureShortDebugString(*entry);

  const auto entry_type = entry->type();
  if (entry_type == log::REPLICATE) {
    RETURN_NOT_OK(HandleReplicateMessage(state, std::move(entry), entry_debug_info));
  } else if (entry_type == log::COMMIT) {
    // Check the unpaired ops for the matching replicate msg, abort if not found.
    RETURN_NOT_OK(HandleCommitMessage(io_context, state,
                                      std::move(entry), entry_debug_info));
  } else {
    return Status::Corruption(Substitute("unexpected log entry type: $0", entry_type));
  }
  // Test-only fault injection point: maybe crash after each processed entry.
  MAYBE_FAULT(FLAGS_fault_crash_during_log_replay);
  return Status::OK();
}
// Repair overflow issue reported in KUDU-1933.
//
// KUDU-1933: term/index values that wrapped around 32-bit arithmetic were
// persisted as (invalid) negative values. When such a value is detected,
// translate it back in place to the intended positive 64-bit value
// (INT32_MAX plus the amount of wrap-around).
void CheckAndRepairOpIdOverflow(OpId* opid) {
  if (PREDICT_FALSE(opid->term() < consensus::kMinimumTerm)) {
    // Distance the term wrapped past INT32_MAX; must be >= 1 by construction.
    int64_t overflow = opid->term() - INT32_MIN + 1LL;
    CHECK_GE(overflow, 1) << OpIdToString(*opid);
    opid->set_term(static_cast<int64_t>(INT32_MAX) + overflow);
  }
  if (PREDICT_FALSE(opid->index() < consensus::kMinimumOpIdIndex &&
                    opid->index() != consensus::kInvalidOpIdIndex)) {
    int64_t overflow = opid->index() - INT32_MIN + 1LL;
    CHECK_GE(overflow, 1) << OpIdToString(*opid);
    // Sanity check. Even with the bug in KUDU-1933, the number of bytes
    // allowed in a single group commit is a generous upper bound on how far a
    // log index may have overflowed before causing a crash.
    CHECK_LT(overflow, FLAGS_group_commit_queue_size_bytes) << OpIdToString(*opid);
    opid->set_index(static_cast<int64_t>(INT32_MAX) + overflow);
  }
}
// Buffer a REPLICATE entry as a pending operation, after appending it to the
// new (rewritten) log.
//
// The entry's OpId must directly follow 'state->prev_op_id'. If an entry
// with the same index is already pending, that entry and all pending entries
// with higher indexes are discarded (they were overwritten by a new leader).
// On any error return, the scoped cleanup fills 'entry_debug_info' with a
// printable form of the entry for diagnostics.
Status TabletBootstrap::HandleReplicateMessage(ReplayState* state,
                                               unique_ptr<LogEntryPB> entry,
                                               string* entry_debug_info) {
  // Populate 'entry_debug_info' on any early return; cancelled on success.
  auto info_collector = MakeScopedCleanup([&]() {
    if (entry) {
      *entry_debug_info = SecureShortDebugString(*entry);
    }
  });
  DCHECK(entry->has_replicate())
      << "not a replicate message: " << SecureDebugString(*entry);
  stats_.ops_read++;
  // Fix overflow if necessary (see KUDU-1933).
  CheckAndRepairOpIdOverflow(entry->mutable_replicate()->mutable_id());
  const ReplicateMsg& replicate = entry->replicate();
  RETURN_NOT_OK(state->CheckSequentialReplicateId(replicate));
  DCHECK(replicate.has_timestamp());
  // Advance the clock so it is at least as high as this op's timestamp.
  CHECK_OK(UpdateClock(replicate.timestamp()));
  // Append the replicate message to the log as is.
  RETURN_NOT_OK(log_->Append(entry.get()));
  const int64_t index = replicate.id().index();
  const auto existing_entry_iter = state->pending_replicates.find(index);
  if (existing_entry_iter != state->pending_replicates.end()) {
    // If there was an entry with the same index we're overwriting then we need
    // to delete that entry and all entries with higher indexes.
    const auto& existing_entry = existing_entry_iter->second;
    auto iter = state->pending_replicates.lower_bound(index);
    DCHECK(OpIdEquals(iter->second->replicate().id(), existing_entry->replicate().id()));
    const auto& last_entry = state->pending_replicates.rbegin()->second;
    VLOG_WITH_PREFIX(1) << "Overwriting operations starting at: "
                        << existing_entry->replicate().id()
                        << " up to: " << last_entry->replicate().id()
                        << " with operation: " << replicate.id();
    // Erase [index, end); each erased op counts as overwritten.
    while (iter != state->pending_replicates.end()) {
      iter = state->pending_replicates.erase(iter);
      stats_.ops_overwritten++;
    }
  }
  EmplaceOrDie(&state->pending_replicates, index, std::move(entry));
  info_collector.cancel();
  return Status::OK();
}
// Process a single COMMIT entry against the replay state. Exactly one of
// three things happens:
//  - the commit is orphaned (its index precedes all pending replicates):
//    verify it doesn't need replay and drop it;
//  - it commits a pending replicate that is NOT the oldest one: buffer it
//    in 'pending_commits' for later;
//  - it commits the oldest pending replicate: apply it, then drain any
//    buffered commits that now directly follow in index order.
// On OK, ownership of 'entry' has been consumed (applied or buffered). On
// error, the scoped cleanup fills 'entry_debug_info' with a printable form
// of the entry.
Status TabletBootstrap::HandleCommitMessage(const IOContext* io_context, ReplayState* state,
                                            unique_ptr<LogEntryPB> entry,
                                            string* entry_debug_info) {
  // Populate 'entry_debug_info' on any early return; cancelled on success.
  auto info_collector = MakeScopedCleanup([&]() {
    if (entry) {
      *entry_debug_info = SecureShortDebugString(*entry);
    }
  });
  DCHECK(entry->has_commit())
      << "not a commit message: " << SecureDebugString(*entry);
  // Fix overflow if necessary (see KUDU-1933).
  CheckAndRepairOpIdOverflow(entry->mutable_commit()->mutable_commited_op_id());
  // Match up the COMMIT record with the original entry that it's applied to.
  const OpId& committed_op_id = entry->commit().commited_op_id();
  state->UpdateCommittedOpId(committed_op_id);
  // If there are no pending replicates, or if this commit's index is lower than
  // the first pending replicate on record, this is likely an orphaned commit.
  if (state->pending_replicates.empty() ||
      (*state->pending_replicates.begin()).first > committed_op_id.index()) {
    VLOG_WITH_PREFIX(2) << "Found orphaned commit for " << committed_op_id;
    RETURN_NOT_OK(CheckOrphanedCommitDoesntNeedReplay(entry->commit()));
    stats_.orphaned_commits++;
    info_collector.cancel();
    return Status::OK();
  }
  // If this commit does not correspond to the first replicate message in the pending
  // replicates set we keep it to apply later...
  if ((*state->pending_replicates.begin()).first != committed_op_id.index()) {
    // A commit with no matching replicate anywhere in the pending set is corruption.
    if (!ContainsKey(state->pending_replicates, committed_op_id.index())) {
      return Status::Corruption(Substitute("Could not find replicate for commit: $0",
                                           SecureShortDebugString(*entry)));
    }
    VLOG_WITH_PREFIX(2) << "Adding pending commit for " << committed_op_id;
    EmplaceOrDie(&state->pending_commits, committed_op_id.index(), std::move(entry));
    info_collector.cancel();
    return Status::OK();
  }
  // ... if it does, we apply it and all the commits that immediately follow in the sequence.
  OpId last_applied = committed_op_id;
  RETURN_NOT_OK(ApplyCommitMessage(io_context, state, entry.get()));
  auto iter = state->pending_commits.begin();
  while (iter != state->pending_commits.end()) {
    // Only drain commits that are exactly contiguous with the last one applied.
    if (iter->first == last_applied.index() + 1) {
      auto& buffered_commit_entry(iter->second);
      last_applied = buffered_commit_entry->commit().commited_op_id();
      RETURN_NOT_OK(ApplyCommitMessage(io_context, state, buffered_commit_entry.get()));
      iter = state->pending_commits.erase(iter);
      continue;
    }
    break;
  }
  info_collector.cancel();
  return Status::OK();
}
// Classify the mem stores targeted by 'commit':
//   NO_MUTATED_STORES  - the commit's result mutated no stores at all;
//   SOME_STORES_ACTIVE - at least one mutated store is still in memory;
//   NO_STORES_ACTIVE   - stores were mutated, but all have been flushed.
TabletBootstrap::ActiveStores TabletBootstrap::AnalyzeActiveStores(const CommitMsg& commit) {
  bool saw_mutated_store = false;
  bool saw_active_store = false;
  for (const OperationResultPB& op_result : commit.result().ops()) {
    for (const MemStoreTargetPB& target : op_result.mutated_stores()) {
      saw_mutated_store = true;
      saw_active_store |= flushed_stores_.IsMemStoreActive(target);
    }
  }
  if (!saw_mutated_store) {
    return NO_MUTATED_STORES;
  }
  return saw_active_store ? SOME_STORES_ACTIVE : NO_STORES_ACTIVE;
}
// Verify that an orphaned commit (one whose replicate is no longer in the
// WAL) refers only to flushed stores. If it touches any still-active store,
// replaying it is impossible and the tablet is corrupt.
Status TabletBootstrap::CheckOrphanedCommitDoesntNeedReplay(const CommitMsg& commit) {
  if (AnalyzeActiveStores(commit) != SOME_STORES_ACTIVE) {
    return Status::OK();
  }
  // Include the superblock in the error to aid debugging; failure to build
  // it is only warned about.
  TabletSuperBlockPB super;
  WARN_NOT_OK(tablet_meta_->ToSuperBlock(&super),
              Substitute("$0$1", LogPrefix(), "Couldn't build TabletSuperBlockPB"));
  return Status::Corruption(Substitute("CommitMsg was orphaned but it referred to "
      "stores which need replay. Commit: $0. TabletMetadata: $1",
      SecureShortDebugString(commit),
      SecureShortDebugString(super)));
}
// Apply the commit in 'entry' to the tablet.
//
// Removes the pending replicate with the commit's index (if any) and replays
// the pair via HandleEntryPair(); the replicate's term must match the
// commit's, otherwise the WAL is corrupt. If no matching replicate is
// pending (it may have been in a deleted log segment), verify the commit
// only refers to flushed stores and count it as orphaned.
Status TabletBootstrap::ApplyCommitMessage(const IOContext* io_context,
                                           ReplayState* state, LogEntryPB* entry) {
  const OpId& committed_op_id = entry->commit().commited_op_id();
  VLOG_WITH_PREFIX(2) << "Applying commit for " << committed_op_id;
  // They should also have an associated replicate index (it may have been in a
  // deleted log segment though).
  unique_ptr<LogEntryPB> pending_replicate_entry(EraseKeyReturnValuePtr(
      &state->pending_replicates, committed_op_id.index()));
  if (pending_replicate_entry) {
    // We found a replicate with the same index, make sure it also has the same
    // term.
    const auto& replicate = pending_replicate_entry->replicate();
    if (!OpIdEquals(committed_op_id, replicate.id())) {
      // BUGFIX: the two string fragments previously concatenated to
      // "...didn't match thecommit message's..."; added the missing space.
      string error_msg = Substitute("Committed operation's OpId: $0 didn't match the "
          "commit message's committed OpId: $1. Pending operation: $2, Commit message: $3",
          SecureShortDebugString(replicate.id()),
          SecureShortDebugString(committed_op_id),
          SecureShortDebugString(replicate),
          SecureShortDebugString(entry->commit()));
      LOG_WITH_PREFIX(DFATAL) << error_msg;
      return Status::Corruption(error_msg);
    }
    RETURN_NOT_OK(HandleEntryPair(io_context, pending_replicate_entry.get(), entry));
    stats_.ops_committed++;
  } else {
    // Orphaned commit: its replicate is no longer in the WAL. That is only
    // safe if everything it touched has already been flushed.
    stats_.orphaned_commits++;
    RETURN_NOT_OK(CheckOrphanedCommitDoesntNeedReplay(entry->commit()));
  }
  return Status::OK();
}
// Replay a matched REPLICATE/COMMIT pair. Never deletes 'replicate_entry' or
// 'commit_entry'.
//
// Dispatches to the per-operation-type Play*Request() method, then — for
// operation types whose timestamps are assigned in OpId order — advances
// MVCC's new-op timestamp lower bound.
Status TabletBootstrap::HandleEntryPair(const IOContext* io_context, LogEntryPB* replicate_entry,
                                        LogEntryPB* commit_entry) {
  const char* error_fmt = "Failed to play $0 request. ReplicateMsg: { $1 }, CommitMsg: { $2 }";
#define RETURN_NOT_OK_REPLAY(ReplayMethodName, io_context, replicate, commit) \
  RETURN_NOT_OK_PREPEND(ReplayMethodName(io_context, replicate, commit), \
                        Substitute(error_fmt, OperationType_Name(op_type), \
                                   SecureShortDebugString(*(replicate)), \
                                   SecureShortDebugString(commit)))
  ReplicateMsg* replicate = replicate_entry->mutable_replicate();
  const CommitMsg& commit = commit_entry->commit();
  OperationType op_type = commit.op_type();
  // Dispatch on the operation type recorded in the commit message.
  switch (op_type) {
    case WRITE_OP:
      RETURN_NOT_OK_REPLAY(PlayWriteRequest, io_context, replicate, commit);
      break;
    case ALTER_SCHEMA_OP:
      RETURN_NOT_OK_REPLAY(PlayAlterSchemaRequest, io_context, replicate, commit);
      break;
    case CHANGE_CONFIG_OP:
      RETURN_NOT_OK_REPLAY(PlayChangeConfigRequest, io_context, replicate, commit);
      break;
    case PARTICIPANT_OP:
      RETURN_NOT_OK_REPLAY(PlayTxnParticipantOpRequest, io_context, replicate, commit);
      break;
    case NO_OP:
      RETURN_NOT_OK_REPLAY(PlayNoOpRequest, io_context, replicate, commit);
      break;
    default:
      return Status::IllegalState(Substitute("Unsupported commit entry type: $0",
                                             commit.op_type()));
  }
#undef RETURN_NOT_OK_REPLAY
  // We should only advance MVCC's safe time based on a specific set of
  // operations: those whose timestamps are guaranteed to be monotonically
  // increasing with respect to their entries in the write-ahead log.
  bool timestamp_assigned_in_opid_order = true;
  switch (op_type) {
    case CHANGE_CONFIG_OP:
      timestamp_assigned_in_opid_order = false;
      break;
    case NO_OP: {
      // No-ops may carry an explicit flag saying whether their timestamp was
      // assigned in OpId order; default to true when absent.
      const auto& req = replicate->noop_request();
      if (req.has_timestamp_in_opid_order()) {
        timestamp_assigned_in_opid_order = req.timestamp_in_opid_order();
      }
      break;
    }
    default:
      break;
  }
  if (!timestamp_assigned_in_opid_order) {
    return Status::OK();
  }
  // Handle advancement of our new timestamp lower bound watermark.
  //
  // If this message is a Raft election no-op, or is an op that has an external
  // consistency mode other than COMMIT_WAIT, we know that no future op will
  // have a timestamp that is lower than it, so we can just advance the safe
  // timestamp to the message's timestamp.
  //
  // If the hybrid clock is disabled, all ops will fall into this category.
  Timestamp new_lower_bound;
  if (replicate->op_type() != consensus::WRITE_OP ||
      replicate->write_request().external_consistency_mode() != COMMIT_WAIT) {
    new_lower_bound = Timestamp(replicate->timestamp());
    // ... else we set the new timestamp lower bound to be the op's
    // timestamp minus the maximum clock error. This opens the door for problems
    // if the flags changed across reboots, but this is unlikely and the problem
    // would manifest itself immediately and clearly (mvcc would complain the
    // operation is already committed, with a CHECK failure).
  } else {
    // BUGFIX: the two string fragments previously concatenated to
    // "...does notsupport..."; added the missing space.
    DCHECK(clock_->SupportsExternalConsistencyMode(COMMIT_WAIT))
        << "The provided clock does not "
           "support COMMIT_WAIT external consistency mode.";
    new_lower_bound = clock::HybridClock::AddPhysicalTimeToTimestamp(
        Timestamp(replicate->timestamp()),
        MonoDelta::FromMicroseconds(-FLAGS_max_clock_sync_error_usec));
  }
  tablet_->mvcc_manager()->AdjustNewOpLowerBound(new_lower_bound);
  return Status::OK();
}
// Log the replay state — pending replicates and pending commits — one INFO
// line per dumped string; useful for debugging replay failures.
void TabletBootstrap::DumpReplayStateToLog(const ReplayState& state) {
  vector<string> state_dump;
  state.DumpReplayStateToStrings(&state_dump);
  for (const string& line : state_dump) {
    LOG_WITH_PREFIX(INFO) << line;
  }
}
// Replay every WAL segment in order, feeding each entry through
// HandleEntry(), then hand the still-pending replicates to consensus via
// 'consensus_info'.
//
// Before replay the tablet's schema is rewound to the schema recorded in the
// first segment's header, so writes are decoded with the correct
// point-in-time schema. After replay, any leftover pending commits must pair
// with a pending replicate and (for writes) refer to at least one active
// store; otherwise the WAL is corrupt.
Status TabletBootstrap::PlaySegments(const IOContext* io_context,
                                     ConsensusBootstrapInfo* consensus_info) {
  ReplayState state;
  log::SegmentSequence segments;
  log_reader_->GetSegmentsSnapshot(&segments);
  // The first thing to do is to rewind the tablet's schema back to the schema
  // as of the point in time where the logs begin. We must replay the writes
  // in the logs with the correct point-in-time schema.
  if (!segments.empty()) {
    const scoped_refptr<ReadableLogSegment>& segment = segments[0];
    // Set the point-in-time schema for the tablet based on the log header.
    Schema pit_schema;
    RETURN_NOT_OK_PREPEND(SchemaFromPB(segment->header().schema(), &pit_schema),
                          "Couldn't decode log segment schema");
    RETURN_NOT_OK_PREPEND(tablet_->RewindSchemaForBootstrap(
        pit_schema, segment->header().schema_version()),
                          "couldn't set point-in-time schema");
  }
  // We defer opening the log until here, so that we properly reproduce the
  // point-in-time schema from the log we're reading into the log we're
  // writing.
  RETURN_NOT_OK_PREPEND(OpenNewLog(), "Failed to open new log");
  // Publish a progress message at most every 'kStatusUpdateInterval'.
  auto last_status_update = MonoTime::Now();
  const auto kStatusUpdateInterval = MonoDelta::FromSeconds(5);
  int segment_count = 0;
  for (const scoped_refptr<ReadableLogSegment>& segment : segments) {
    log::LogEntryReader reader(segment.get());
    int entry_count = 0;
    while (true) {
      {
        unique_ptr<LogEntryPB> entry;
        Status s = reader.ReadNextEntry(&entry);
        if (PREDICT_FALSE(!s.ok())) {
          // EOF terminates this segment cleanly; anything else is corruption.
          if (s.IsEndOfFile()) {
            break;
          }
          return Status::Corruption(
              Substitute("Error reading Log Segment of tablet $0: $1 "
                         "(Read up to entry $2 of segment $3, in path $4)",
                         tablet_->tablet_id(),
                         s.ToString(),
                         entry_count,
                         segment->header().sequence_number(),
                         segment->path()));
        }
        entry_count++;
        string entry_debug_info;
        s = HandleEntry(io_context, &state, std::move(entry), &entry_debug_info);
        if (!s.ok()) {
          // Dump the replay state before failing so the log has full context.
          DumpReplayStateToLog(state);
          RETURN_NOT_OK_PREPEND(s, DebugInfo(tablet_->tablet_id(),
                                             segment->header().sequence_number(),
                                             entry_count, segment->path(),
                                             entry_debug_info));
        }
      }
      const auto now = MonoTime::Now();
      if (now - last_status_update > kStatusUpdateInterval) {
        SetStatusMessage(Substitute("Bootstrap replaying log segment $0/$1 "
                                    "($2/$3 this segment, stats: $4)",
                                    segment_count + 1, log_reader_->num_segments(),
                                    HumanReadableNumBytes::ToString(reader.offset()),
                                    HumanReadableNumBytes::ToString(reader.read_up_to_offset()),
                                    stats_.ToString()));
        last_status_update = now;
      }
    }
    SetStatusMessage(Substitute("Bootstrap replayed $0/$1 log segments. "
                                "Stats: $2. Pending: $3 replicates",
                                segment_count + 1, log_reader_->num_segments(),
                                stats_.ToString(),
                                state.pending_replicates.size()));
    segment_count++;
  }
  // If we have non-applied commits they all must belong to pending operations and
  // they should only pertain to stores which are still active.
  if (!state.pending_commits.empty()) {
    for (const OpIndexToEntryMap::value_type& entry : state.pending_commits) {
      if (!ContainsKey(state.pending_replicates, entry.first)) {
        DumpReplayStateToLog(state);
        return Status::Corruption("Had orphaned commits at the end of replay.");
      }
      if (entry.second->commit().op_type() == WRITE_OP &&
          AnalyzeActiveStores(entry.second->commit()) == NO_STORES_ACTIVE) {
        DumpReplayStateToLog(state);
        TabletSuperBlockPB super;
        WARN_NOT_OK(tablet_meta_->ToSuperBlock(&super), "Couldn't build TabletSuperBlockPB.");
        return Status::Corruption(Substitute("CommitMsg was pending but it did not refer "
            "to any active memory stores. Commit: $0. TabletMetadata: $1",
            SecureShortDebugString(entry.second->commit()), SecureShortDebugString(super)));
      }
    }
  }
  // Note that we don't pass the information contained in the pending commits along with
  // ConsensusBootstrapInfo. We know that this is safe as they must refer to active
  // stores (we make doubly sure above).
  //
  // Example/Explanation:
  // Say we have two different operations that touch the same row, one insert and one
  // mutate. Since we use Early Lock Release the commit for the second (mutate) operation
  // might end up in the log before the insert's commit. This wouldn't matter since
  // we replay in order, but a corner case here is that we might crash before we
  // write the commit for the insert, meaning it might not be present at all.
  //
  // One possible log for this situation would be:
  // - Replicate 10.10 (insert)
  // - Replicate 10.11 (mutate)
  // - Commit    10.11 (mutate)
  // ~CRASH while Commit 10.10 is in-flight~
  //
  // We can't replay 10.10 during bootstrap because we haven't seen its commit, but
  // since we can't replay out-of-order we won't replay 10.11 either, in fact we'll
  // pass them both as "pending" to consensus to be applied again.
  //
  // The reason why it is safe to simply disregard 10.11's commit is that we know that
  // it must refer only to active stores. We know this because one important flush/compact
  // pre-condition is:
  // - No flush will become visible on reboot (meaning we won't durably update the tablet
  //   metadata), unless the snapshot under which the flush/compact was performed has no
  //   in-flight ops and all the messages that are in-flight to the log are durable.
  //
  // In our example this means that if we had flushed/compacted after 10.10 was applied
  // (meaning losing the commit message would lead to corruption as we might re-apply it)
  // then the commit for 10.10 would be durable. Since it isn't then no flush/compaction
  // occurred after 10.10 was applied and thus we can disregard the commit message for
  // 10.11 and simply apply both 10.10 and 10.11 as if we hadn't applied them before.
  //
  // This generalizes to:
  // - If a committed replicate message with index Y is missing a commit message,
  //   no later committed replicate message (with index > Y) is visible across reboots
  //   in the tablet data.
  if (VLOG_IS_ON(1)) {
    DumpReplayStateToLog(state);
  }
  // Set up the ConsensusBootstrapInfo structure for the caller.
  for (OpIndexToEntryMap::value_type& e : state.pending_replicates) {
    consensus_info->orphaned_replicates.push_back(e.second->release_replicate());
  }
  consensus_info->last_id = state.prev_op_id;
  consensus_info->last_committed_id = state.committed_op_id;
  return Status::OK();
}
// Wrap 'commit_msg' in a COMMIT log entry and append it to the rewritten log.
Status TabletBootstrap::AppendCommitMsg(const CommitMsg& commit_msg) {
  LogEntryPB entry;
  entry.set_type(log::COMMIT);
  *entry.mutable_commit() = commit_msg;
  return log_->Append(&entry);
}
// Walk the per-row results in 'orig_result' and decide, via FilterOperation(),
// which rows still need replay. Rows that don't are marked skip_on_replay in
// 'new_result'; rows that previously failed additionally get their failure
// status copied over and, when 'response' is non-null, reported as per-row
// errors. '*all_skipped' is set iff no row needs replay.
//
// NOTE(review): 'new_result' is indexed with mutable_ops(i), so it must
// already contain the same ops as 'orig_result' — the caller (PlayWriteRequest)
// passes a copy of the original commit message's result.
Status TabletBootstrap::DetermineSkippedOpsAndBuildResponse(const TxResultPB& orig_result,
                                                            TxResultPB* new_result,
                                                            WriteResponsePB* response,
                                                            bool* all_skipped) {
  int num_ops = orig_result.ops_size();
  new_result->mutable_ops()->Reserve(num_ops);
  *all_skipped = true;
  for (int i = 0; i < num_ops; i++) {
    const auto& orig_op_result = orig_result.ops(i);
    OpAction action;
    RETURN_NOT_OK(FilterOperation(orig_op_result, &action));
    // One row needing replay clears 'all_skipped' permanently.
    *all_skipped &= action != NEEDS_REPLAY;
    if (action != NEEDS_REPLAY) {
      new_result->mutable_ops(i)->set_skip_on_replay(true);
    }
    if (action == SKIP_PREVIOUSLY_FAILED) {
      // Surface the original per-row failure to the client response, if any.
      if (response) {
        WriteResponsePB::PerRowErrorPB* error = response->add_per_row_errors();
        error->set_row_index(i);
        error->mutable_error()->CopyFrom(orig_op_result.failed_status());
      }
      // If the op is already flushed we won't be applying it.
      DCHECK(orig_op_result.has_failed_status());
      new_result->mutable_ops(i)->mutable_failed_status()->CopyFrom(orig_op_result.failed_status());
    }
  }
  if (*all_skipped) {
    stats_.ops_ignored++;
  }
  return Status::OK();
}
// Replay a WRITE_OP: start and apply a write op against the tablet using the
// original replicate's OpId and timestamp, skipping rows that were already
// flushed or previously failed, then append a rewritten COMMIT entry with
// the updated result to the new log. When RPC result tracking is enabled and
// the replicate carries a request id, the replayed result is also recorded
// with the result tracker.
Status TabletBootstrap::PlayWriteRequest(const IOContext* io_context,
                                         ReplicateMsg* replicate_msg,
                                         const CommitMsg& commit_msg) {
  // Set up the new op.
  // Even if we're going to ignore the op, it's important to do this so that
  // MVCC advances.
  DCHECK(replicate_msg->has_timestamp());
  WriteRequestPB* write = replicate_msg->mutable_write_request();
  WriteOpState op_state(nullptr, write, nullptr);
  op_state.mutable_op_id()->CopyFrom(replicate_msg->id());
  op_state.set_timestamp(Timestamp(replicate_msg->timestamp()));
  // Prepare the commit entry for the rewritten log, allocated on the op
  // state's protobuf arena.
  LogEntryPB& commit_entry = *google::protobuf::Arena::CreateMessage<LogEntryPB>(
      op_state.pb_arena());
  commit_entry.set_type(log::COMMIT);
  CommitMsg* new_commit = commit_entry.mutable_commit();
  new_commit->CopyFrom(commit_msg);
  tablet_->StartOp(&op_state);
  tablet_->StartApplying(&op_state);
  unique_ptr<WriteResponsePB> response;
  bool tracking_results = result_tracker_.get() != nullptr && replicate_msg->has_request_id();
  // If the results are being tracked and this write has a request id, register
  // it with the result tracker.
  ResultTracker::RpcState state = ResultTracker::RpcState::NEW;
  if (tracking_results) {
    // BUGFIX: corrected the "Boostrapping" typo in this log message.
    VLOG(1) << result_tracker_.get() << " Bootstrapping request for tablet: "
            << write->tablet_id() << ". State: " << 0 << " id: "
            << SecureDebugString(replicate_msg->request_id());
    // We only replay committed requests so the result of tracking this request can be:
    // NEW:
    //   This is a previously untracked request, or we changed the driver -> store the result
    // COMPLETED or STALE:
    //   We've bootstrapped this tablet twice, and previously stored the result -> do
    //   nothing.
    state = result_tracker_->TrackRpcOrChangeDriver(replicate_msg->request_id());
    CHECK(state == ResultTracker::RpcState::NEW ||
          state == ResultTracker::RpcState::COMPLETED ||
          state == ResultTracker::RpcState::STALE)
        << "Wrong state: " << state;
    response.reset(new WriteResponsePB());
    response->set_timestamp(replicate_msg->timestamp());
  }
  // Determine which of the operations are already flushed to persistent
  // storage and don't need to be re-applied. We can do this even before
  // we decode any row operations, so we can short-circuit that decoding
  // in the case that the entire op has been already flushed.
  TxResultPB* new_result = new_commit->mutable_result();
  bool all_flushed;
  RETURN_NOT_OK(DetermineSkippedOpsAndBuildResponse(commit_msg.result(),
                                                    new_result,
                                                    response.get(),
                                                    &all_flushed));
  if (tracking_results && state == ResultTracker::NEW) {
    result_tracker_->RecordCompletionAndRespond(replicate_msg->request_id(), response.get());
  }
  Status play_status;
  if (!all_flushed && write->has_row_operations()) {
    // Rather than RETURN_NOT_OK() here, we need to just save the status and do the
    // RETURN_NOT_OK() down below the FinishApplying() call below. Even though it seems wrong
    // to commit the op when in fact it failed to apply, we would throw a CHECK
    // failure if we attempted to 'Abort()' after entering the applying stage. Allowing it to
    // Commit isn't problematic because we don't expose the results anyway, and the bad
    // Status returned below will cause us to fail the entire tablet bootstrap anyway.
    play_status = PlayRowOperations(io_context, &op_state, commit_msg.result(), new_result);
    if (play_status.ok()) {
      // Replace the original commit message's result with the new one from the replayed operation.
      op_state.ReleaseTxResultPB(new_commit->mutable_result());
    }
  }
  op_state.FinishApplyingOrAbort(Op::APPLIED);
  // If we failed to apply the operations, fail bootstrap before we write anything incorrect
  // to the recovery log.
  RETURN_NOT_OK(play_status);
  RETURN_NOT_OK(log_->Append(&commit_entry));
  return Status::OK();
}
// Replay an ALTER_SCHEMA_OP: re-apply the schema change to the tablet (unless
// the commit message records that it originally failed) and append the commit
// record to the new log.
//
// There are three potential outcomes to expect with this replay:
// 1. There is no 'result' in the commit message. The alter succeeds, and the
//    log updates its schema.
// 2. There is no 'result' in the commit message. The alter fails, and the
//    log doesn't update its schema. This can happen if trying to replay an
//    invalid alter schema request from before we started putting the results
//    in the commit message. Note that we'll leave the commit message as is;
//    it's harmless since replaying the operation should be a no-op anyway.
// 3. The commit message contains a 'result', which should only happen if the
//    alter resulted in a failure. Exit out without attempting the alter.
Status TabletBootstrap::PlayAlterSchemaRequest(const IOContext* /*io_context*/,
                                               ReplicateMsg* replicate_msg,
                                               const CommitMsg& commit_msg) {
  if (commit_msg.has_result()) {
    // If we put a result in the commit message, it should be an error and we
    // don't need to replay it. In case, in the future, we decide to put
    // positive results in the commit messages, just filter ops that have
    // failed statuses instead of D/CHECKing.
    DCHECK_EQ(1, commit_msg.result().ops_size());
    const OperationResultPB& op = commit_msg.result().ops(0);
    if (op.has_failed_status()) {
      Status error = StatusFromPB(op.failed_status());
      VLOG(1) << "Played a failed alter request: " << error.ToString();
      return AppendCommitMsg(commit_msg);
    }
  }
  AlterSchemaRequestPB* alter_schema = replicate_msg->mutable_alter_schema_request();
  // Decode schema.
  Schema schema;
  RETURN_NOT_OK(SchemaFromPB(alter_schema->schema(), &schema));
  AlterSchemaOpState op_state(nullptr, alter_schema, nullptr);
  RETURN_NOT_OK(tablet_->CreatePreparedAlterSchema(&op_state, &schema));
  // Apply the alter schema to the tablet.
  RETURN_NOT_OK_PREPEND(tablet_->AlterSchema(&op_state), "Failed to AlterSchema:");
  if (!op_state.error()) {
    // If the alter completed successfully, update the log segment header. Note
    // that our new log isn't hooked up to the tablet yet.
    log_->SetSchemaForNextLogSegment(std::move(schema), op_state.schema_version());
  }
  return AppendCommitMsg(commit_msg);
}
// Replay a CHANGE_CONFIG_OP: the config itself is not re-applied here — only
// the invariant is verified, and the commit record is preserved in the new
// log.
//
// Invariant: the committed config change request is always locally persisted
// in the consensus metadata before the commit message is written to the WAL,
// so a commit whose opid index exceeds the persisted config's opid index
// indicates corruption.
Status TabletBootstrap::PlayChangeConfigRequest(const IOContext* /*io_context*/,
                                                ReplicateMsg* replicate_msg,
                                                const CommitMsg& commit_msg) {
  if (PREDICT_FALSE(replicate_msg->id().index() > committed_raft_config_.opid_index())) {
    string msg = Substitute("Committed config change op in WAL has opid index ($0) greater than "
                            "config persisted in the consensus metadata ($1). "
                            "Replicate message: {$2}. "
                            "Committed raft config in consensus metadata: {$3}",
                            replicate_msg->id().index(),
                            committed_raft_config_.opid_index(),
                            SecureShortDebugString(*replicate_msg),
                            SecureShortDebugString(committed_raft_config_));
    LOG_WITH_PREFIX(DFATAL) << msg;
    return Status::Corruption(msg);
  }
  return AppendCommitMsg(commit_msg);
}
// Replay a PARTICIPANT_OP: re-perform the transaction participant operation
// against the tablet's transaction participant and append the commit record
// to the new log.
Status TabletBootstrap::PlayTxnParticipantOpRequest(const IOContext* /*io_context*/,
                                                    ReplicateMsg* replicate_msg,
                                                    const CommitMsg& commit_msg) {
  ParticipantOpState op_state(tablet_replica_.get(),
                              tablet_->txn_participant(),
                              &replicate_msg->participant_request());
  // Hold the txn lock for the duration of the replay; released on every exit
  // path by the scoped cleanup.
  op_state.AcquireTxnAndLock();
  SCOPED_CLEANUP({
    op_state.ReleaseTxn();
  });
  // NOTE: don't bother validating the current state of the op. Presumably that
  // happened the first time this op was written.
  RETURN_NOT_OK(op_state.PerformOp(replicate_msg->id()));
  return AppendCommitMsg(commit_msg);
}
// Replay a NO_OP: there is no payload to apply; just preserve the commit
// record in the rewritten log.
Status TabletBootstrap::PlayNoOpRequest(const IOContext* /*io_context*/,
                                        ReplicateMsg* /*replicate_msg*/,
                                        const CommitMsg& commit_msg) {
  return AppendCommitMsg(commit_msg);
}
// Decode the row operations of a replayed write (using the client schema
// embedded in the request), acquire row locks, and apply them via
// ApplyOperations(). 'orig_result'/'new_result' carry the original and
// rewritten per-row results (see ApplyOperations for their use).
Status TabletBootstrap::PlayRowOperations(const IOContext* io_context,
                                          WriteOpState* op_state,
                                          const TxResultPB& orig_result,
                                          TxResultPB* new_result) {
  // Decode the client's write request schema first.
  Schema inserts_schema;
  RETURN_NOT_OK_PREPEND(SchemaFromPB(op_state->request()->schema(), &inserts_schema),
                        "Couldn't decode client schema");
  RETURN_NOT_OK_PREPEND(tablet_->DecodeWriteOperations(&inserts_schema, op_state),
                        Substitute("Could not decode row operations: $0",
                                   SecureDebugString(op_state->request()->row_operations())));
  // Run AcquireRowLocks, Apply, etc!
  RETURN_NOT_OK_PREPEND(tablet_->AcquireRowLocks(op_state),
                        "Failed to acquire row locks");
  RETURN_NOT_OK(ApplyOperations(io_context, op_state, orig_result, new_result));
  return Status::OK();
}
// Apply each decoded row operation of a replayed write to the tablet:
// update seen/ignored stats, skip rows marked skip_on_replay in
// 'new_result', and re-apply the rest. A row that previously succeeded but
// fails now indicates divergence from the pre-restart state and fails
// bootstrap with Corruption.
// Requires 'op_state', 'orig_result' and 'new_result' to describe the same
// number of row operations.
Status TabletBootstrap::ApplyOperations(const IOContext* io_context,
                                        WriteOpState* op_state,
                                        const TxResultPB& orig_result,
                                        TxResultPB* new_result) {
  DCHECK_EQ(op_state->row_ops().size(), orig_result.ops_size());
  DCHECK_EQ(op_state->row_ops().size(), new_result->ops_size());
  int32_t op_idx = 0;
  for (RowOp* op : op_state->row_ops()) {
    int32_t curr_op_idx = op_idx++;
    // Increment the seen/ignored stats.
    switch (op->decoded_op.type) {
      case RowOperationsPB::INSERT:
      case RowOperationsPB::INSERT_IGNORE:
      case RowOperationsPB::UPSERT: {
        // TODO(unknown): should we have a separate counter for upserts?
        stats_.inserts_seen++;
        if (op->has_result()) {
          stats_.inserts_ignored++;
        }
        break;
      }
      case RowOperationsPB::UPDATE:
      case RowOperationsPB::DELETE: {
        stats_.mutations_seen++;
        if (op->has_result()) {
          stats_.mutations_ignored++;
        }
        break;
      }
      default:
        LOG_WITH_PREFIX(FATAL) << "Bad op type: " << op->decoded_op.type;
        break;
    }
    const OperationResultPB& new_op_result = new_result->ops(curr_op_idx);
    // If the op is already flushed or had previously failed, no need to replay it.
    // TODO(dralves) this back and forth is weird. We're first setting the flushed/failed
    // status on the rewritten message's commit entry. Then we pass it here to
    // set the status on the op, then we set it back on the commit entry with
    // ReleaseTxResultPB(). This could be simplified if we build the RowOps on
    // demand and just created DecodedRowOperation/RowOp for the replayed stuff.
    if (new_op_result.skip_on_replay()) {
      op->SetSkippedResult(new_op_result);
      continue;
    }
    // Give the op access to its original logged result before re-applying.
    op->set_original_result_from_log(&orig_result.ops(curr_op_idx));
    // Actually apply it.
    ProbeStats stats; // we don't use this, but tablet internals require non-NULL.
    RETURN_NOT_OK(tablet_->ApplyRowOperation(io_context, op_state, op, &stats));
    DCHECK(op->has_result());
    // We expect that the above Apply() will always succeed, because we're
    // applying an operation that we know succeeded before the server
    // restarted. If it doesn't succeed, something is wrong and we are
    // diverging from our prior state, so bail.
    if (op->result->has_failed_status()) {
      return Status::Corruption("Operation which previously succeeded failed "
                                "during log replay",
                                Substitute("Op: $0\nFailure: $1",
                                           op->ToString(*tablet_->schema()),
                                           SecureShortDebugString(op->result->failed_status())));
    }
  }
  return Status::OK();
}
// Decide whether a single row operation from a replayed commit needs to be
// re-applied:
//  - previously failed                      -> SKIP_PREVIOUSLY_FAILED
//  - already marked skip_on_replay          -> SKIP_PREVIOUSLY_FLUSHED
//  - all mutated stores already flushed     -> SKIP_PREVIOUSLY_FLUSHED
//  - exactly one mutated store still active -> NEEDS_REPLAY
// Returns Corruption for inconsistent results (more than two mutated stores,
// or two stores that are both still active).
Status TabletBootstrap::FilterOperation(const OperationResultPB& op_result,
                                        OpAction* action) {
  // If the operation failed or was skipped, originally, no need to re-apply it.
  if (op_result.has_failed_status()) {
    *action = SKIP_PREVIOUSLY_FAILED;
    return Status::OK();
  }
  if (op_result.skip_on_replay()) {
    *action = SKIP_PREVIOUSLY_FLUSHED;
    return Status::OK();
  }
  // A mutation targets at most two stores (the duplicated input/output of a
  // compaction).
  int num_mutated_stores = op_result.mutated_stores_size();
  if (PREDICT_FALSE(num_mutated_stores > 2)) {
    return Status::Corruption(Substitute("All operations must have at most two mutated_stores: $0",
                                         SecureShortDebugString(op_result)));
  }
  // NOTE: it's possible that num_mutated_stores = 0 in the case of an
  // UPSERT which only specified the primary key. In that case, if the
  // row already existed, it gets dropped without converting into an UPDATE.
  // The mutation may have been duplicated, so we'll check whether any of the
  // output targets was active.
  int num_active_stores = 0;
  for (const MemStoreTargetPB& mutated_store : op_result.mutated_stores()) {
    if (flushed_stores_.IsMemStoreActive(mutated_store)) {
      num_active_stores++;
    }
  }
  if (num_active_stores == 0) {
    // The mutation was fully flushed.
    *action = SKIP_PREVIOUSLY_FLUSHED;
    return Status::OK();
  }
  if (PREDICT_FALSE(num_active_stores == 2)) {
    // It's not possible for a duplicated mutation to refer to two stores which are still
    // active. Either the mutation arrived before the metadata was flushed, in which case
    // the 'first' store is live, or it arrived just after it was flushed, in which case
    // the 'second' store was live. But at no time should the metadata refer to both the
    // 'input' and 'output' stores of a compaction.
    return Status::Corruption("Mutation was duplicated to two stores that are considered live",
                              SecureShortDebugString(op_result));
  }
  *action = NEEDS_REPLAY;
  return Status::OK();
}
// Advance the tablet's clock to at least 'timestamp' (the raw encoded
// timestamp taken from a replayed message).
Status TabletBootstrap::UpdateClock(uint64_t timestamp) {
  return clock_->Update(Timestamp(timestamp));
}
// Log-message prefix identifying this bootstrap: "T <tablet id> P <fs uuid>: ".
string TabletBootstrap::LogPrefix() const {
  return Substitute("T $0 P $1: ", tablet_meta_->tablet_id(), tablet_meta_->fs_manager()->uuid());
}
// Snapshot which mem stores were durable when 'tablet_meta' was written: the
// last durable MRS id, plus the last durable redo delta-memstore id of each
// rowset. May be called only once; returns Corruption if two rowsets share a
// DRS id.
Status FlushedStoresSnapshot::InitFrom(const TabletMetadata& tablet_meta) {
  CHECK(flushed_dms_by_drs_id_.empty()) << "already initted";
  last_durable_mrs_id_ = tablet_meta.last_durable_mrs_id();
  for (const shared_ptr<RowSetMetadata>& rsmd : tablet_meta.rowsets()) {
    // Duplicate DRS ids would make later lookups ambiguous, so fail hard.
    if (!InsertIfNotPresent(&flushed_dms_by_drs_id_, rsmd->id(),
                            rsmd->last_durable_redo_dms_id())) {
      return Status::Corruption(Substitute(
          "Duplicate DRS ID $0 in tablet metadata. "
          "Found DRS $0 with last durable redo DMS ID $1 while trying to "
          "initialize DRS $0 with last durable redo DMS ID $2",
          rsmd->id(),
          flushed_dms_by_drs_id_[rsmd->id()],
          rsmd->last_durable_redo_dms_id()));
    }
  }
  return Status::OK();
}
// Return true if the mem store named by 'target' was still in memory (i.e.
// not yet made durable) when this snapshot was taken — meaning edits against
// it must be replayed.
bool FlushedStoresSnapshot::IsMemStoreActive(const MemStoreTargetPB& target) const {
  if (target.has_mrs_id()) {
    DCHECK(!target.has_rs_id());
    DCHECK(!target.has_dms_id());
    // The original mutation went to the MRS: it is active iff that MRS is
    // newer than the last one made durable.
    return target.mrs_id() > last_durable_mrs_id_;
  }
  // Otherwise the mutation went to a DRS's delta memstore.
  DCHECK(target.has_rs_id());
  int64_t last_durable_dms_id;
  if (!FindCopy(flushed_dms_by_drs_id_, target.rs_id(), &last_durable_dms_id)) {
    // If we have no data about this DRS, then there are two cases:
    //
    // 1) The DRS has already been flushed, but then later got removed because
    //    it got compacted away or culled because it was empty. In the former
    //    case, the deltas should have been reflected in the new compaction
    //    output, and in the latter, all the rows in the rowset have been
    //    deleted and there's nothing to replay.
    //
    // 2) The DRS was in the process of being written, but haven't yet
    //    flushed the TabletMetadata update that includes it. We only write
    //    to an in-progress DRS like this when we are in the 'duplicating'
    //    phase of a compaction. In that case, the other duplicated 'target'
    //    should still be present in the metadata, and we can base our
    //    decision based on that one.
    return false;
  }
  // The original rowset exists: the edit is active iff it went to a delta
  // memstore newer than the last one made durable.
  return target.dms_id() > last_durable_dms_id;
}
} // namespace tablet
} // namespace kudu