| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/tablet/tablet_bootstrap.h" |
| |
| #include <cstdint> |
| #include <functional> |
| #include <iterator> |
| #include <map> |
| #include <memory> |
| #include <ostream> |
| #include <string> |
| #include <unordered_map> |
| #include <utility> |
| #include <vector> |
| |
| #include <boost/optional/optional.hpp> |
| #include <gflags/gflags.h> |
| #include <glog/logging.h> |
| #include <google/protobuf/arena.h> |
| |
| #include "kudu/clock/clock.h" |
| #include "kudu/clock/hybrid_clock.h" |
| #include "kudu/common/common.pb.h" |
| #include "kudu/common/row_operations.h" |
| #include "kudu/common/schema.h" |
| #include "kudu/common/timestamp.h" |
| #include "kudu/common/wire_protocol.h" |
| #include "kudu/common/wire_protocol.pb.h" |
| #include "kudu/consensus/consensus.pb.h" |
| #include "kudu/consensus/log.h" |
| #include "kudu/consensus/log.pb.h" |
| #include "kudu/consensus/log_anchor_registry.h" |
| #include "kudu/consensus/log_index.h" |
| #include "kudu/consensus/log_reader.h" |
| #include "kudu/consensus/log_util.h" |
| #include "kudu/consensus/metadata.pb.h" |
| #include "kudu/consensus/opid.pb.h" |
| #include "kudu/consensus/opid_util.h" |
| #include "kudu/consensus/raft_consensus.h" |
| #include "kudu/fs/data_dirs.h" |
| #include "kudu/fs/fs.pb.h" |
| #include "kudu/fs/fs_manager.h" |
| #include "kudu/fs/io_context.h" |
| #include "kudu/gutil/macros.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/port.h" |
| #include "kudu/gutil/ref_counted.h" |
| #include "kudu/gutil/strings/human_readable.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/rpc/result_tracker.h" |
| #include "kudu/rpc/rpc_header.pb.h" |
| #include "kudu/tablet/metadata.pb.h" |
| #include "kudu/tablet/mvcc.h" |
| #include "kudu/tablet/ops/alter_schema_op.h" |
| #include "kudu/tablet/ops/op.h" |
| #include "kudu/tablet/ops/participant_op.h" |
| #include "kudu/tablet/ops/write_op.h" |
| #include "kudu/tablet/row_op.h" |
| #include "kudu/tablet/rowset.h" |
| #include "kudu/tablet/rowset_metadata.h" |
| #include "kudu/tablet/tablet.h" |
| #include "kudu/tablet/tablet.pb.h" |
| #include "kudu/tablet/tablet_metadata.h" |
| #include "kudu/tablet/tablet_replica.h" |
| #include "kudu/tserver/tserver.pb.h" |
| #include "kudu/tserver/tserver_admin.pb.h" |
| #include "kudu/util/debug/trace_event.h" |
| #include "kudu/util/env.h" |
| #include "kudu/util/env_util.h" |
| #include "kudu/util/fault_injection.h" |
| #include "kudu/util/flag_tags.h" |
| #include "kudu/util/logging.h" |
| #include "kudu/util/metrics.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/path_util.h" |
| #include "kudu/util/pb_util.h" |
| #include "kudu/util/scoped_cleanup.h" |
| #include "kudu/util/stopwatch.h" |
| |
| DECLARE_int32(group_commit_queue_size_bytes); |
| |
| DEFINE_double(fault_crash_during_log_replay, 0.0, |
| "Fraction of the time when the tablet will crash immediately " |
| "after processing a log entry during log replay. " |
| "(For testing only!)"); |
| TAG_FLAG(fault_crash_during_log_replay, unsafe); |
| |
| DECLARE_int32(max_clock_sync_error_usec); |
| |
| using kudu::clock::Clock; |
| using kudu::consensus::ALTER_SCHEMA_OP; |
| using kudu::consensus::CHANGE_CONFIG_OP; |
| using kudu::consensus::CommitMsg; |
| using kudu::consensus::ConsensusBootstrapInfo; |
| using kudu::consensus::MinimumOpId; |
| using kudu::consensus::NO_OP; |
| using kudu::consensus::OpId; |
| using kudu::consensus::OpIdEquals; |
| using kudu::consensus::OpIdToString; |
| using kudu::consensus::OperationType; |
| using kudu::consensus::OperationType_Name; |
| using kudu::consensus::PARTICIPANT_OP; |
| using kudu::consensus::RaftConfigPB; |
| using kudu::consensus::ReplicateMsg; |
| using kudu::consensus::WRITE_OP; |
| using kudu::fs::IOContext; |
| using kudu::log::Log; |
| using kudu::log::LogAnchorRegistry; |
| using kudu::log::LogEntryPB; |
| using kudu::log::LogIndex; |
| using kudu::log::LogOptions; |
| using kudu::log::LogReader; |
| using kudu::log::ReadableLogSegment; |
| using kudu::pb_util::SecureDebugString; |
| using kudu::pb_util::SecureShortDebugString; |
| using kudu::rpc::ResultTracker; |
| using kudu::tserver::AlterSchemaRequestPB; |
| using kudu::tserver::WriteRequestPB; |
| using kudu::tserver::WriteResponsePB; |
| using std::map; |
| using std::shared_ptr; |
| using std::string; |
| using std::unique_ptr; |
| using std::unordered_map; |
| using std::vector; |
| using strings::Substitute; |
| |
| namespace kudu { |
| namespace tablet { |
| |
| struct ReplayState; |
| |
| // Information from the tablet metadata which indicates which data was |
| // flushed prior to this restart and which memory stores are still active. |
| // |
| // We take a snapshot of this information at the beginning of the bootstrap |
| // process so that we can allow compactions and flushes to run during bootstrap |
| // without confusing our tracking of flushed stores. |
| // |
| // NOTE: automatic flushes and compactions are not currently scheduled during |
| // bootstrap. However, flushes may still be triggered due to operations like |
| // alter-table. |
| class FlushedStoresSnapshot { |
| public: |
| FlushedStoresSnapshot() {} |
| Status InitFrom(const TabletMetadata& tablet_meta); |
| |
| // Return true if the given memory store is still active (i.e. edits that were |
| // originally written to this memory store should be replayed during the bootstrap |
| // process). |
| // |
| // NOTE: a store may be inactive for either of two reasons. Either: |
| // (a) the store was flushed to disk, OR |
| // (b) the store was in the process of being written by a flush or compaction |
| // but the process crashed before the associated tablet metadata update |
| // was committed. |
| bool IsMemStoreActive(const MemStoreTargetPB& target) const; |
| |
| private: |
| int64_t last_durable_mrs_id_; |
| unordered_map<int64_t, int64_t> flushed_dms_by_drs_id_; |
| |
| DISALLOW_COPY_AND_ASSIGN(FlushedStoresSnapshot); |
| }; |
| |
| // Bootstraps an existing tablet by opening the metadata from disk, and rebuilding soft |
| // state by playing log segments. A bootstrapped tablet can then be added to an existing |
| // consensus configuration as a LEARNER, which will bring its state up to date with the |
| // rest of the consensus configuration, or it can start serving the data itself, after it |
| // has been appointed LEADER of that particular consensus configuration. |
| // |
| // The high-level steps to replay the WAL are as follows: |
| // for each segment in the WAL: |
| // for each entry in the segment: |
| // - If the entry is a replicate message, keep track of it and write it to |
| // the new WAL, since we may find a corresponding commit message later. |
| // - If the entry is a commit message, determine whether or not we have a |
| // corresponding replicate message in the WAL, and if so, replay the op, |
| // skipping operations whose mem-stores have been persisted to disk. If |
| // no replicate message is found, we can skip replaying the op since its |
| // mutation has been persisted to disk. |
| // for each commit message with no corresponding replicate message: |
| // - Validate that the mutated stores are non-active. |
| // for each replicate message with no corresponding commit message: |
| // - Return the replicate message as an "orphaned replicate". |
| // - When the RaftConsensus instance starts up, orphaned replicates are |
| // initialized as follower ops. |
| // |
| // NOTE: this does not handle pulling data from other replicas in the cluster. That |
| // is handled by the 'TabletCopy' classes, which copy blocks and metadata locally |
| // before invoking this local bootstrap functionality to start the tablet. |
| // |
| // TODO Because the table that is being rebuilt is never flushed/compacted, consensus |
| // is only set on the tablet after bootstrap, when we get to flushes/compactions though |
| // we need to set it before replay or we won't be able to re-rebuild. |
| class TabletBootstrap { |
| public: |
| TabletBootstrap(scoped_refptr<TabletMetadata> tablet_meta, |
| RaftConfigPB committed_raft_config, |
| Clock* clock, |
| shared_ptr<MemTracker> mem_tracker, |
| scoped_refptr<ResultTracker> result_tracker, |
| MetricRegistry* metric_registry, |
| FileCache* file_cache, |
| scoped_refptr<TabletReplica> tablet_replica, |
| scoped_refptr<LogAnchorRegistry> log_anchor_registry); |
| |
| // Plays the log segments, rebuilding the portion of the Tablet's soft |
| // state that is present in the log (additional soft state may be present |
| // in other replicas). |
| // A successful call will yield the rebuilt tablet and the rebuilt log. |
| Status Bootstrap(shared_ptr<Tablet>* rebuilt_tablet, |
| scoped_refptr<Log>* rebuilt_log, |
| ConsensusBootstrapInfo* consensus_info); |
| |
| private: |
| |
| // The method that does the actual work of tablet bootstrap. Bootstrap() is |
| // actually a wrapper method that is responsible for pinning and unpinning |
| // the tablet metadata flush. |
| Status RunBootstrap(shared_ptr<Tablet>* rebuilt_tablet, |
| scoped_refptr<Log>* rebuilt_log, |
| ConsensusBootstrapInfo* consensus_info); |
| |
| // Opens the tablet. |
| // Sets '*has_blocks' to true if there was any data on disk for this tablet. |
| Status OpenTablet(bool* has_blocks); |
| |
| // Checks if a previous log recovery directory exists. If so, it deletes any |
| // files in the log dir and sets 'needs_recovery' to true, meaning that the |
| // previous recovery attempt should be retried from the recovery dir. |
| // |
| // Otherwise, if there is a log directory with log files in it, renames that |
| // log dir to the log recovery dir and creates a new, empty log dir so that |
| // log replay can proceed. 'needs_recovery' is also returned as true in this |
| // case. |
| // |
| // If no log segments are found, 'needs_recovery' is set to false. |
| Status PrepareRecoveryDir(bool* needs_recovery); |
| |
| // Opens the latest log segments for the Tablet that will allow to rebuild |
| // the tablet's soft state. If there are existing log segments in the tablet's |
| // log directly they are moved to a "log-recovery" directory which is deleted |
| // when the replay process is completed (as they have been duplicated in the |
| // current log directory). |
| // |
| // If a "log-recovery" directory is already present, we will continue to replay |
| // from the "log-recovery" directory. Tablet metadata is updated once replay |
| // has finished from the "log-recovery" directory. |
| Status OpenLogReaderInRecoveryDir(); |
| |
| // Opens a new log in the tablet's log directory. |
| // The directory is expected to be clean. |
| Status OpenNewLog(); |
| |
| // Finishes bootstrap, setting 'rebuilt_log' and 'rebuilt_tablet'. |
| Status FinishBootstrap(const string& message, |
| scoped_refptr<log::Log>* rebuilt_log, |
| shared_ptr<Tablet>* rebuilt_tablet); |
| |
| // Plays the log segments into the tablet being built. |
| // The process of playing the segments generates a new log that can be continued |
| // later on when then tablet is rebuilt and starts accepting writes from clients. |
| Status PlaySegments(const IOContext* io_context, ConsensusBootstrapInfo* consensus_info); |
| |
| // Append the given commit message to the log. |
| // Does not support writing a TxResult. |
| Status AppendCommitMsg(const CommitMsg& commit_msg); |
| |
| Status PlayWriteRequest(const IOContext* io_context, ReplicateMsg* replicate_msg, |
| const CommitMsg& commit_msg); |
| |
| Status PlayAlterSchemaRequest(const IOContext* io_context, ReplicateMsg* replicate_msg, |
| const CommitMsg& commit_msg); |
| |
| Status PlayChangeConfigRequest(const IOContext* io_context, ReplicateMsg* replicate_msg, |
| const CommitMsg& commit_msg); |
| |
| Status PlayTxnParticipantOpRequest(const IOContext* io_context, ReplicateMsg* replicate_msg, |
| const CommitMsg& commit_msg); |
| |
| Status PlayNoOpRequest(const IOContext* io_context, ReplicateMsg* replicate_msg, |
| const CommitMsg& commit_msg); |
| |
| // Plays operations, skipping those that have already been flushed or have previously failed. |
| // See ApplyRowOperations() for more details on how the decision of whether an operation |
| // is applied or skipped is made. |
| Status PlayRowOperations(const IOContext* io_context, |
| WriteOpState* op_state, |
| const TxResultPB& orig_result, |
| TxResultPB* new_result); |
| |
| // Determine which of the operations from 'orig_result' must be skipped. |
| // At the same time this builds the WriteResponsePB that we'll store on the ResultTracker. |
| // 'new_result' store the results of the operations that were skipped, 'response' stores |
| // any error that might have previously happened so that we can send them back to clients, |
| // if needed. |
| // Finally 'all_skipped' indicates whether all of the original operations were skipped. |
| Status DetermineSkippedOpsAndBuildResponse(const TxResultPB& orig_result, |
| TxResultPB* new_result, |
| WriteResponsePB* response, |
| bool* all_skipped); |
| |
| // Pass through all of the decoded operations in op_state. For each op: |
| // - if it was previously failed, mark as failed |
| // - if it previously succeeded but was flushed, skip it. |
| // - otherwise, re-apply to the tablet being bootstrapped. |
| Status ApplyOperations(const IOContext* io_context, |
| WriteOpState* op_state, |
| const TxResultPB& orig_result, |
| TxResultPB* new_result); |
| |
| enum OpAction { |
| // The operation was never applied or was applied to an unflushed memory store and thus |
| // needs to be applied again. |
| NEEDS_REPLAY, |
| // The operation was already applied to a memory store that was flushed. |
| SKIP_PREVIOUSLY_FLUSHED, |
| // The operation was never applied due to an error. |
| SKIP_PREVIOUSLY_FAILED |
| }; |
| |
| // Filter a row operation, setting 'action' to indicate what needs to be done |
| // to the operation, i.e. whether it must applied or skipped. |
| Status FilterOperation(const OperationResultPB& op_result, |
| OpAction* action); |
| |
| enum ActiveStores { |
| // The OperationResultPBs in the commit message do not reference any stores. |
| // This can happen in the case that the operations did not result in any mutations |
| // (e.g. because they were updates for not-found row keys). |
| NO_MUTATED_STORES, |
| |
| // At least one operation resulted in a mutation to a store, but none of the |
| // mutated stores are still active. Therefore the operation does not need to |
| // be replayed. |
| NO_STORES_ACTIVE, |
| |
| // At least one operation resulted in a mutation to a store, and at least |
| // one of those mutated stores is still active. This implies that the operation |
| // needs to be replayed. |
| SOME_STORES_ACTIVE |
| }; |
| |
| // For the given commit message, analyze which memory stores were mutated |
| // by the operation, returning one of the enum values above. |
| ActiveStores AnalyzeActiveStores(const CommitMsg& commit); |
| |
| void DumpReplayStateToLog(const ReplayState& state); |
| |
| Status HandleEntry(const IOContext* io_context, |
| ReplayState* state, |
| unique_ptr<LogEntryPB> entry, |
| string* entry_debug_info); |
| |
| // Handlers for each type of message seen in the log during replay. |
| Status HandleReplicateMessage(ReplayState* state, |
| unique_ptr<LogEntryPB> entry, |
| string* entry_debug_info); |
| Status HandleCommitMessage(const IOContext* io_context, ReplayState* state, |
| unique_ptr<LogEntryPB> entry, |
| string* entry_debug_info); |
| |
| Status ApplyCommitMessage(const IOContext* io_context, ReplayState* state, LogEntryPB* entry); |
| Status HandleEntryPair(const IOContext* io_context, LogEntryPB* replicate_entry, |
| LogEntryPB* commit_entry); |
| |
| // Checks that an orphaned commit message is actually irrelevant, i.e that none |
| // of the data stores it refers to are live. |
| Status CheckOrphanedCommitDoesntNeedReplay(const CommitMsg& commit); |
| |
| // Decodes a Timestamp from the provided string and updates the clock |
| // with it. |
| Status UpdateClock(uint64_t timestamp); |
| |
| // Return a log prefix string in the standard "T xxx P yyy" format. |
| string LogPrefix() const; |
| |
| // Log a status message and set the TabletReplica's status as well. |
| void SetStatusMessage(const string& status); |
| |
| const scoped_refptr<TabletMetadata> tablet_meta_; |
| const RaftConfigPB committed_raft_config_; |
| Clock* clock_; |
| shared_ptr<MemTracker> mem_tracker_; |
| scoped_refptr<rpc::ResultTracker> result_tracker_; |
| MetricRegistry* metric_registry_; |
| FileCache* file_cache_; |
| scoped_refptr<TabletReplica> tablet_replica_; |
| unique_ptr<tablet::Tablet> tablet_; |
| const scoped_refptr<log::LogAnchorRegistry> log_anchor_registry_; |
| scoped_refptr<log::Log> log_; |
| std::shared_ptr<log::LogReader> log_reader_; |
| |
| // Statistics on the replay of entries in the log. |
| struct Stats { |
| Stats() |
| : ops_read(0), |
| ops_overwritten(0), |
| ops_ignored(0), |
| ops_committed(0), |
| inserts_seen(0), |
| inserts_ignored(0), |
| mutations_seen(0), |
| mutations_ignored(0), |
| orphaned_commits(0) { |
| } |
| |
| string ToString() const { |
| return Substitute("ops{read=$0 overwritten=$1 applied=$2 ignored=$3} " |
| "inserts{seen=$4 ignored=$5} " |
| "mutations{seen=$6 ignored=$7} " |
| "orphaned_commits=$8", |
| ops_read, ops_overwritten, ops_committed, ops_ignored, |
| inserts_seen, inserts_ignored, |
| mutations_seen, mutations_ignored, |
| orphaned_commits); |
| } |
| |
| // Number of REPLICATE messages read from the log |
| int ops_read; |
| // Number of REPLICATE messages which were overwritten by later entries. |
| int ops_overwritten; |
| // Number of REPLICATE messages which were able to be completely ignored |
| // because the COMMIT message indicated that all of the contained operations |
| // were already flushed. |
| int ops_ignored; |
| // Number of REPLICATE messages for which a matching COMMIT was found. |
| int ops_committed; |
| |
| // Number inserts/mutations seen and ignored. Note inserts_ignored does not refer |
| // to the INSERT_IGNORE operation. It refers to inserts ignored during log replay. |
| int inserts_seen, inserts_ignored; |
| int mutations_seen, mutations_ignored; |
| |
| // Number of COMMIT messages for which a corresponding REPLICATE was not found. |
| int orphaned_commits; |
| }; |
| Stats stats_; |
| |
| // Snapshot of which stores were flushed prior to restart. |
| FlushedStoresSnapshot flushed_stores_; |
| |
| DISALLOW_COPY_AND_ASSIGN(TabletBootstrap); |
| }; |
| |
| void TabletBootstrap::SetStatusMessage(const string& status) { |
| LOG_WITH_PREFIX(INFO) << status; |
| if (tablet_replica_) tablet_replica_->SetStatusMessage(status); |
| } |
| |
| Status BootstrapTablet(scoped_refptr<TabletMetadata> tablet_meta, |
| RaftConfigPB committed_raft_config, |
| Clock* clock, |
| shared_ptr<MemTracker> mem_tracker, |
| scoped_refptr<ResultTracker> result_tracker, |
| MetricRegistry* metric_registry, |
| FileCache* file_cache, |
| scoped_refptr<TabletReplica> tablet_replica, |
| scoped_refptr<log::LogAnchorRegistry> log_anchor_registry, |
| shared_ptr<tablet::Tablet>* rebuilt_tablet, |
| scoped_refptr<log::Log>* rebuilt_log, |
| ConsensusBootstrapInfo* consensus_info) { |
| TRACE_EVENT1("tablet", "BootstrapTablet", |
| "tablet_id", tablet_meta->tablet_id()); |
| TabletBootstrap bootstrap(std::move(tablet_meta), |
| std::move(committed_raft_config), |
| clock, |
| std::move(mem_tracker), |
| std::move(result_tracker), |
| metric_registry, |
| file_cache, |
| std::move(tablet_replica), |
| std::move(log_anchor_registry)); |
| RETURN_NOT_OK(bootstrap.Bootstrap(rebuilt_tablet, rebuilt_log, consensus_info)); |
| // This is necessary since OpenNewLog() initially disables sync. |
| RETURN_NOT_OK((*rebuilt_log)->ReEnableSyncIfRequired()); |
| return Status::OK(); |
| } |
| |
| static string DebugInfo(const string& tablet_id, |
| int segment_seqno, |
| int entry_idx, |
| const string& segment_path, |
| const string& entry_debug_info) { |
| // Truncate the debug string to a reasonable length for logging. |
| // Otherwise, glog will truncate for us and we may miss important |
| // information which came after this long string. |
| string debug_str = entry_debug_info; |
| if (debug_str.size() > 500) { |
| debug_str.resize(500); |
| debug_str.append("..."); |
| } |
| if (!debug_str.empty()) { |
| debug_str = Substitute(" Entry: $0", debug_str); |
| } |
| return Substitute("Debug Info: Error playing entry $0 of segment $1 of tablet $2. " |
| "Segment path: $3.$4", entry_idx, segment_seqno, tablet_id, |
| segment_path, debug_str); |
| } |
| |
| TabletBootstrap::TabletBootstrap( |
| scoped_refptr<TabletMetadata> tablet_meta, |
| RaftConfigPB committed_raft_config, |
| Clock* clock, |
| shared_ptr<MemTracker> mem_tracker, |
| scoped_refptr<ResultTracker> result_tracker, |
| MetricRegistry* metric_registry, |
| FileCache* file_cache, |
| scoped_refptr<TabletReplica> tablet_replica, |
| scoped_refptr<LogAnchorRegistry> log_anchor_registry) |
| : tablet_meta_(std::move(tablet_meta)), |
| committed_raft_config_(std::move(committed_raft_config)), |
| clock_(clock), |
| mem_tracker_(std::move(mem_tracker)), |
| result_tracker_(std::move(result_tracker)), |
| metric_registry_(metric_registry), |
| file_cache_(file_cache), |
| tablet_replica_(std::move(tablet_replica)), |
| log_anchor_registry_(std::move(log_anchor_registry)) {} |
| |
| Status TabletBootstrap::Bootstrap(shared_ptr<Tablet>* rebuilt_tablet, |
| scoped_refptr<Log>* rebuilt_log, |
| ConsensusBootstrapInfo* consensus_info) { |
| // We pin (prevent) metadata flush at the beginning of the bootstrap process |
| // and always unpin it at the end. |
| tablet_meta_->PinFlush(); |
| |
| // Now run the actual bootstrap process. |
| Status bootstrap_status = RunBootstrap(rebuilt_tablet, rebuilt_log, consensus_info); |
| |
| // Add a callback to TabletMetadata that makes sure that each time we flush the metadata |
| // we also wait for in-flights to finish and for their wal entry to be fsynced. |
| // This might be a bit conservative in some situations but it will prevent us from |
| // ever flushing the metadata referring to tablet data blocks containing data whose |
| // commit entries are not durable, a pre-requisite for recovery. |
| CHECK((*rebuilt_tablet && *rebuilt_log) || !bootstrap_status.ok()) |
| << "Tablet and Log not initialized"; |
| if (bootstrap_status.ok()) { |
| auto cb = make_scoped_refptr(new FlushInflightsToLogCallback( |
| rebuilt_tablet->get(), *rebuilt_log)); |
| tablet_meta_->SetPreFlushCallback( |
| [cb]() { return cb->WaitForInflightsAndFlushLog(); }); |
| } |
| |
| // This will cause any pending TabletMetadata flush to be executed. |
| Status unpin_status = tablet_meta_->UnPinFlush(); |
| |
| constexpr char kFailedUnpinMsg[] = "Failed to flush after unpinning"; |
| if (PREDICT_FALSE(!bootstrap_status.ok() && !unpin_status.ok())) { |
| LOG_WITH_PREFIX(WARNING) << kFailedUnpinMsg << ": " << unpin_status.ToString(); |
| return bootstrap_status; |
| } |
| RETURN_NOT_OK(bootstrap_status); |
| RETURN_NOT_OK_PREPEND(unpin_status, Substitute("$0$1", LogPrefix(), kFailedUnpinMsg)); |
| return Status::OK(); |
| } |
| |
| Status TabletBootstrap::RunBootstrap(shared_ptr<Tablet>* rebuilt_tablet, |
| scoped_refptr<Log>* rebuilt_log, |
| ConsensusBootstrapInfo* consensus_info) { |
| string tablet_id = tablet_meta_->tablet_id(); |
| |
| // Make sure we don't try to locally bootstrap a tablet that was in the middle |
| // of a tablet copy. It's likely that not all files were copied over |
| // successfully. |
| TabletDataState tablet_data_state = tablet_meta_->tablet_data_state(); |
| if (tablet_data_state != TABLET_DATA_READY) { |
| return Status::Corruption("Unable to locally bootstrap tablet " + tablet_id + ": " + |
| "TabletMetadata bootstrap state is " + |
| TabletDataState_Name(tablet_data_state)); |
| } |
| |
| SetStatusMessage("Bootstrap starting."); |
| |
| if (VLOG_IS_ON(1)) { |
| TabletSuperBlockPB super_block; |
| RETURN_NOT_OK(tablet_meta_->ToSuperBlock(&super_block)); |
| VLOG_WITH_PREFIX(1) << "Tablet Metadata: " << SecureDebugString(super_block); |
| } |
| |
| // Ensure the tablet's data dirs are present and healthy before it is opened. |
| DataDirGroupPB data_dir_group; |
| RETURN_NOT_OK_PREPEND( |
| tablet_meta_->fs_manager()->dd_manager()->GetDataDirGroupPB(tablet_id, &data_dir_group), |
| "error retrieving tablet data dir group (one or more data dirs may have been removed)"); |
| if (tablet_meta_->fs_manager()->dd_manager()->IsTabletInFailedDir(tablet_id)) { |
| return Status::IOError("some tablet data is in a failed directory"); |
| } |
| |
| RETURN_NOT_OK(flushed_stores_.InitFrom(*tablet_meta_.get())); |
| |
| bool has_blocks; |
| RETURN_NOT_OK(OpenTablet(&has_blocks)); |
| |
| bool needs_recovery; |
| RETURN_NOT_OK(PrepareRecoveryDir(&needs_recovery)); |
| if (needs_recovery) { |
| RETURN_NOT_OK(OpenLogReaderInRecoveryDir()); |
| } |
| |
| // This is a new tablet, nothing left to do. |
| if (!has_blocks && !needs_recovery) { |
| LOG_WITH_PREFIX(INFO) << "No blocks or log segments found. Creating new log."; |
| RETURN_NOT_OK_PREPEND(OpenNewLog(), "Failed to open new log"); |
| RETURN_NOT_OK(FinishBootstrap("No bootstrap required, opened a new log", |
| rebuilt_log, rebuilt_tablet)); |
| consensus_info->last_id = MinimumOpId(); |
| consensus_info->last_committed_id = MinimumOpId(); |
| return Status::OK(); |
| } |
| |
| // If there were blocks, there must be segments to replay. This is required |
| // by Raft, since we always need to know the term and index of the last |
| // logged op in order to vote, know how to respond to AppendEntries(), etc. |
| if (has_blocks && !needs_recovery) { |
| return Status::IllegalState(Substitute("Tablet $0: Found rowsets but no log " |
| "segments could be found.", |
| tablet_id)); |
| } |
| |
| IOContext io_context({ tablet_meta_->table_id() }); |
| RETURN_NOT_OK_PREPEND(PlaySegments(&io_context, consensus_info), "Failed log replay. Reason"); |
| |
| RETURN_NOT_OK(Log::RemoveRecoveryDirIfExists(tablet_->metadata()->fs_manager(), |
| tablet_->metadata()->tablet_id())); |
| RETURN_NOT_OK(FinishBootstrap("Bootstrap complete.", rebuilt_log, rebuilt_tablet)); |
| |
| return Status::OK(); |
| } |
| |
| Status TabletBootstrap::FinishBootstrap(const string& message, |
| scoped_refptr<log::Log>* rebuilt_log, |
| shared_ptr<Tablet>* rebuilt_tablet) { |
| RETURN_NOT_OK(tablet_->MarkFinishedBootstrapping()); |
| SetStatusMessage(message); |
| rebuilt_tablet->reset(tablet_.release()); |
| rebuilt_log->swap(log_); |
| return Status::OK(); |
| } |
| |
| Status TabletBootstrap::OpenTablet(bool* has_blocks) { |
| unique_ptr<Tablet> tablet(new Tablet(tablet_meta_, |
| clock_, |
| mem_tracker_, |
| metric_registry_, |
| log_anchor_registry_)); |
| // doing nothing for now except opening a tablet locally. |
| { |
| SCOPED_LOG_SLOW_EXECUTION_PREFIX(INFO, 100, LogPrefix(), "opening tablet"); |
| RETURN_NOT_OK(tablet->Open()); |
| } |
| *has_blocks = tablet->num_rowsets() != 0; |
| tablet_ = std::move(tablet); |
| return Status::OK(); |
| } |
| |
| Status TabletBootstrap::PrepareRecoveryDir(bool* needs_recovery) { |
| *needs_recovery = false; |
| |
| FsManager* fs_manager = tablet_->metadata()->fs_manager(); |
| string tablet_id = tablet_->metadata()->tablet_id(); |
| string log_dir = fs_manager->GetTabletWalDir(tablet_id); |
| |
| // If the recovery directory exists, then we crashed mid-recovery. |
| // Throw away any logs from the previous recovery attempt and restart the log |
| // replay process from the beginning using the same recovery dir as last time. |
| string recovery_path = fs_manager->GetTabletWalRecoveryDir(tablet_id); |
| if (fs_manager->Exists(recovery_path)) { |
| LOG_WITH_PREFIX(INFO) << "Previous recovery directory found at " << recovery_path << ": " |
| << "Replaying log files from this location instead of " << log_dir; |
| |
| // Since we have a recovery directory, clear out the log_dir by recursively |
| // deleting it and creating a new one so that we don't end up with remnants |
| // of old WAL segments or indexes after replay. |
| if (fs_manager->Exists(log_dir)) { |
| LOG_WITH_PREFIX(INFO) << "Deleting old log files from previous recovery attempt in " |
| << log_dir; |
| RETURN_NOT_OK_PREPEND(fs_manager->env()->DeleteRecursively(log_dir), |
| "Could not recursively delete old log dir " + log_dir); |
| } |
| |
| RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(fs_manager->env(), log_dir), |
| "Failed to create log directory " + log_dir); |
| |
| *needs_recovery = true; |
| return Status::OK(); |
| } |
| |
| // If we made it here, there was no pre-existing recovery dir. |
| // Now we look for log files in log_dir, and if we find any then we rename |
| // the whole log_dir to a recovery dir and return needs_recovery = true. |
| RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(fs_manager->env(), log_dir), |
| "Failed to create log dir"); |
| |
| vector<string> children; |
| RETURN_NOT_OK_PREPEND(fs_manager->ListDir(log_dir, &children), |
| "Couldn't list log segments."); |
| for (const string& child : children) { |
| if (!log::IsLogFileName(child)) { |
| continue; |
| } |
| |
| string source_path = JoinPathSegments(log_dir, child); |
| string dest_path = JoinPathSegments(recovery_path, child); |
| VLOG_WITH_PREFIX(1) << "Will attempt to recover log segment " << source_path |
| << " to " << dest_path; |
| *needs_recovery = true; |
| } |
| |
| if (*needs_recovery) { |
| // Atomically rename the log directory to the recovery directory |
| // and then re-create the log directory. |
| VLOG_WITH_PREFIX(1) << "Moving log directory " << log_dir << " to recovery directory " |
| << recovery_path << " in preparation for log replay"; |
| RETURN_NOT_OK_PREPEND(fs_manager->env()->RenameFile(log_dir, recovery_path), |
| Substitute("Could not move log directory $0 to recovery dir $1", |
| log_dir, recovery_path)); |
| RETURN_NOT_OK_PREPEND(fs_manager->env()->CreateDir(log_dir), |
| "Failed to recreate log directory " + log_dir); |
| } |
| return Status::OK(); |
| } |
| |
| Status TabletBootstrap::OpenLogReaderInRecoveryDir() { |
| const string& tablet_id = tablet_->tablet_id(); |
| FsManager* fs_manager = tablet_meta_->fs_manager(); |
| VLOG_WITH_PREFIX(1) << "Opening log reader in log recovery dir " |
| << fs_manager->GetTabletWalRecoveryDir(tablet_id); |
| // Open the reader. |
| // Since we're recovering, we don't want to have any log index -- since it |
| // isn't fsynced() during writing, its contents are useless to us. |
| scoped_refptr<LogIndex> log_index(nullptr); |
| const string recovery_dir = fs_manager->GetTabletWalRecoveryDir(tablet_id); |
| RETURN_NOT_OK_PREPEND(LogReader::Open(fs_manager->env(), recovery_dir, log_index, tablet_id, |
| tablet_->GetMetricEntity().get(), |
| file_cache_, |
| &log_reader_), |
| "Could not open LogReader. Reason"); |
| return Status::OK(); |
| } |
| |
| Status TabletBootstrap::OpenNewLog() { |
| RETURN_NOT_OK(Log::Open(LogOptions(), |
| tablet_->metadata()->fs_manager(), |
| file_cache_, |
| tablet_->tablet_id(), |
| *tablet_->schema(), |
| tablet_->metadata()->schema_version(), |
| tablet_->GetMetricEntity(), |
| &log_)); |
| // Disable sync temporarily in order to speed up appends during the |
| // bootstrap process. |
| log_->DisableSync(); |
| return Status::OK(); |
| } |
| |
| typedef map<int64_t, unique_ptr<LogEntryPB>> OpIndexToEntryMap; |
| |
| // State kept during replay. |
| struct ReplayState { |
| ReplayState() |
| : prev_op_id(MinimumOpId()), |
| committed_op_id(MinimumOpId()) { |
| } |
| |
| // Return true if 'b' is allowed to immediately follow 'a' in the log. |
| static bool IsValidSequence(const OpId& a, const OpId& b) { |
| if (a.term() == 0 && a.index() == 0) { |
| // Not initialized - can start with any opid. |
| return true; |
| } |
| |
| // Within the same term, we should never skip entries. |
| // We can, however go backwards (see KUDU-783 for an example) |
| if (b.term() == a.term() && |
| b.index() > a.index() + 1) { |
| return false; |
| } |
| |
| return true; |
| } |
| |
| // Return a Corruption status if 'id' seems to be out-of-sequence in the log. |
| Status CheckSequentialReplicateId(const ReplicateMsg& msg) { |
| DCHECK(msg.has_id()); |
| if (PREDICT_FALSE(!IsValidSequence(prev_op_id, msg.id()))) { |
| string op_desc = Substitute("$0 REPLICATE (Type: $1)", |
| OpIdToString(msg.id()), |
| OperationType_Name(msg.op_type())); |
| return Status::Corruption( |
| Substitute("Unexpected opid following opid $0. Operation: $1", |
| OpIdToString(prev_op_id), |
| op_desc)); |
| } |
| |
| prev_op_id = msg.id(); |
| return Status::OK(); |
| } |
| |
| void UpdateCommittedOpId(const OpId& id) { |
| if (id.index() > committed_op_id.index()) { |
| committed_op_id = id; |
| } |
| } |
| |
| void AddEntriesToStrings(const OpIndexToEntryMap& entries, vector<string>* strings) const { |
| for (const auto& map_entry : entries) { |
| const LogEntryPB* entry = DCHECK_NOTNULL(map_entry.second.get()); |
| strings->push_back(Substitute(" $0", SecureShortDebugString(*entry))); |
| } |
| } |
| |
| void DumpReplayStateToStrings(vector<string>* strings) const { |
| strings->push_back(Substitute("ReplayState: Previous OpId: $0, Committed OpId: $1, " |
| "Pending Replicates: $2, Pending Commits: $3", OpIdToString(prev_op_id), |
| OpIdToString(committed_op_id), pending_replicates.size(), pending_commits.size())); |
| if (!pending_replicates.empty()) { |
| strings->push_back("Dumping REPLICATES: "); |
| AddEntriesToStrings(pending_replicates, strings); |
| } |
| if (!pending_commits.empty()) { |
| strings->push_back("Dumping COMMITS: "); |
| AddEntriesToStrings(pending_commits, strings); |
| } |
| } |
| |
| // The last replicate message's ID. |
| OpId prev_op_id; |
| |
| // The last operation known to be committed. |
| // All other operations with lower IDs are also committed. |
| OpId committed_op_id; |
| |
| // REPLICATE log entries whose corresponding COMMIT record has |
| // not yet been seen. Keyed by index. |
| OpIndexToEntryMap pending_replicates; |
| |
| // COMMIT log entries which couldn't be applied immediately. |
| OpIndexToEntryMap pending_commits; |
| }; |
| |
| // Handle the given log entry. |
| Status TabletBootstrap::HandleEntry(const IOContext* io_context, |
| ReplayState* state, |
| unique_ptr<LogEntryPB> entry, |
| string* entry_debug_info) { |
| DCHECK(entry); |
| VLOG_WITH_PREFIX(1) << "Handling entry: " << SecureShortDebugString(*entry); |
| |
| const auto entry_type = entry->type(); |
| switch (entry_type) { |
| case log::REPLICATE: |
| RETURN_NOT_OK(HandleReplicateMessage(state, std::move(entry), entry_debug_info)); |
| break; |
| case log::COMMIT: |
| // check the unpaired ops for the matching replicate msg, abort if not found |
| RETURN_NOT_OK(HandleCommitMessage(io_context, state, |
| std::move(entry), entry_debug_info)); |
| break; |
| default: |
| return Status::Corruption(Substitute("unexpected log entry type: $0", entry_type)); |
| } |
| MAYBE_FAULT(FLAGS_fault_crash_during_log_replay); |
| return Status::OK(); |
| } |
| |
| // Repair overflow issue reported in KUDU-1933. |
| void CheckAndRepairOpIdOverflow(OpId* opid) { |
| if (PREDICT_FALSE(opid->term() < consensus::kMinimumTerm)) { |
| int64_t overflow = opid->term() - INT32_MIN + 1LL; |
| CHECK_GE(overflow, 1) << OpIdToString(*opid); |
| opid->set_term(static_cast<int64_t>(INT32_MAX) + overflow); |
| } |
| if (PREDICT_FALSE(opid->index() < consensus::kMinimumOpIdIndex && |
| opid->index() != consensus::kInvalidOpIdIndex)) { |
| int64_t overflow = opid->index() - INT32_MIN + 1LL; |
| CHECK_GE(overflow, 1) << OpIdToString(*opid); |
| // Sanity check. Even with the bug in KUDU-1933, the number of bytes |
| // allowed in a single group commit is a generous upper bound on how far a |
| // log index may have overflowed before causing a crash. |
| CHECK_LT(overflow, FLAGS_group_commit_queue_size_bytes) << OpIdToString(*opid); |
| opid->set_index(static_cast<int64_t>(INT32_MAX) + overflow); |
| } |
| } |
| |
| Status TabletBootstrap::HandleReplicateMessage(ReplayState* state, |
| unique_ptr<LogEntryPB> entry, |
| string* entry_debug_info) { |
| auto info_collector = MakeScopedCleanup([&]() { |
| if (entry) { |
| *entry_debug_info = SecureShortDebugString(*entry); |
| } |
| }); |
| DCHECK(entry->has_replicate()) |
| << "not a replicate message: " << SecureDebugString(*entry); |
| stats_.ops_read++; |
| |
| // Fix overflow if necessary (see KUDU-1933). |
| CheckAndRepairOpIdOverflow(entry->mutable_replicate()->mutable_id()); |
| |
| const ReplicateMsg& replicate = entry->replicate(); |
| RETURN_NOT_OK(state->CheckSequentialReplicateId(replicate)); |
| DCHECK(replicate.has_timestamp()); |
| CHECK_OK(UpdateClock(replicate.timestamp())); |
| |
| // Append the replicate message to the log as is |
| RETURN_NOT_OK(log_->Append(entry.get())); |
| |
| const int64_t index = replicate.id().index(); |
| const auto existing_entry_iter = state->pending_replicates.find(index); |
| if (existing_entry_iter != state->pending_replicates.end()) { |
| // If there was a entry with the same index we're overwriting then we need |
| // to delete that entry and all entries with higher indexes. |
| const auto& existing_entry = existing_entry_iter->second; |
| |
| auto iter = state->pending_replicates.lower_bound(index); |
| DCHECK(OpIdEquals(iter->second->replicate().id(), existing_entry->replicate().id())); |
| |
| const auto& last_entry = state->pending_replicates.rbegin()->second; |
| VLOG_WITH_PREFIX(1) << "Overwriting operations starting at: " |
| << existing_entry->replicate().id() |
| << " up to: " << last_entry->replicate().id() |
| << " with operation: " << replicate.id(); |
| |
| while (iter != state->pending_replicates.end()) { |
| iter = state->pending_replicates.erase(iter); |
| stats_.ops_overwritten++; |
| } |
| } |
| EmplaceOrDie(&state->pending_replicates, index, std::move(entry)); |
| info_collector.cancel(); |
| |
| return Status::OK(); |
| } |
| |
| // On returning OK, takes ownership of the pointer from the 'entry_ptr' wrapper. |
| Status TabletBootstrap::HandleCommitMessage(const IOContext* io_context, ReplayState* state, |
| unique_ptr<LogEntryPB> entry, |
| string* entry_debug_info) { |
| auto info_collector = MakeScopedCleanup([&]() { |
| if (entry) { |
| *entry_debug_info = SecureShortDebugString(*entry); |
| } |
| }); |
| DCHECK(entry->has_commit()) |
| << "not a commit message: " << SecureDebugString(*entry); |
| |
| // Fix overflow if necessary (see KUDU-1933). |
| CheckAndRepairOpIdOverflow(entry->mutable_commit()->mutable_commited_op_id()); |
| |
| // Match up the COMMIT record with the original entry that it's applied to. |
| const OpId& committed_op_id = entry->commit().commited_op_id(); |
| state->UpdateCommittedOpId(committed_op_id); |
| |
| // If there are no pending replicates, or if this commit's index is lower than the |
| // the first pending replicate on record this is likely an orphaned commit. |
| if (state->pending_replicates.empty() || |
| (*state->pending_replicates.begin()).first > committed_op_id.index()) { |
| VLOG_WITH_PREFIX(2) << "Found orphaned commit for " << committed_op_id; |
| RETURN_NOT_OK(CheckOrphanedCommitDoesntNeedReplay(entry->commit())); |
| stats_.orphaned_commits++; |
| info_collector.cancel(); |
| return Status::OK(); |
| } |
| |
| // If this commit does not correspond to the first replicate message in the pending |
| // replicates set we keep it to apply later... |
| if ((*state->pending_replicates.begin()).first != committed_op_id.index()) { |
| if (!ContainsKey(state->pending_replicates, committed_op_id.index())) { |
| return Status::Corruption(Substitute("Could not find replicate for commit: $0", |
| SecureShortDebugString(*entry))); |
| } |
| VLOG_WITH_PREFIX(2) << "Adding pending commit for " << committed_op_id; |
| EmplaceOrDie(&state->pending_commits, committed_op_id.index(), std::move(entry)); |
| info_collector.cancel(); |
| return Status::OK(); |
| } |
| |
| // ... if it does, we apply it and all the commits that immediately follow in the sequence. |
| OpId last_applied = committed_op_id; |
| RETURN_NOT_OK(ApplyCommitMessage(io_context, state, entry.get())); |
| |
| auto iter = state->pending_commits.begin(); |
| while (iter != state->pending_commits.end()) { |
| if (iter->first == last_applied.index() + 1) { |
| auto& buffered_commit_entry(iter->second); |
| last_applied = buffered_commit_entry->commit().commited_op_id(); |
| RETURN_NOT_OK(ApplyCommitMessage(io_context, state, buffered_commit_entry.get())); |
| iter = state->pending_commits.erase(iter); |
| continue; |
| } |
| break; |
| } |
| |
| info_collector.cancel(); |
| return Status::OK(); |
| } |
| |
| TabletBootstrap::ActiveStores TabletBootstrap::AnalyzeActiveStores(const CommitMsg& commit) { |
| bool has_mutated_stores = false; |
| bool has_active_stores = false; |
| |
| for (const OperationResultPB& op_result : commit.result().ops()) { |
| for (const MemStoreTargetPB& mutated_store : op_result.mutated_stores()) { |
| has_mutated_stores = true; |
| if (flushed_stores_.IsMemStoreActive(mutated_store)) { |
| has_active_stores = true; |
| } |
| } |
| } |
| |
| if (!has_mutated_stores) { |
| return NO_MUTATED_STORES; |
| } |
| return has_active_stores ? SOME_STORES_ACTIVE : NO_STORES_ACTIVE; |
| } |
| |
| Status TabletBootstrap::CheckOrphanedCommitDoesntNeedReplay(const CommitMsg& commit) { |
| if (AnalyzeActiveStores(commit) == SOME_STORES_ACTIVE) { |
| TabletSuperBlockPB super; |
| WARN_NOT_OK(tablet_meta_->ToSuperBlock(&super), |
| Substitute("$0$1", LogPrefix(), "Couldn't build TabletSuperBlockPB")); |
| return Status::Corruption(Substitute("CommitMsg was orphaned but it referred to " |
| "stores which need replay. Commit: $0. TabletMetadata: $1", |
| SecureShortDebugString(commit), |
| SecureShortDebugString(super))); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status TabletBootstrap::ApplyCommitMessage(const IOContext* io_context, |
| ReplayState* state, LogEntryPB* entry) { |
| const OpId& committed_op_id = entry->commit().commited_op_id(); |
| VLOG_WITH_PREFIX(2) << "Applying commit for " << committed_op_id; |
| |
| // They should also have an associated replicate index (it may have been in a |
| // deleted log segment though). |
| unique_ptr<LogEntryPB> pending_replicate_entry(EraseKeyReturnValuePtr( |
| &state->pending_replicates, committed_op_id.index())); |
| if (pending_replicate_entry) { |
| // We found a replicate with the same index, make sure it also has the same |
| // term. |
| const auto& replicate = pending_replicate_entry->replicate(); |
| if (!OpIdEquals(committed_op_id, replicate.id())) { |
| string error_msg = Substitute("Committed operation's OpId: $0 didn't match the" |
| "commit message's committed OpId: $1. Pending operation: $2, Commit message: $3", |
| SecureShortDebugString(replicate.id()), |
| SecureShortDebugString(committed_op_id), |
| SecureShortDebugString(replicate), |
| SecureShortDebugString(entry->commit())); |
| LOG_WITH_PREFIX(DFATAL) << error_msg; |
| return Status::Corruption(error_msg); |
| } |
| RETURN_NOT_OK(HandleEntryPair(io_context, pending_replicate_entry.get(), entry)); |
| stats_.ops_committed++; |
| } else { |
| stats_.orphaned_commits++; |
| RETURN_NOT_OK(CheckOrphanedCommitDoesntNeedReplay(entry->commit())); |
| } |
| |
| return Status::OK(); |
| } |
| |
| // Never deletes 'replicate_entry' or 'commit_entry'. |
| Status TabletBootstrap::HandleEntryPair(const IOContext* io_context, LogEntryPB* replicate_entry, |
| LogEntryPB* commit_entry) { |
| const char* error_fmt = "Failed to play $0 request. ReplicateMsg: { $1 }, CommitMsg: { $2 }"; |
| |
| #define RETURN_NOT_OK_REPLAY(ReplayMethodName, io_context, replicate, commit) \ |
| RETURN_NOT_OK_PREPEND(ReplayMethodName(io_context, replicate, commit), \ |
| Substitute(error_fmt, OperationType_Name(op_type), \ |
| SecureShortDebugString(*(replicate)), \ |
| SecureShortDebugString(commit))) |
| |
| ReplicateMsg* replicate = replicate_entry->mutable_replicate(); |
| const CommitMsg& commit = commit_entry->commit(); |
| OperationType op_type = commit.op_type(); |
| |
| switch (op_type) { |
| case WRITE_OP: |
| RETURN_NOT_OK_REPLAY(PlayWriteRequest, io_context, replicate, commit); |
| break; |
| |
| case ALTER_SCHEMA_OP: |
| RETURN_NOT_OK_REPLAY(PlayAlterSchemaRequest, io_context, replicate, commit); |
| break; |
| |
| case CHANGE_CONFIG_OP: |
| RETURN_NOT_OK_REPLAY(PlayChangeConfigRequest, io_context, replicate, commit); |
| break; |
| |
| case PARTICIPANT_OP: |
| RETURN_NOT_OK_REPLAY(PlayTxnParticipantOpRequest, io_context, replicate, commit); |
| break; |
| |
| case NO_OP: |
| RETURN_NOT_OK_REPLAY(PlayNoOpRequest, io_context, replicate, commit); |
| break; |
| |
| default: |
| return Status::IllegalState(Substitute("Unsupported commit entry type: $0", |
| commit.op_type())); |
| } |
| |
| #undef RETURN_NOT_OK_REPLAY |
| |
| // We should only advance MVCC's safe time based on a specific set of |
| // operations: those whose timestamps are guaranteed to be monotonically |
| // increasing with respect to their entries in the write-ahead log. |
| bool timestamp_assigned_in_opid_order = true; |
| switch (op_type) { |
| case CHANGE_CONFIG_OP: |
| timestamp_assigned_in_opid_order = false; |
| break; |
| case NO_OP: { |
| const auto& req = replicate->noop_request(); |
| if (req.has_timestamp_in_opid_order()) { |
| timestamp_assigned_in_opid_order = req.timestamp_in_opid_order(); |
| } |
| break; |
| } |
| default: |
| break; |
| } |
| if (!timestamp_assigned_in_opid_order) { |
| return Status::OK(); |
| } |
| |
| // Handle advancement of our new timestamp lower bound watermark. |
| // |
| // If this message is a Raft election no-op, or is an op that has an external |
| // consistency mode other than COMMIT_WAIT, we know that no future op will |
| // have a timestamp that is lower than it, so we can just advance the safe |
| // timestamp to the message's timestamp. |
| // |
| // If the hybrid clock is disabled, all ops will fall into this category. |
| Timestamp new_lower_bound; |
| if (replicate->op_type() != consensus::WRITE_OP || |
| replicate->write_request().external_consistency_mode() != COMMIT_WAIT) { |
| new_lower_bound = Timestamp(replicate->timestamp()); |
| // ... else we set the new timestamp lower bound to be the op's |
| // timestamp minus the maximum clock error. This opens the door for problems |
| // if the flags changed across reboots, but this is unlikely and the problem |
| // would manifest itself immediately and clearly (mvcc would complain the |
| // operation is already committed, with a CHECK failure). |
| } else { |
| DCHECK(clock_->SupportsExternalConsistencyMode(COMMIT_WAIT)) << "The provided clock does not" |
| "support COMMIT_WAIT external consistency mode."; |
| new_lower_bound = clock::HybridClock::AddPhysicalTimeToTimestamp( |
| Timestamp(replicate->timestamp()), |
| MonoDelta::FromMicroseconds(-FLAGS_max_clock_sync_error_usec)); |
| } |
| tablet_->mvcc_manager()->AdjustNewOpLowerBound(new_lower_bound); |
| |
| return Status::OK(); |
| } |
| |
| void TabletBootstrap::DumpReplayStateToLog(const ReplayState& state) { |
| // Dump the replay state, this will log the pending replicates as well as the pending commits, |
| // which might be useful for debugging. |
| vector<string> state_dump; |
| state.DumpReplayStateToStrings(&state_dump); |
| for (const string& string : state_dump) { |
| LOG_WITH_PREFIX(INFO) << string; |
| } |
| } |
| |
| Status TabletBootstrap::PlaySegments(const IOContext* io_context, |
| ConsensusBootstrapInfo* consensus_info) { |
| ReplayState state; |
| log::SegmentSequence segments; |
| log_reader_->GetSegmentsSnapshot(&segments); |
| |
| // The first thing to do is to rewind the tablet's schema back to the schema |
| // as of the point in time where the logs begin. We must replay the writes |
| // in the logs with the correct point-in-time schema. |
| if (!segments.empty()) { |
| const scoped_refptr<ReadableLogSegment>& segment = segments[0]; |
| // Set the point-in-time schema for the tablet based on the log header. |
| Schema pit_schema; |
| RETURN_NOT_OK_PREPEND(SchemaFromPB(segment->header().schema(), &pit_schema), |
| "Couldn't decode log segment schema"); |
| RETURN_NOT_OK_PREPEND(tablet_->RewindSchemaForBootstrap( |
| pit_schema, segment->header().schema_version()), |
| "couldn't set point-in-time schema"); |
| } |
| |
| // We defer opening the log until here, so that we properly reproduce the |
| // point-in-time schema from the log we're reading into the log we're |
| // writing. |
| RETURN_NOT_OK_PREPEND(OpenNewLog(), "Failed to open new log"); |
| |
| auto last_status_update = MonoTime::Now(); |
| const auto kStatusUpdateInterval = MonoDelta::FromSeconds(5); |
| int segment_count = 0; |
| |
| for (const scoped_refptr<ReadableLogSegment>& segment : segments) { |
| log::LogEntryReader reader(segment.get()); |
| |
| int entry_count = 0; |
| while (true) { |
| { |
| unique_ptr<LogEntryPB> entry; |
| Status s = reader.ReadNextEntry(&entry); |
| if (PREDICT_FALSE(!s.ok())) { |
| if (s.IsEndOfFile()) { |
| break; |
| } |
| return Status::Corruption( |
| Substitute("Error reading Log Segment of tablet $0: $1 " |
| "(Read up to entry $2 of segment $3, in path $4)", |
| tablet_->tablet_id(), |
| s.ToString(), |
| entry_count, |
| segment->header().sequence_number(), |
| segment->path())); |
| } |
| entry_count++; |
| |
| string entry_debug_info; |
| s = HandleEntry(io_context, &state, std::move(entry), &entry_debug_info); |
| if (!s.ok()) { |
| DumpReplayStateToLog(state); |
| RETURN_NOT_OK_PREPEND(s, DebugInfo(tablet_->tablet_id(), |
| segment->header().sequence_number(), |
| entry_count, segment->path(), |
| entry_debug_info)); |
| } |
| } |
| |
| const auto now = MonoTime::Now(); |
| if (now - last_status_update > kStatusUpdateInterval) { |
| SetStatusMessage(Substitute("Bootstrap replaying log segment $0/$1 " |
| "($2/$3 this segment, stats: $4)", |
| segment_count + 1, log_reader_->num_segments(), |
| HumanReadableNumBytes::ToString(reader.offset()), |
| HumanReadableNumBytes::ToString(reader.read_up_to_offset()), |
| stats_.ToString())); |
| last_status_update = now; |
| } |
| } |
| |
| SetStatusMessage(Substitute("Bootstrap replayed $0/$1 log segments. " |
| "Stats: $2. Pending: $3 replicates", |
| segment_count + 1, log_reader_->num_segments(), |
| stats_.ToString(), |
| state.pending_replicates.size())); |
| segment_count++; |
| } |
| |
| // If we have non-applied commits they all must belong to pending operations and |
| // they should only pertain to stores which are still active. |
| if (!state.pending_commits.empty()) { |
| for (const OpIndexToEntryMap::value_type& entry : state.pending_commits) { |
| if (!ContainsKey(state.pending_replicates, entry.first)) { |
| DumpReplayStateToLog(state); |
| return Status::Corruption("Had orphaned commits at the end of replay."); |
| } |
| |
| if (entry.second->commit().op_type() == WRITE_OP && |
| AnalyzeActiveStores(entry.second->commit()) == NO_STORES_ACTIVE) { |
| DumpReplayStateToLog(state); |
| TabletSuperBlockPB super; |
| WARN_NOT_OK(tablet_meta_->ToSuperBlock(&super), "Couldn't build TabletSuperBlockPB."); |
| return Status::Corruption(Substitute("CommitMsg was pending but it did not refer " |
| "to any active memory stores. Commit: $0. TabletMetadata: $1", |
| SecureShortDebugString(entry.second->commit()), SecureShortDebugString(super))); |
| } |
| } |
| } |
| |
| // Note that we don't pass the information contained in the pending commits along with |
| // ConsensusBootstrapInfo. We know that this is safe as they must refer to active |
| // stores (we make doubly sure above). |
| // |
| // Example/Explanation: |
| // Say we have two different operations that touch the same row, one insert and one |
| // mutate. Since we use Early Lock Release the commit for the second (mutate) operation |
| // might end up in the log before the insert's commit. This wouldn't matter since |
| // we replay in order, but a corner case here is that we might crash before we |
| // write the commit for the insert, meaning it might not be present at all. |
| // |
| // One possible log for this situation would be: |
| // - Replicate 10.10 (insert) |
| // - Replicate 10.11 (mutate) |
| // - Commit 10.11 (mutate) |
| // ~CRASH while Commit 10.10 is in-flight~ |
| // |
| // We can't replay 10.10 during bootstrap because we haven't seen its commit, but |
| // since we can't replay out-of-order we won't replay 10.11 either, in fact we'll |
| // pass them both as "pending" to consensus to be applied again. |
| // |
| // The reason why it is safe to simply disregard 10.11's commit is that we know that |
| // it must refer only to active stores. We know this because one important flush/compact |
| // pre-condition is: |
| // - No flush will become visible on reboot (meaning we won't durably update the tablet |
| // metadata), unless the snapshot under which the flush/compact was performed has no |
| // in-flight ops and all the messages that are in-flight to the log are durable. |
| // |
| // In our example this means that if we had flushed/compacted after 10.10 was applied |
| // (meaning losing the commit message would lead to corruption as we might re-apply it) |
| // then the commit for 10.10 would be durable. Since it isn't then no flush/compaction |
| // occurred after 10.10 was applied and thus we can disregard the commit message for |
| // 10.11 and simply apply both 10.10 and 10.11 as if we hadn't applied them before. |
| // |
| // This generalizes to: |
| // - If a committed replicate message with index Y is missing a commit message, |
| // no later committed replicate message (with index > Y) is visible across reboots |
| // in the tablet data. |
| |
| if (VLOG_IS_ON(1)) { |
| DumpReplayStateToLog(state); |
| } |
| |
| // Set up the ConsensusBootstrapInfo structure for the caller. |
| for (OpIndexToEntryMap::value_type& e : state.pending_replicates) { |
| consensus_info->orphaned_replicates.push_back(e.second->release_replicate()); |
| } |
| consensus_info->last_id = state.prev_op_id; |
| consensus_info->last_committed_id = state.committed_op_id; |
| |
| return Status::OK(); |
| } |
| |
| Status TabletBootstrap::AppendCommitMsg(const CommitMsg& commit_msg) { |
| LogEntryPB commit_entry; |
| commit_entry.set_type(log::COMMIT); |
| CommitMsg* commit = commit_entry.mutable_commit(); |
| commit->CopyFrom(commit_msg); |
| return log_->Append(&commit_entry); |
| } |
| |
| Status TabletBootstrap::DetermineSkippedOpsAndBuildResponse(const TxResultPB& orig_result, |
| TxResultPB* new_result, |
| WriteResponsePB* response, |
| bool* all_skipped) { |
| int num_ops = orig_result.ops_size(); |
| new_result->mutable_ops()->Reserve(num_ops); |
| *all_skipped = true; |
| |
| for (int i = 0; i < num_ops; i++) { |
| const auto& orig_op_result = orig_result.ops(i); |
| OpAction action; |
| RETURN_NOT_OK(FilterOperation(orig_op_result, &action)); |
| *all_skipped &= action != NEEDS_REPLAY; |
| |
| if (action != NEEDS_REPLAY) { |
| new_result->mutable_ops(i)->set_skip_on_replay(true); |
| } |
| |
| if (action == SKIP_PREVIOUSLY_FAILED) { |
| if (response) { |
| WriteResponsePB::PerRowErrorPB* error = response->add_per_row_errors(); |
| error->set_row_index(i); |
| error->mutable_error()->CopyFrom(orig_op_result.failed_status()); |
| } |
| // If the op is already flushed we won't be applying it. |
| DCHECK(orig_op_result.has_failed_status()); |
| new_result->mutable_ops(i)->mutable_failed_status()->CopyFrom(orig_op_result.failed_status()); |
| } |
| } |
| |
| if (*all_skipped) { |
| stats_.ops_ignored++; |
| } |
| return Status::OK(); |
| } |
| |
| Status TabletBootstrap::PlayWriteRequest(const IOContext* io_context, |
| ReplicateMsg* replicate_msg, |
| const CommitMsg& commit_msg) { |
| |
| // Set up the new op. |
| // Even if we're going to ignore the op, it's important to do this so that |
| // MVCC advances. |
| DCHECK(replicate_msg->has_timestamp()); |
| WriteRequestPB* write = replicate_msg->mutable_write_request(); |
| |
| WriteOpState op_state(nullptr, write, nullptr); |
| op_state.mutable_op_id()->CopyFrom(replicate_msg->id()); |
| op_state.set_timestamp(Timestamp(replicate_msg->timestamp())); |
| |
| // Prepare the commit entry for the rewritten log. |
| LogEntryPB& commit_entry = *google::protobuf::Arena::CreateMessage<LogEntryPB>( |
| op_state.pb_arena()); |
| commit_entry.set_type(log::COMMIT); |
| CommitMsg* new_commit = commit_entry.mutable_commit(); |
| new_commit->CopyFrom(commit_msg); |
| |
| tablet_->StartOp(&op_state); |
| tablet_->StartApplying(&op_state); |
| |
| unique_ptr<WriteResponsePB> response; |
| |
| bool tracking_results = result_tracker_.get() != nullptr && replicate_msg->has_request_id(); |
| |
| // If the results are being tracked and this write has a request id, register |
| // it with the result tracker. |
| ResultTracker::RpcState state = ResultTracker::RpcState::NEW; |
| if (tracking_results) { |
| VLOG(1) << result_tracker_.get() << " Boostrapping request for tablet: " |
| << write->tablet_id() << ". State: " << 0 << " id: " |
| << SecureDebugString(replicate_msg->request_id()); |
| // We only replay committed requests so the result of tracking this request can be: |
| // NEW: |
| // This is a previously untracked request, or we changed the driver -> store the result |
| // COMPLETED or STALE: |
| // We've bootstrapped this tablet twice, and previously stored the result -> do |
| // nothing. |
| state = result_tracker_->TrackRpcOrChangeDriver(replicate_msg->request_id()); |
| CHECK(state == ResultTracker::RpcState::NEW || |
| state == ResultTracker::RpcState::COMPLETED || |
| state == ResultTracker::RpcState::STALE) |
| << "Wrong state: " << state; |
| response.reset(new WriteResponsePB()); |
| response->set_timestamp(replicate_msg->timestamp()); |
| } |
| |
| // Determine which of the operations are already flushed to persistent |
| // storage and don't need to be re-applied. We can do this even before |
| // we decode any row operations, so we can short-circuit that decoding |
| // in the case that the entire op has been already flushed. |
| TxResultPB* new_result = new_commit->mutable_result(); |
| bool all_flushed; |
| RETURN_NOT_OK(DetermineSkippedOpsAndBuildResponse(commit_msg.result(), |
| new_result, |
| response.get(), |
| &all_flushed)); |
| |
| if (tracking_results && state == ResultTracker::NEW) { |
| result_tracker_->RecordCompletionAndRespond(replicate_msg->request_id(), response.get()); |
| } |
| |
| Status play_status; |
| if (!all_flushed && write->has_row_operations()) { |
| // Rather than RETURN_NOT_OK() here, we need to just save the status and do the |
| // RETURN_NOT_OK() down below the FinishApplying() call below. Even though it seems wrong |
| // to commit the op when in fact it failed to apply, we would throw a CHECK |
| // failure if we attempted to 'Abort()' after entering the applying stage. Allowing it to |
| // Commit isn't problematic because we don't expose the results anyway, and the bad |
| // Status returned below will cause us to fail the entire tablet bootstrap anyway. |
| play_status = PlayRowOperations(io_context, &op_state, commit_msg.result(), new_result); |
| |
| if (play_status.ok()) { |
| // Replace the original commit message's result with the new one from the replayed operation. |
| op_state.ReleaseTxResultPB(new_commit->mutable_result()); |
| } |
| } |
| |
| op_state.FinishApplyingOrAbort(Op::APPLIED); |
| |
| // If we failed to apply the operations, fail bootstrap before we write anything incorrect |
| // to the recovery log. |
| RETURN_NOT_OK(play_status); |
| |
| RETURN_NOT_OK(log_->Append(&commit_entry)); |
| |
| return Status::OK(); |
| } |
| |
| Status TabletBootstrap::PlayAlterSchemaRequest(const IOContext* /*io_context*/, |
| ReplicateMsg* replicate_msg, |
| const CommitMsg& commit_msg) { |
| // There are three potential outcomes to expect with this replay: |
| // 1. There is no 'result' in the commit message. The alter succeeds, and the |
| // log updates its schema. |
| // 2. There is no 'result' in the commit message. The alter fails, and the |
| // log doesn't update its schema. This can happen if trying to replay an |
| // invalid alter schema request from before we started putting the results |
| // in the commit message. Note that we'll leave the commit message as is; |
| // it's harmless since replaying the operation should be a no-op anyway. |
| // 3. The commit message contains a 'result', which should only happen if the |
| // alter resulted in a failure. Exit out without attempting the alter. |
| if (commit_msg.has_result()) { |
| // If we put a result in the commit message, it should be an error and we |
| // don't need to replay it. In case, in the future, we decide to put |
| // positive results in the commit messages, just filter ops that have |
| // failed statuses instead of D/CHECKing. |
| DCHECK_EQ(1, commit_msg.result().ops_size()); |
| const OperationResultPB& op = commit_msg.result().ops(0); |
| if (op.has_failed_status()) { |
| Status error = StatusFromPB(op.failed_status()); |
| VLOG(1) << "Played a failed alter request: " << error.ToString(); |
| return AppendCommitMsg(commit_msg); |
| } |
| } |
| AlterSchemaRequestPB* alter_schema = replicate_msg->mutable_alter_schema_request(); |
| |
| // Decode schema |
| Schema schema; |
| RETURN_NOT_OK(SchemaFromPB(alter_schema->schema(), &schema)); |
| |
| AlterSchemaOpState op_state(nullptr, alter_schema, nullptr); |
| RETURN_NOT_OK(tablet_->CreatePreparedAlterSchema(&op_state, &schema)); |
| |
| // Apply the alter schema to the tablet |
| RETURN_NOT_OK_PREPEND(tablet_->AlterSchema(&op_state), "Failed to AlterSchema:"); |
| |
| if (!op_state.error()) { |
| // If the alter completed successfully, update the log segment header. Note |
| // that our new log isn't hooked up to the tablet yet. |
| log_->SetSchemaForNextLogSegment(std::move(schema), op_state.schema_version()); |
| } |
| |
| return AppendCommitMsg(commit_msg); |
| } |
| |
| Status TabletBootstrap::PlayChangeConfigRequest(const IOContext* /*io_context*/, |
| ReplicateMsg* replicate_msg, |
| const CommitMsg& commit_msg) { |
| // Invariant: The committed config change request is always locally persisted |
| // in the consensus metadata before the commit message is written to the WAL. |
| if (PREDICT_FALSE(replicate_msg->id().index() > committed_raft_config_.opid_index())) { |
| string msg = Substitute("Committed config change op in WAL has opid index ($0) greater than " |
| "config persisted in the consensus metadata ($1). " |
| "Replicate message: {$2}. " |
| "Committed raft config in consensus metadata: {$3}", |
| replicate_msg->id().index(), |
| committed_raft_config_.opid_index(), |
| SecureShortDebugString(*replicate_msg), |
| SecureShortDebugString(committed_raft_config_)); |
| LOG_WITH_PREFIX(DFATAL) << msg; |
| return Status::Corruption(msg); |
| } |
| return AppendCommitMsg(commit_msg); |
| } |
| |
| Status TabletBootstrap::PlayTxnParticipantOpRequest(const IOContext* /*io_context*/, |
| ReplicateMsg* replicate_msg, |
| const CommitMsg& commit_msg) { |
| ParticipantOpState op_state(tablet_replica_.get(), |
| tablet_->txn_participant(), |
| &replicate_msg->participant_request()); |
| op_state.AcquireTxnAndLock(); |
| SCOPED_CLEANUP({ |
| op_state.ReleaseTxn(); |
| }); |
| // NOTE: don't bother validating the current state of the op. Presumably that |
| // happened the first time this op was written. |
| RETURN_NOT_OK(op_state.PerformOp(replicate_msg->id())); |
| return AppendCommitMsg(commit_msg); |
| } |
| |
| Status TabletBootstrap::PlayNoOpRequest(const IOContext* /*io_context*/, |
| ReplicateMsg* /*replicate_msg*/, |
| const CommitMsg& commit_msg) { |
| return AppendCommitMsg(commit_msg); |
| } |
| |
| Status TabletBootstrap::PlayRowOperations(const IOContext* io_context, |
| WriteOpState* op_state, |
| const TxResultPB& orig_result, |
| TxResultPB* new_result) { |
| Schema inserts_schema; |
| RETURN_NOT_OK_PREPEND(SchemaFromPB(op_state->request()->schema(), &inserts_schema), |
| "Couldn't decode client schema"); |
| |
| RETURN_NOT_OK_PREPEND(tablet_->DecodeWriteOperations(&inserts_schema, op_state), |
| Substitute("Could not decode row operations: $0", |
| SecureDebugString(op_state->request()->row_operations()))); |
| |
| // Run AcquireRowLocks, Apply, etc! |
| RETURN_NOT_OK_PREPEND(tablet_->AcquireRowLocks(op_state), |
| "Failed to acquire row locks"); |
| |
| RETURN_NOT_OK(ApplyOperations(io_context, op_state, orig_result, new_result)); |
| |
| return Status::OK(); |
| } |
| |
| Status TabletBootstrap::ApplyOperations(const IOContext* io_context, |
| WriteOpState* op_state, |
| const TxResultPB& orig_result, |
| TxResultPB* new_result) { |
| DCHECK_EQ(op_state->row_ops().size(), orig_result.ops_size()); |
| DCHECK_EQ(op_state->row_ops().size(), new_result->ops_size()); |
| int32_t op_idx = 0; |
| |
| for (RowOp* op : op_state->row_ops()) { |
| int32_t curr_op_idx = op_idx++; |
| // Increment the seen/ignored stats. |
| switch (op->decoded_op.type) { |
| case RowOperationsPB::INSERT: |
| case RowOperationsPB::INSERT_IGNORE: |
| case RowOperationsPB::UPSERT: { |
| // TODO(unknown): should we have a separate counter for upserts? |
| stats_.inserts_seen++; |
| if (op->has_result()) { |
| stats_.inserts_ignored++; |
| } |
| break; |
| } |
| case RowOperationsPB::UPDATE: |
| case RowOperationsPB::DELETE: { |
| stats_.mutations_seen++; |
| if (op->has_result()) { |
| stats_.mutations_ignored++; |
| } |
| break; |
| } |
| default: |
| LOG_WITH_PREFIX(FATAL) << "Bad op type: " << op->decoded_op.type; |
| break; |
| } |
| |
| const OperationResultPB& new_op_result = new_result->ops(curr_op_idx); |
| // If the op is already flushed or had previously failed, no need to replay it. |
| // TODO(dralves) this back and forth is weird. We're first setting the flushed/failed |
| // status on the rewritten message's commit entry. Then we pass it here to |
| // set the status on the op, then we set it back on the commit entry with |
| // ReleaseTxResultPB(). This could be simplified if we build the RowOps on |
| // demand and just created DecodedRowOperation/RowOp for the replayed stuff. |
| if (new_op_result.skip_on_replay()) { |
| op->SetSkippedResult(new_op_result); |
| continue; |
| } |
| |
| op->set_original_result_from_log(&orig_result.ops(curr_op_idx)); |
| |
| // Actually apply it. |
| ProbeStats stats; // we don't use this, but tablet internals require non-NULL. |
| RETURN_NOT_OK(tablet_->ApplyRowOperation(io_context, op_state, op, &stats)); |
| DCHECK(op->has_result()); |
| |
| // We expect that the above Apply() will always succeed, because we're |
| // applying an operation that we know succeeded before the server |
| // restarted. If it doesn't succeed, something is wrong and we are |
| // diverging from our prior state, so bail. |
| if (op->result->has_failed_status()) { |
| return Status::Corruption("Operation which previously succeeded failed " |
| "during log replay", |
| Substitute("Op: $0\nFailure: $1", |
| op->ToString(*tablet_->schema()), |
| SecureShortDebugString(op->result->failed_status()))); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status TabletBootstrap::FilterOperation(const OperationResultPB& op_result, |
| OpAction* action) { |
| |
| // If the operation failed or was skipped, originally, no need to re-apply it. |
| if (op_result.has_failed_status()) { |
| *action = SKIP_PREVIOUSLY_FAILED; |
| return Status::OK(); |
| } |
| |
| if (op_result.skip_on_replay()) { |
| *action = SKIP_PREVIOUSLY_FLUSHED; |
| return Status::OK(); |
| } |
| |
| int num_mutated_stores = op_result.mutated_stores_size(); |
| if (PREDICT_FALSE(num_mutated_stores > 2)) { |
| return Status::Corruption(Substitute("All operations must have at most two mutated_stores: $0", |
| SecureShortDebugString(op_result))); |
| } |
| // NOTE: it's possible that num_mutated_stores = 0 in the case of an |
| // UPSERT which only specified the primary key. In that case, if the |
| // row already existed, it gets dropped without converting into an UPDATE. |
| |
| // The mutation may have been duplicated, so we'll check whether any of the |
| // output targets was active. |
| int num_active_stores = 0; |
| for (const MemStoreTargetPB& mutated_store : op_result.mutated_stores()) { |
| if (flushed_stores_.IsMemStoreActive(mutated_store)) { |
| num_active_stores++; |
| } |
| } |
| |
| if (num_active_stores == 0) { |
| // The mutation was fully flushed. |
| *action = SKIP_PREVIOUSLY_FLUSHED; |
| return Status::OK(); |
| } |
| |
| if (PREDICT_FALSE(num_active_stores == 2)) { |
| // It's not possible for a duplicated mutation to refer to two stores which are still |
| // active. Either the mutation arrived before the metadata was flushed, in which case |
| // the 'first' store is live, or it arrived just after it was flushed, in which case |
| // the 'second' store was live. But at no time should the metadata refer to both the |
| // 'input' and 'output' stores of a compaction. |
| return Status::Corruption("Mutation was duplicated to two stores that are considered live", |
| SecureShortDebugString(op_result)); |
| } |
| |
| *action = NEEDS_REPLAY; |
| return Status::OK(); |
| } |
| |
| Status TabletBootstrap::UpdateClock(uint64_t timestamp) { |
| return clock_->Update(Timestamp(timestamp)); |
| } |
| |
| string TabletBootstrap::LogPrefix() const { |
| return Substitute("T $0 P $1: ", tablet_meta_->tablet_id(), tablet_meta_->fs_manager()->uuid()); |
| } |
| |
| Status FlushedStoresSnapshot::InitFrom(const TabletMetadata& tablet_meta) { |
| CHECK(flushed_dms_by_drs_id_.empty()) << "already initted"; |
| last_durable_mrs_id_ = tablet_meta.last_durable_mrs_id(); |
| for (const shared_ptr<RowSetMetadata>& rsmd : tablet_meta.rowsets()) { |
| if (!InsertIfNotPresent(&flushed_dms_by_drs_id_, rsmd->id(), |
| rsmd->last_durable_redo_dms_id())) { |
| return Status::Corruption(Substitute( |
| "Duplicate DRS ID $0 in tablet metadata. " |
| "Found DRS $0 with last durable redo DMS ID $1 while trying to " |
| "initialize DRS $0 with last durable redo DMS ID $2", |
| rsmd->id(), |
| flushed_dms_by_drs_id_[rsmd->id()], |
| rsmd->last_durable_redo_dms_id())); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| bool FlushedStoresSnapshot::IsMemStoreActive(const MemStoreTargetPB& target) const { |
| if (target.has_mrs_id()) { |
| DCHECK(!target.has_rs_id()); |
| DCHECK(!target.has_dms_id()); |
| |
| // The original mutation went to the MRS. If this MRS has not yet been made |
| // durable, it needs to be replayed. |
| return target.mrs_id() > last_durable_mrs_id_; |
| } else { |
| |
| // The original mutation went to a DRS's delta store. |
| DCHECK(target.has_rs_id()); |
| |
| int64_t last_durable_dms_id; |
| if (!FindCopy(flushed_dms_by_drs_id_, target.rs_id(), &last_durable_dms_id)) { |
| // If we have no data about this DRS, then there are two cases: |
| // |
| // 1) The DRS has already been flushed, but then later got removed because |
| // it got compacted away or culled because it was empty. In the former |
| // case, the deltas should have been reflected in the new compaction |
| // output, and in the latter, all the rows in the rowset have been |
| // deleted and there's nothing to replay. |
| // |
| // 2) The DRS was in the process of being written, but haven't yet |
| // flushed the TabletMetadata update that includes it. We only write |
| // to an in-progress DRS like this when we are in the 'duplicating' |
| // phase of a compaction. In that case, the other duplicated 'target' |
| // should still be present in the metadata, and we can base our |
| // decision based on that one. |
| return false; |
| } |
| |
| // If the original rowset that we applied the edit to exists, check whether |
| // the edit was in a flushed DMS or a live one. |
| return target.dms_id() > last_durable_dms_id; |
| } |
| } |
| |
| } // namespace tablet |
| } // namespace kudu |