| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/tablet/tablet_bootstrap.h" |
| |
| #include <cstdint> |
| #include <limits> |
| #include <memory> |
| #include <ostream> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include <boost/optional/optional.hpp> |
| #include <glog/logging.h> |
| #include <gtest/gtest.h> |
| |
| #include "kudu/clock/clock.h" |
| #include "kudu/common/common.pb.h" |
| #include "kudu/common/iterator.h" |
| #include "kudu/common/partial_row.h" |
| #include "kudu/common/partition.h" |
| #include "kudu/common/row_operations.h" |
| #include "kudu/common/schema.h" |
| #include "kudu/common/timestamp.h" |
| #include "kudu/common/wire_protocol-test-util.h" |
| #include "kudu/common/wire_protocol.h" |
| #include "kudu/common/wire_protocol.pb.h" |
| #include "kudu/consensus/consensus-test-util.h" |
| #include "kudu/consensus/consensus.pb.h" |
| #include "kudu/consensus/consensus_meta.h" |
| #include "kudu/consensus/consensus_meta_manager.h" |
| #include "kudu/consensus/log-test-base.h" |
| #include "kudu/consensus/log.h" |
| #include "kudu/consensus/log_anchor_registry.h" |
| #include "kudu/consensus/log_reader.h" |
| #include "kudu/consensus/log_util.h" |
| #include "kudu/consensus/metadata.pb.h" |
| #include "kudu/consensus/opid.pb.h" |
| #include "kudu/consensus/opid_util.h" |
| #include "kudu/consensus/raft_consensus.h" |
| #include "kudu/consensus/ref_counted_replicate.h" |
| #include "kudu/fs/data_dirs.h" |
| #include "kudu/fs/fs_manager.h" |
| #include "kudu/gutil/port.h" |
| #include "kudu/gutil/ref_counted.h" |
| #include "kudu/rpc/result_tracker.h" |
| #include "kudu/tablet/metadata.pb.h" |
| #include "kudu/tablet/rowset.h" |
| #include "kudu/tablet/rowset_metadata.h" |
| #include "kudu/tablet/tablet-harness.h" |
| #include "kudu/tablet/tablet-test-util.h" |
| #include "kudu/tablet/tablet.h" |
| #include "kudu/tablet/tablet.pb.h" |
| #include "kudu/tablet/tablet_metadata.h" |
| #include "kudu/tablet/tablet_replica.h" |
| #include "kudu/tserver/tserver.pb.h" |
| #include "kudu/util/env.h" |
| #include "kudu/util/logging_test_util.h" |
| #include "kudu/util/pb_util.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/test_macros.h" |
| |
| using kudu::consensus::ConsensusBootstrapInfo; |
| using kudu::consensus::ConsensusMetadata; |
| using kudu::consensus::ConsensusMetadataManager; |
| using kudu::consensus::MakeOpId; |
| using kudu::consensus::OpId; |
| using kudu::consensus::ReplicateMsg; |
| using kudu::consensus::ReplicateRefPtr; |
| using kudu::consensus::kMinimumTerm; |
| using kudu::consensus::make_scoped_refptr_replicate; |
| using kudu::log::LogAnchorRegistry; |
| using kudu::log::LogTestBase; |
| using kudu::pb_util::SecureShortDebugString; |
| using kudu::tserver::WriteRequestPB; |
| using std::shared_ptr; |
| using std::string; |
| using std::unique_ptr; |
| using std::vector; |
| |
| namespace kudu { |
| namespace tablet { |
| |
| class BootstrapTest : public LogTestBase { |
| protected: |
| |
| void SetUp() OVERRIDE { |
| LogTestBase::SetUp(); |
| cmeta_manager_.reset(new ConsensusMetadataManager(fs_manager_.get())); |
| } |
| |
| Status LoadTestTabletMetadata(int mrs_id, int delta_id, scoped_refptr<TabletMetadata>* meta) { |
| Schema schema = SchemaBuilder(schema_).Build(); |
| std::pair<PartitionSchema, Partition> partition = CreateDefaultPartition(schema); |
| |
| RETURN_NOT_OK(TabletMetadata::LoadOrCreate(fs_manager_.get(), |
| log::kTestTablet, |
| log::kTestTable, |
| log::kTestTableId, |
| schema, |
| partition.first, |
| partition.second, |
| TABLET_DATA_READY, |
| /*tombstone_last_logged_opid=*/ boost::none, |
| /*extra_config=*/ boost::none, |
| /*dimension_label=*/ boost::none, |
| /*table_type=*/ boost::none, |
| meta)); |
| (*meta)->SetLastDurableMrsIdForTests(mrs_id); |
| if ((*meta)->GetRowSetForTests(0) != nullptr) { |
| (*meta)->GetRowSetForTests(0)->SetLastDurableRedoDmsIdForTests(delta_id); |
| } |
| return (*meta)->Flush(); |
| } |
| |
| Status CreateConsensusMetadata(const scoped_refptr<TabletMetadata>& meta) { |
| consensus::RaftConfigPB config; |
| config.set_opid_index(consensus::kInvalidOpIdIndex); |
| consensus::RaftPeerPB* peer = config.add_peers(); |
| peer->set_permanent_uuid(meta->fs_manager()->uuid()); |
| peer->set_member_type(consensus::RaftPeerPB::VOTER); |
| |
| RETURN_NOT_OK_PREPEND(cmeta_manager_->Create(meta->tablet_id(), config, kMinimumTerm), |
| "Unable to create consensus metadata"); |
| |
| return Status::OK(); |
| } |
| |
| Status PersistTestTabletMetadataState(TabletDataState state) { |
| scoped_refptr<TabletMetadata> meta; |
| RETURN_NOT_OK(LoadTestTabletMetadata(-1, -1, &meta)); |
| meta->set_tablet_data_state(state); |
| RETURN_NOT_OK(meta->Flush()); |
| return Status::OK(); |
| } |
| |
| Status RunBootstrapOnTestTablet(const scoped_refptr<TabletMetadata>& meta, |
| shared_ptr<Tablet>* tablet, |
| ConsensusBootstrapInfo* boot_info) { |
| |
| scoped_refptr<ConsensusMetadata> cmeta; |
| RETURN_NOT_OK(cmeta_manager_->Load(meta->tablet_id(), &cmeta)); |
| |
| // Close the existing log to evict any segments from the file cache so that |
| // bootstrap won't access any stale (cached) segments. |
| RETURN_NOT_OK(log_->Close()); |
| |
| scoped_refptr<LogAnchorRegistry> log_anchor_registry(new LogAnchorRegistry()); |
| // Now attempt to recover the log |
| RETURN_NOT_OK(BootstrapTablet( |
| meta, |
| cmeta->CommittedConfig(), |
| clock_.get(), |
| /*mem_tracker*/nullptr, |
| /*result_tracker*/nullptr, |
| metric_registry_.get(), |
| file_cache_.get(), |
| /*tablet_replica*/nullptr, |
| std::move(log_anchor_registry), |
| tablet, |
| &log_, |
| boot_info)); |
| |
| return Status::OK(); |
| } |
| |
| Status BootstrapTestTablet(int mrs_id, |
| int delta_id, |
| shared_ptr<Tablet>* tablet, |
| ConsensusBootstrapInfo* boot_info) { |
| scoped_refptr<TabletMetadata> meta; |
| RETURN_NOT_OK_PREPEND(LoadTestTabletMetadata(mrs_id, delta_id, &meta), |
| "Unable to load test tablet metadata"); |
| |
| RETURN_NOT_OK(CreateConsensusMetadata(meta)); |
| |
| RETURN_NOT_OK_PREPEND(RunBootstrapOnTestTablet(meta, tablet, boot_info), |
| "Unable to bootstrap test tablet"); |
| return Status::OK(); |
| } |
| |
| void IterateTabletRows(const Tablet* tablet, |
| vector<string>* results) { |
| unique_ptr<RowwiseIterator> iter; |
| RowIteratorOptions opts; |
| opts.projection = &schema_; |
| ASSERT_OK(tablet->NewRowIterator(std::move(opts), &iter)); |
| ASSERT_OK(iter->Init(nullptr)); |
| ASSERT_OK(IterateToStringList(iter.get(), results)); |
| for (const string& result : *results) { |
| VLOG(1) << result; |
| } |
| } |
| |
| scoped_refptr<ConsensusMetadataManager> cmeta_manager_; |
| }; |
| |
| // Tests a normal bootstrap scenario |
| TEST_F(BootstrapTest, TestBootstrap) { |
| ASSERT_OK(BuildLog()); |
| |
| ASSERT_OK(AppendReplicateBatch(MakeOpId(1, current_index_))); |
| ASSERT_OK(RollLog()); |
| |
| ASSERT_OK(AppendCommit(MakeOpId(1, current_index_))); |
| |
| shared_ptr<Tablet> tablet; |
| ConsensusBootstrapInfo boot_info; |
| StringVectorSink capture_logs; |
| { |
| // Capture the log messages during bootstrap. |
| ScopedRegisterSink reg(&capture_logs); |
| ASSERT_OK(BootstrapTestTablet(-1, -1, &tablet, &boot_info)); |
| } |
| |
| // Make sure we don't see anything in the logs that would make a user scared. |
| for (const string& s : capture_logs.logged_msgs()) { |
| ASSERT_STR_NOT_MATCHES(s, "[cC]orrupt"); |
| } |
| |
| vector<string> results; |
| IterateTabletRows(tablet.get(), &results); |
| ASSERT_EQ(1, results.size()); |
| } |
| |
| // Test that we don't overflow opids. Regression test for KUDU-1933. |
| TEST_F(BootstrapTest, TestBootstrapHighOpIdIndex) { |
| // Start appending with a log index 3 under the int32 max value. |
| // Append 6 log entries, which will roll us right through the int32 max. |
| const int64_t first_log_index = std::numeric_limits<int32_t>::max() - 3; |
| const int kNumEntries = 6; |
| ASSERT_OK(BuildLog()); |
| current_index_ = first_log_index; |
| for (int i = 0; i < kNumEntries; i++) { |
| AppendReplicateBatchAndCommitEntryPairsToLog(1); |
| } |
| |
| // Kick off tablet bootstrap and ensure everything worked. |
| shared_ptr<Tablet> tablet; |
| ConsensusBootstrapInfo boot_info; |
| ASSERT_OK(BootstrapTestTablet(-1, -1, &tablet, &boot_info)); |
| OpId last_opid; |
| last_opid.set_term(1); |
| last_opid.set_index(current_index_ - 1); |
| ASSERT_OPID_EQ(last_opid, boot_info.last_id); |
| ASSERT_OPID_EQ(last_opid, boot_info.last_committed_id); |
| } |
| |
| // Tests attempting a local bootstrap of a tablet that was in the middle of a |
| // tablet copy before "crashing". |
| TEST_F(BootstrapTest, TestIncompleteTabletCopy) { |
| ASSERT_OK(BuildLog()); |
| |
| ASSERT_OK(PersistTestTabletMetadataState(TABLET_DATA_COPYING)); |
| shared_ptr<Tablet> tablet; |
| ConsensusBootstrapInfo boot_info; |
| fs_manager_->dd_manager()->DeleteDataDirGroup(log::kTestTablet); |
| Status s = BootstrapTestTablet(-1, -1, &tablet, &boot_info); |
| ASSERT_TRUE(s.IsCorruption()) << "Expected corruption: " << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), "TabletMetadata bootstrap state is TABLET_DATA_COPYING"); |
| LOG(INFO) << "State is still TABLET_DATA_COPYING, as expected: " << s.ToString(); |
| } |
| |
| // Tests the KUDU-141 scenario: bootstrap when there is |
| // an orphaned commit after a log roll. |
| // The test simulates the following scenario: |
| // |
| // 1) 'Replicate A' is written to Segment_1, which is anchored |
| // on MemRowSet_1. |
| // 2) Segment_1 is rolled, 'Commit A' is written to Segment_2. |
| // 3) MemRowSet_1 is flushed, releasing all anchors. |
| // 4) Segment_1 is garbage collected. |
| // 5) We crash, requiring a recovery of Segment_2 which now contains |
| // the orphan 'Commit A'. |
| TEST_F(BootstrapTest, TestOrphanCommit) { |
| ASSERT_OK(BuildLog()); |
| |
| OpId opid = MakeOpId(1, current_index_); |
| |
| // Step 1) Write a REPLICATE to the log, and roll it. |
| ASSERT_OK(AppendReplicateBatch(opid)); |
| ASSERT_OK(RollLog()); |
| |
| // Step 2) Write the corresponding COMMIT in the second segment. |
| ASSERT_OK(AppendCommit(opid)); |
| |
| scoped_refptr<TabletMetadata> meta; |
| ASSERT_OK(LoadTestTabletMetadata(/*mrs_id=*/ -1, /*delta_id=*/ -1, &meta)); |
| ASSERT_OK(CreateConsensusMetadata(meta)); |
| |
| { |
| shared_ptr<Tablet> tablet; |
| ConsensusBootstrapInfo boot_info; |
| |
| // Step 3) Apply the operations in the log to the tablet and flush |
| // the tablet to disk. |
| ASSERT_OK(RunBootstrapOnTestTablet(meta, &tablet, &boot_info)); |
| ASSERT_OK(tablet->Flush()); |
| |
| // Create a new log segment. |
| ASSERT_OK(RollLog()); |
| |
| // Step 4) Create an orphaned commit by first adding a commit to |
| // the newly rolled logfile, and then by removing the previous |
| // commits. |
| ASSERT_OK(AppendCommit(opid)); |
| log::SegmentSequence segments; |
| log_->reader()->GetSegmentsSnapshot(&segments); |
| fs_manager_->env()->DeleteFile(segments[0]->path()); |
| |
| // Untrack the tablet in the data dir manager so upon the next call to |
| // BootstrapTestTablet, the tablet metadata's data dir group can be loaded. |
| fs_manager_->dd_manager()->DeleteDataDirGroup(tablet->tablet_id()); |
| } |
| { |
| shared_ptr<Tablet> tablet; |
| ConsensusBootstrapInfo boot_info; |
| |
| // Note: when GLOG_v=1, the test logs should include 'Ignoring |
| // orphan commit: op_type: WRITE_OP...' line. |
| ASSERT_OK(LoadTestTabletMetadata(/*mrs_id=*/ 2, /*delta_id=*/ 1, &meta)); |
| ASSERT_OK(RunBootstrapOnTestTablet(meta, &tablet, &boot_info)); |
| |
| // Confirm that the legitimate data (from Step 3) is still there. |
| vector<string> results; |
| IterateTabletRows(tablet.get(), &results); |
| ASSERT_EQ(1, results.size()); |
| ASSERT_EQ(R"((int32 key=1, int32 int_val=0, string string_val="this is a test insert"))", |
| results[0]); |
| ASSERT_EQ(2, tablet->metadata()->last_durable_mrs_id()); |
| } |
| } |
| |
| // Regression test for KUDU-1477: we should successfully start up |
| // even if a pending commit contains only failed operations. |
| TEST_F(BootstrapTest, TestPendingFailedCommit) { |
| ASSERT_OK(BuildLog()); |
| |
| OpId opid_1 = MakeOpId(1, current_index_++); |
| OpId opid_2 = MakeOpId(1, current_index_++); |
| |
| // Step 2) Write the corresponding COMMIT in the second segment, |
| // with a status indicating that the writes had 'NotFound' results. |
| ASSERT_OK(AppendReplicateBatch(opid_1)); |
| ASSERT_OK(AppendReplicateBatch(opid_2)); |
| ASSERT_OK(AppendCommitWithNotFoundOpResults(opid_2)); |
| |
| { |
| shared_ptr<Tablet> tablet; |
| ConsensusBootstrapInfo boot_info; |
| |
| // Step 3) Apply the operations in the log to the tablet and flush |
| // the tablet to disk. |
| ASSERT_OK(BootstrapTestTablet(-1, -1, &tablet, &boot_info)); |
| } |
| } |
| |
| // Tests this scenario: |
| // Orphan COMMIT with id <= current mrs id, followed by a REPLICATE |
| // message with mrs_id > current mrs_id, and a COMMIT message for that |
| // REPLICATE message. |
| // |
| // This should result in the orphan COMMIT being ignored, but the last |
| // REPLICATE/COMMIT messages ending up in the tablet. |
| TEST_F(BootstrapTest, TestNonOrphansAfterOrphanCommit) { |
| ASSERT_OK(BuildLog()); |
| |
| OpId opid = MakeOpId(1, current_index_); |
| |
| ASSERT_OK(AppendReplicateBatch(opid)); |
| ASSERT_OK(RollLog()); |
| |
| ASSERT_OK(AppendCommit(opid)); |
| |
| log::SegmentSequence segments; |
| log_->reader()->GetSegmentsSnapshot(&segments); |
| fs_manager_->env()->DeleteFile(segments[0]->path()); |
| |
| current_index_ += 2; |
| |
| opid = MakeOpId(1, current_index_); |
| |
| ASSERT_OK(AppendReplicateBatch(opid)); |
| ASSERT_OK(AppendCommit(opid, 2, 1, 0)); |
| |
| shared_ptr<Tablet> tablet; |
| ConsensusBootstrapInfo boot_info; |
| ASSERT_OK(BootstrapTestTablet(1, 0, &tablet, &boot_info)); |
| |
| // Confirm that the legitimate data is there. |
| vector<string> results; |
| IterateTabletRows(tablet.get(), &results); |
| ASSERT_EQ(1, results.size()); |
| |
| // 'key=3' means the REPLICATE message was inserted when current_id_ was 3, meaning |
| // that only the non-orphan commit went in. |
| ASSERT_EQ(R"((int32 key=3, int32 int_val=0, string string_val="this is a test insert"))", |
| results[0]); |
| } |
| |
| // Test for where the server crashes in between REPLICATE and COMMIT. |
| // Bootstrap should not replay the operation, but should return it in |
| // the ConsensusBootstrapInfo |
| TEST_F(BootstrapTest, TestOrphanedReplicate) { |
| ASSERT_OK(BuildLog()); |
| |
| // Append a REPLICATE with no commit |
| int replicate_index = current_index_++; |
| |
| OpId opid = MakeOpId(1, replicate_index); |
| |
| ASSERT_OK(AppendReplicateBatch(opid)); |
| |
| // Bootstrap the tablet. It shouldn't replay anything. |
| ConsensusBootstrapInfo boot_info; |
| shared_ptr<Tablet> tablet; |
| ASSERT_OK(BootstrapTestTablet(0, 0, &tablet, &boot_info)); |
| |
| // Table should be empty because we didn't replay the REPLICATE |
| vector<string> results; |
| IterateTabletRows(tablet.get(), &results); |
| ASSERT_EQ(0, results.size()); |
| |
| // The consensus bootstrap info should include the orphaned REPLICATE. |
| ASSERT_EQ(1, boot_info.orphaned_replicates.size()); |
| ASSERT_STR_CONTAINS(SecureShortDebugString(*boot_info.orphaned_replicates[0]), |
| "this is a test mutate"); |
| |
| // And it should also include the latest opids. |
| EXPECT_EQ("term: 1 index: 1", SecureShortDebugString(boot_info.last_id)); |
| } |
| |
| // Bootstrap should fail if no ConsensusMetadata file exists. |
| TEST_F(BootstrapTest, TestMissingConsensusMetadata) { |
| ASSERT_OK(BuildLog()); |
| |
| scoped_refptr<TabletMetadata> meta; |
| ASSERT_OK(LoadTestTabletMetadata(-1, -1, &meta)); |
| |
| shared_ptr<Tablet> tablet; |
| ConsensusBootstrapInfo boot_info; |
| Status s = RunBootstrapOnTestTablet(meta, &tablet, &boot_info); |
| |
| ASSERT_TRUE(s.IsNotFound()); |
| ASSERT_STR_CONTAINS(s.ToString(), "Unable to load consensus metadata"); |
| } |
| |
| TEST_F(BootstrapTest, TestOperationOverwriting) { |
| ASSERT_OK(BuildLog()); |
| |
| OpId opid = MakeOpId(1, 1); |
| |
| // Append a replicate in term 1 |
| ASSERT_OK(AppendReplicateBatch(opid)); |
| |
| // Append a commit for op 1.1 |
| ASSERT_OK(AppendCommit(opid)); |
| |
| // Now append replicates for 4.2 and 4.3 |
| ASSERT_OK(AppendReplicateBatch(MakeOpId(4, 2))); |
| ASSERT_OK(AppendReplicateBatch(MakeOpId(4, 3))); |
| |
| ASSERT_OK(RollLog()); |
| // And overwrite with 3.2 |
| ASSERT_OK(AppendReplicateBatch(MakeOpId(3, 2))); |
| |
| // When bootstrapping we should apply ops 1.1 and get 3.2 as pending. |
| ConsensusBootstrapInfo boot_info; |
| shared_ptr<Tablet> tablet; |
| ASSERT_OK(BootstrapTestTablet(-1, -1, &tablet, &boot_info)); |
| |
| ASSERT_EQ(boot_info.orphaned_replicates.size(), 1); |
| ASSERT_OPID_EQ(boot_info.orphaned_replicates[0]->id(), MakeOpId(3, 2)); |
| |
| // Confirm that the legitimate data is there. |
| vector<string> results; |
| IterateTabletRows(tablet.get(), &results); |
| ASSERT_EQ(1, results.size()); |
| |
| ASSERT_EQ(R"((int32 key=1, int32 int_val=0, string string_val="this is a test insert"))", |
| results[0]); |
| } |
| |
| // Tests that when we have out-of-order commits that touch the same rows, operations are |
| // still applied and in the correct order. |
| TEST_F(BootstrapTest, TestOutOfOrderCommits) { |
| ASSERT_OK(BuildLog()); |
| |
| consensus::ReplicateRefPtr replicate = consensus::make_scoped_refptr_replicate( |
| new consensus::ReplicateMsg()); |
| replicate->get()->set_op_type(consensus::WRITE_OP); |
| tserver::WriteRequestPB* batch_request = replicate->get()->mutable_write_request(); |
| ASSERT_OK(SchemaToPB(schema_, batch_request->mutable_schema())); |
| batch_request->set_tablet_id(log::kTestTablet); |
| |
| // This appends Insert(1) with op 10.10 |
| OpId insert_opid = MakeOpId(10, 10); |
| replicate->get()->mutable_id()->CopyFrom(insert_opid); |
| replicate->get()->set_timestamp(clock_->Now().ToUint64()); |
| AddTestRowToPB(RowOperationsPB::INSERT, schema_, 10, 1, |
| "this is a test insert", batch_request->mutable_row_operations()); |
| ASSERT_OK(AppendReplicateBatch(replicate)); |
| |
| // This appends Mutate(1) with op 10.11 |
| OpId mutate_opid = MakeOpId(10, 11); |
| batch_request->mutable_row_operations()->Clear(); |
| replicate->get()->mutable_id()->CopyFrom(mutate_opid); |
| replicate->get()->set_timestamp(clock_->Now().ToUint64()); |
| AddTestRowToPB(RowOperationsPB::UPDATE, schema_, |
| 10, 2, "this is a test mutate", |
| batch_request->mutable_row_operations()); |
| ASSERT_OK(AppendReplicateBatch(replicate)); |
| |
| // Now commit the mutate before the insert (in the log). |
| consensus::CommitMsg mutate_commit; |
| mutate_commit.set_op_type(consensus::WRITE_OP); |
| mutate_commit.mutable_commited_op_id()->CopyFrom(mutate_opid); |
| TxResultPB* result = mutate_commit.mutable_result(); |
| OperationResultPB* mutate = result->add_ops(); |
| MemStoreTargetPB* target = mutate->add_mutated_stores(); |
| target->set_mrs_id(1); |
| |
| ASSERT_OK(AppendCommit(mutate_commit)); |
| |
| consensus::CommitMsg insert_commit; |
| insert_commit.set_op_type(consensus::WRITE_OP); |
| insert_commit.mutable_commited_op_id()->CopyFrom(insert_opid); |
| result = insert_commit.mutable_result(); |
| OperationResultPB* insert = result->add_ops(); |
| target = insert->add_mutated_stores(); |
| target->set_mrs_id(1); |
| |
| ASSERT_OK(AppendCommit(insert_commit)); |
| |
| ConsensusBootstrapInfo boot_info; |
| shared_ptr<Tablet> tablet; |
| ASSERT_OK(BootstrapTestTablet(-1, -1, &tablet, &boot_info)); |
| |
| // Confirm that both operations were applied. |
| vector<string> results; |
| IterateTabletRows(tablet.get(), &results); |
| ASSERT_EQ(1, results.size()); |
| |
| ASSERT_EQ(R"((int32 key=10, int32 int_val=2, string string_val="this is a test mutate"))", |
| results[0]); |
| } |
| |
| // Tests that when we have two consecutive replicates but the commit message for the |
| // first one is missing, both appear as pending in ConsensusInfo. |
| TEST_F(BootstrapTest, TestMissingCommitMessage) { |
| ASSERT_OK(BuildLog()); |
| |
| consensus::ReplicateRefPtr replicate = consensus::make_scoped_refptr_replicate( |
| new consensus::ReplicateMsg()); |
| replicate->get()->set_op_type(consensus::WRITE_OP); |
| tserver::WriteRequestPB* batch_request = replicate->get()->mutable_write_request(); |
| ASSERT_OK(SchemaToPB(schema_, batch_request->mutable_schema())); |
| batch_request->set_tablet_id(log::kTestTablet); |
| |
| // This appends Insert(1) with op 10.10 |
| OpId insert_opid = MakeOpId(10, 10); |
| replicate->get()->mutable_id()->CopyFrom(insert_opid); |
| replicate->get()->set_timestamp(clock_->Now().ToUint64()); |
| AddTestRowToPB(RowOperationsPB::INSERT, schema_, 10, 1, |
| "this is a test insert", batch_request->mutable_row_operations()); |
| ASSERT_OK(AppendReplicateBatch(replicate)); |
| |
| // This appends Mutate(1) with op 10.11 |
| OpId mutate_opid = MakeOpId(10, 11); |
| batch_request->mutable_row_operations()->Clear(); |
| replicate->get()->mutable_id()->CopyFrom(mutate_opid); |
| replicate->get()->set_timestamp(clock_->Now().ToUint64()); |
| AddTestRowToPB(RowOperationsPB::UPDATE, schema_, |
| 10, 2, "this is a test mutate", |
| batch_request->mutable_row_operations()); |
| ASSERT_OK(AppendReplicateBatch(replicate)); |
| |
| // Now commit the mutate before the insert (in the log). |
| consensus::CommitMsg mutate_commit; |
| mutate_commit.set_op_type(consensus::WRITE_OP); |
| mutate_commit.mutable_commited_op_id()->CopyFrom(mutate_opid); |
| TxResultPB* result = mutate_commit.mutable_result(); |
| OperationResultPB* mutate = result->add_ops(); |
| MemStoreTargetPB* target = mutate->add_mutated_stores(); |
| target->set_mrs_id(1); |
| |
| ASSERT_OK(AppendCommit(mutate_commit)); |
| |
| ConsensusBootstrapInfo boot_info; |
| shared_ptr<Tablet> tablet; |
| ASSERT_OK(BootstrapTestTablet(-1, -1, &tablet, &boot_info)); |
| ASSERT_EQ(boot_info.orphaned_replicates.size(), 2); |
| ASSERT_OPID_EQ(boot_info.last_committed_id, mutate_opid); |
| |
| // Confirm that no operation was applied. |
| vector<string> results; |
| IterateTabletRows(tablet.get(), &results); |
| ASSERT_EQ(0, results.size()); |
| } |
| |
| // Test that we do not crash when a consensus-only operation has a timestamp |
| // that is higher than a timestamp assigned to a write operation that follows |
| // it in the log. |
| TEST_F(BootstrapTest, TestConsensusOnlyOperationOutOfOrderTimestamp) { |
| ASSERT_OK(BuildLog()); |
| |
| // Append NO_OP. |
| ReplicateRefPtr noop_replicate = make_scoped_refptr_replicate(new ReplicateMsg()); |
| noop_replicate->get()->set_op_type(consensus::NO_OP); |
| *noop_replicate->get()->mutable_id() = MakeOpId(1, 1); |
| noop_replicate->get()->set_timestamp(2); |
| noop_replicate->get()->mutable_noop_request()->set_timestamp_in_opid_order(false); |
| |
| ASSERT_OK(AppendReplicateBatch(noop_replicate)); |
| |
| // Append WRITE_OP with higher OpId and lower timestamp. |
| ReplicateRefPtr write_replicate = make_scoped_refptr_replicate(new ReplicateMsg()); |
| write_replicate->get()->set_op_type(consensus::WRITE_OP); |
| WriteRequestPB* batch_request = write_replicate->get()->mutable_write_request(); |
| ASSERT_OK(SchemaToPB(schema_, batch_request->mutable_schema())); |
| batch_request->set_tablet_id(log::kTestTablet); |
| *write_replicate->get()->mutable_id() = MakeOpId(1, 2); |
| write_replicate->get()->set_timestamp(1); |
| AddTestRowToPB(RowOperationsPB::INSERT, schema_, 1, 1, "foo", |
| batch_request->mutable_row_operations()); |
| |
| ASSERT_OK(AppendReplicateBatch(write_replicate)); |
| |
| // Now commit in OpId order. |
| // NO_OP... |
| consensus::CommitMsg mutate_commit; |
| mutate_commit.set_op_type(consensus::NO_OP); |
| *mutate_commit.mutable_commited_op_id() = noop_replicate->get()->id(); |
| |
| ASSERT_OK(AppendCommit(mutate_commit)); |
| |
| // ...and WRITE_OP... |
| mutate_commit.Clear(); |
| mutate_commit.set_op_type(consensus::WRITE_OP); |
| *mutate_commit.mutable_commited_op_id() = write_replicate->get()->id(); |
| TxResultPB* result = mutate_commit.mutable_result(); |
| OperationResultPB* mutate = result->add_ops(); |
| MemStoreTargetPB* target = mutate->add_mutated_stores(); |
| target->set_mrs_id(1); |
| |
| ASSERT_OK(AppendCommit(mutate_commit)); |
| |
| ConsensusBootstrapInfo boot_info; |
| shared_ptr<Tablet> tablet; |
| ASSERT_OK(BootstrapTestTablet(-1, -1, &tablet, &boot_info)); |
| ASSERT_EQ(boot_info.orphaned_replicates.size(), 0); |
| ASSERT_OPID_EQ(boot_info.last_committed_id, write_replicate->get()->id()); |
| |
| // Confirm that the insert op was applied. |
| vector<string> results; |
| IterateTabletRows(tablet.get(), &results); |
| ASSERT_EQ(1, results.size()); |
| } |
| |
| // Regression test for KUDU-2509. There was a use-after-free bug that sometimes |
| // lead to SIGSEGV while replaying the WAL. This scenario would crash or |
| // at least UB sanitizer would report a warning if such condition exists. |
| TEST_F(BootstrapTest, TestKudu2509) { |
| ASSERT_OK(BuildLog()); |
| |
| consensus::ReplicateRefPtr replicate = consensus::make_scoped_refptr_replicate( |
| new consensus::ReplicateMsg()); |
| replicate->get()->set_op_type(consensus::WRITE_OP); |
| tserver::WriteRequestPB* batch_request = replicate->get()->mutable_write_request(); |
| ASSERT_OK(SchemaToPB(schema_, batch_request->mutable_schema())); |
| batch_request->set_tablet_id(log::kTestTablet); |
| |
| // This appends Insert(1) with op 10.10 |
| const OpId insert_opid = MakeOpId(10, 10); |
| replicate->get()->mutable_id()->CopyFrom(insert_opid); |
| replicate->get()->set_timestamp(clock_->Now().ToUint64()); |
| AddTestRowToPB(RowOperationsPB::INSERT, schema_, 10, 1, |
| "this is a test insert", batch_request->mutable_row_operations()); |
| ASSERT_OK(AppendReplicateBatch(replicate)); |
| |
| // This appends Mutate(1) with op 10.11. The operation would try to update |
| // a row having an extra column. This should fail since there hasn't been |
| // corresponding DDL operation committed yet. |
| const OpId mutate_opid = MakeOpId(10, 11); |
| batch_request->mutable_row_operations()->Clear(); |
| replicate->get()->mutable_id()->CopyFrom(mutate_opid); |
| replicate->get()->set_timestamp(clock_->Now().ToUint64()); |
| { |
| // Modify the existing schema to add an extra row. |
| SchemaBuilder builder(schema_); |
| ASSERT_OK(builder.AddNullableColumn("string_val_extra", STRING)); |
| const auto schema = builder.BuildWithoutIds(); |
| ASSERT_OK(SchemaToPB(schema, batch_request->mutable_schema())); |
| |
| KuduPartialRow row(&schema); |
| ASSERT_OK(row.SetInt32("key", 100)); |
| ASSERT_OK(row.SetInt32("int_val", 200)); |
| ASSERT_OK(row.SetStringCopy("string_val", "300")); |
| ASSERT_OK(row.SetStringCopy("string_val_extra", "100500")); |
| RowOperationsPBEncoder enc(batch_request->mutable_row_operations()); |
| enc.Add(RowOperationsPB::UPDATE, row); |
| } |
| ASSERT_OK(AppendReplicateBatch(replicate)); |
| |
| // Now commit the mutate before the insert (in the log). |
| consensus::CommitMsg mutate_commit; |
| mutate_commit.set_op_type(consensus::WRITE_OP); |
| mutate_commit.mutable_commited_op_id()->CopyFrom(mutate_opid); |
| mutate_commit.mutable_result()->add_ops()->add_mutated_stores()->set_mrs_id(1); |
| ASSERT_OK(AppendCommit(mutate_commit)); |
| |
| consensus::CommitMsg insert_commit; |
| insert_commit.set_op_type(consensus::WRITE_OP); |
| insert_commit.mutable_commited_op_id()->CopyFrom(insert_opid); |
| insert_commit.mutable_result()->add_ops()->add_mutated_stores()->set_mrs_id(1); |
| ASSERT_OK(AppendCommit(insert_commit)); |
| |
| ConsensusBootstrapInfo boot_info; |
| shared_ptr<Tablet> tablet; |
| const auto s = BootstrapTestTablet(-1, -1, &tablet, &boot_info); |
| const auto& status_msg = s.ToString(); |
| ASSERT_TRUE(s.IsInvalidArgument()) << status_msg; |
| ASSERT_STR_CONTAINS(status_msg, |
| "Unable to bootstrap test tablet: Failed log replay."); |
| ASSERT_STR_CONTAINS(status_msg, |
| "column string_val_extra STRING NULLABLE not present in tablet"); |
| } |
| |
| } // namespace tablet |
| } // namespace kudu |