| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/tablet/txn_participant.h" |
| |
| #include <algorithm> |
| #include <cstdint> |
| #include <initializer_list> |
| #include <map> |
| #include <memory> |
| #include <optional> |
| #include <ostream> |
| #include <string> |
| #include <thread> |
| #include <unordered_map> |
| #include <utility> |
| #include <vector> |
| |
| #include <gflags/gflags_declare.h> |
| #include <glog/logging.h> |
| #include <gtest/gtest.h> |
| |
| #include "kudu/clock/clock.h" |
| #include "kudu/common/common.pb.h" |
| #include "kudu/common/iterator.h" |
| #include "kudu/common/partial_row.h" |
| #include "kudu/common/row_operations.h" |
| #include "kudu/common/row_operations.pb.h" |
| #include "kudu/common/schema.h" |
| #include "kudu/common/timestamp.h" |
| #include "kudu/common/wire_protocol.h" |
| #include "kudu/common/wire_protocol.pb.h" |
| #include "kudu/consensus/consensus.pb.h" |
| #include "kudu/consensus/log.h" |
| #include "kudu/consensus/log_anchor_registry.h" |
| #include "kudu/consensus/opid.pb.h" |
| #include "kudu/consensus/raft_consensus.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/ref_counted.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/tablet/metadata.pb.h" |
| #include "kudu/tablet/ops/op.h" |
| #include "kudu/tablet/ops/op_driver.h" |
| #include "kudu/tablet/ops/op_tracker.h" |
| #include "kudu/tablet/ops/participant_op.h" |
| #include "kudu/tablet/tablet-test-util.h" |
| #include "kudu/tablet/tablet.h" |
| #include "kudu/tablet/tablet_metadata.h" |
| #include "kudu/tablet/tablet_metrics.h" |
| #include "kudu/tablet/tablet_replica-test-base.h" |
| #include "kudu/tablet/tablet_replica.h" |
| #include "kudu/tablet/txn_metadata.h" |
| #include "kudu/tablet/txn_participant-test-util.h" |
| #include "kudu/tserver/tserver.pb.h" |
| #include "kudu/tserver/tserver_admin.pb.h" |
| #include "kudu/util/countdown_latch.h" |
| #include "kudu/util/metrics.h" |
| #include "kudu/util/pb_util.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/test_macros.h" |
| #include "kudu/util/test_util.h" |
| |
| using kudu::consensus::CommitMsg; |
| using kudu::consensus::ConsensusBootstrapInfo; |
| using kudu::pb_util::SecureShortDebugString; |
| using kudu::tserver::ParticipantRequestPB; |
| using kudu::tserver::ParticipantResponsePB; |
| using kudu::tserver::ParticipantOpPB; |
| using kudu::tserver::TabletServerErrorPB; |
| using kudu::tserver::WriteRequestPB; |
| using std::map; |
| using std::nullopt; |
| using std::optional; |
| using std::string; |
| using std::thread; |
| using std::unique_ptr; |
| using std::vector; |
| using strings::Substitute; |
| |
| DECLARE_bool(enable_maintenance_manager); |
| DECLARE_bool(enable_txn_partition_lock); |
| DECLARE_bool(log_preallocate_segments); |
| DECLARE_bool(log_async_preallocate_segments); |
| |
| namespace kudu { |
| namespace tablet { |
| |
| namespace { |
| |
| constexpr const int64_t kTxnId = 1; |
| |
| constexpr const int64_t kTxnOne = 1; |
| constexpr const int64_t kTxnTwo = 2; |
| |
| Schema GetTestSchema() { |
| return Schema({ ColumnSchema("key", INT32), ColumnSchema("val", INT32) }, 1); |
| } |
| |
| // A participant op that waits to start and finish applying based on input |
| // latches. |
| class DelayedParticipantOp : public ParticipantOp { |
| public: |
| DelayedParticipantOp(CountDownLatch* apply_started, |
| CountDownLatch* apply_continue, |
| unique_ptr<ParticipantOpState> state) |
| : ParticipantOp(std::move(state), consensus::LEADER), |
| apply_started_(apply_started), |
| apply_continue_(apply_continue) {} |
| |
| Status Apply(CommitMsg** commit_msg) override { |
| apply_started_->CountDown(); |
| LOG(INFO) << "Delaying apply..."; |
| apply_continue_->Wait(); |
| return ParticipantOp::Apply(commit_msg); |
| } |
| |
| private: |
| CountDownLatch* apply_started_; |
| CountDownLatch* apply_continue_; |
| }; |
| } // anonymous namespace |
| |
| class TxnParticipantTest : public TabletReplicaTestBase { |
| public: |
| TxnParticipantTest() |
| : TabletReplicaTestBase(GetTestSchema()) {} |
| |
| void SetUp() override { |
| // Some of these tests will test the durability semantics of participants. |
| // So we have finer-grained control of on-disk state, disable anything that |
| // might write to disk in the background. |
| FLAGS_enable_maintenance_manager = false; |
| FLAGS_log_preallocate_segments = false; |
| FLAGS_log_async_preallocate_segments = false; |
| |
| NO_FATALS(TabletReplicaTestBase::SetUp()); |
| ConsensusBootstrapInfo info; |
| ASSERT_OK(StartReplicaAndWaitUntilLeader(info)); |
| } |
| |
| Status Write(int key, |
| optional<int64_t> txn_id = nullopt, |
| RowOperationsPB::Type type = RowOperationsPB::INSERT) { |
| WriteRequestPB req; |
| if (txn_id) { |
| req.set_txn_id(*txn_id); |
| } |
| req.set_tablet_id(tablet_replica_->tablet_id()); |
| const auto& schema = GetTestSchema(); |
| RETURN_NOT_OK(SchemaToPB(schema, req.mutable_schema())); |
| KuduPartialRow row(&schema); |
| RETURN_NOT_OK(row.SetInt32(0, key)); |
| if (type != RowOperationsPB::DELETE && |
| type != RowOperationsPB::DELETE_IGNORE) { |
| RETURN_NOT_OK(row.SetInt32(1, key)); |
| } |
| RowOperationsPBEncoder enc(req.mutable_row_operations()); |
| enc.Add(type, row); |
| return ExecuteWrite(tablet_replica_.get(), req); |
| } |
| |
| Status Delete(int key) { |
| return Write(key, nullopt, RowOperationsPB::DELETE); |
| } |
| |
| Status CallParticipantOpCheckResp(int64_t txn_id, ParticipantOpPB::ParticipantOpType op_type, |
| int64_t ts_val) { |
| ParticipantResponsePB resp; |
| RETURN_NOT_OK(CallParticipantOp(tablet_replica_.get(), txn_id, op_type, ts_val, &resp)); |
| if (resp.has_error()) { |
| return StatusFromPB(resp.error().status()); |
| } |
| return Status::OK(); |
| } |
| |
| // Writes an op to the WAL, rolls over onto a new WAL segment, and flushes |
| // the MRS, leaving us with a new WAL segment that should be GC-able unless |
| // previous WAL segments are anchored. |
| Status WriteRolloverAndFlush(int key) { |
| RETURN_NOT_OK(Write(key)); |
| RETURN_NOT_OK(tablet_replica_->log()->WaitUntilAllFlushed()); |
| RETURN_NOT_OK(tablet_replica_->log()->AllocateSegmentAndRollOverForTests()); |
| return tablet_replica_->tablet()->Flush(); |
| } |
| |
| TxnParticipant* txn_participant() { |
| return tablet_replica_->tablet()->txn_participant(); |
| } |
| |
| Status IterateToStrings(vector<string>* ret) { |
| unique_ptr<RowwiseIterator> iter; |
| RETURN_NOT_OK(tablet_replica_->tablet()->NewRowIterator(GetTestSchema(), &iter)); |
| RETURN_NOT_OK(iter->Init(nullptr)); |
| vector<string> out; |
| RETURN_NOT_OK(IterateToStringList(iter.get(), &out)); |
| *ret = std::move(out); |
| return Status::OK(); |
| } |
| |
| clock::Clock* clock() { |
| return tablet_replica_->tablet()->clock(); |
| } |
| }; |
| |
| TEST_F(TxnParticipantTest, TestSuccessfulSequences) { |
| const auto check_valid_sequence = [&] (const vector<ParticipantOpPB::ParticipantOpType>& ops, |
| int64_t txn_id) { |
| for (const auto& type : ops) { |
| ParticipantResponsePB resp; |
| ASSERT_OK(CallParticipantOp( |
| tablet_replica_.get(), txn_id, type, kDummyCommitTimestamp, &resp)); |
| SCOPED_TRACE(SecureShortDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()); |
| ASSERT_TRUE(resp.has_timestamp()); |
| } |
| }; |
| // Check the happy path where the transaction is committed. |
| NO_FATALS(check_valid_sequence({ |
| ParticipantOpPB::BEGIN_TXN, |
| ParticipantOpPB::BEGIN_COMMIT, |
| ParticipantOpPB::FINALIZE_COMMIT, |
| }, 0)); |
| |
| // Check the case where a transaction is aborted after beginning to commit. |
| NO_FATALS(check_valid_sequence({ |
| ParticipantOpPB::BEGIN_TXN, |
| ParticipantOpPB::BEGIN_COMMIT, |
| ParticipantOpPB::ABORT_TXN, |
| }, 1)); |
| |
| // Check the case where a transaction is aborted after starting but before |
| // committing. |
| NO_FATALS(check_valid_sequence({ |
| ParticipantOpPB::BEGIN_TXN, |
| ParticipantOpPB::ABORT_TXN, |
| }, 2)); |
| ASSERT_EQ(vector<TxnParticipant::TxnEntry>({ |
| { 0, kCommitted, kDummyCommitTimestamp }, |
| { 1, kAborted, -1 }, |
| { 2, kAborted, -1 }, |
| }), txn_participant()->GetTxnsForTests()); |
| } |
| |
| TEST_F(TxnParticipantTest, TestParticipantOpsWhenNotBegun) { |
| auto txn_id = 0; |
| for (auto type : { ParticipantOpPB::BEGIN_COMMIT, |
| ParticipantOpPB::FINALIZE_COMMIT }) { |
| ParticipantResponsePB resp; |
| ASSERT_OK(CallParticipantOp( |
| tablet_replica_.get(), txn_id++, type, kDummyCommitTimestamp, &resp)); |
| SCOPED_TRACE(SecureShortDebugString(resp)); |
| ASSERT_TRUE(resp.has_error()); |
| ASSERT_TRUE(resp.error().has_status()); |
| ASSERT_EQ(TabletServerErrorPB::TXN_ILLEGAL_STATE, resp.error().code()); |
| ASSERT_EQ(AppStatusPB::ILLEGAL_STATE, resp.error().status().code()); |
| ASSERT_FALSE(resp.has_timestamp()); |
| ASSERT_TRUE(txn_participant()->GetTxnsForTests().empty()); |
| } |
| ParticipantResponsePB resp; |
| ASSERT_OK(CallParticipantOp( |
| tablet_replica_.get(), txn_id++, ParticipantOpPB::ABORT_TXN, kDummyCommitTimestamp, &resp)); |
| SCOPED_TRACE(SecureShortDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()); |
| ASSERT_FALSE(resp.error().has_status()); |
| ASSERT_TRUE(resp.has_timestamp()); |
| ASSERT_EQ(vector<TxnParticipant::TxnEntry>({ |
| { 2, kAborted, -1 }, |
| }), txn_participant()->GetTxnsForTests()); |
| } |
| |
| TEST_F(TxnParticipantTest, TestIllegalTransitions) { |
| const auto check_valid_op = [&] (const ParticipantOpPB::ParticipantOpType& type, |
| int64_t txn_id) { |
| ParticipantResponsePB resp; |
| ASSERT_OK(CallParticipantOp( |
| tablet_replica_.get(), txn_id, type, kDummyCommitTimestamp, &resp)); |
| SCOPED_TRACE(SecureShortDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()); |
| ASSERT_TRUE(resp.has_timestamp()); |
| }; |
| const auto check_bad_ops = [&] (const vector<ParticipantOpPB::ParticipantOpType>& ops, |
| int64_t txn_id) { |
| for (const auto& type : ops) { |
| ParticipantResponsePB resp; |
| ASSERT_OK(CallParticipantOp( |
| tablet_replica_.get(), txn_id, type, kDummyCommitTimestamp, &resp)); |
| SCOPED_TRACE(SecureShortDebugString(resp)); |
| ASSERT_TRUE(resp.has_error()); |
| ASSERT_TRUE(resp.error().has_status()); |
| ASSERT_EQ(AppStatusPB::ILLEGAL_STATE, resp.error().status().code()); |
| ASSERT_EQ(TabletServerErrorPB::TXN_ILLEGAL_STATE, resp.error().code()); |
| ASSERT_FALSE(resp.has_timestamp()); |
| } |
| }; |
| const auto check_already_applied = [&] (const vector<ParticipantOpPB::ParticipantOpType>& ops, |
| int64_t txn_id, bool expect_timestamp) { |
| for (const auto& type : ops) { |
| ParticipantResponsePB resp; |
| ASSERT_OK(CallParticipantOp( |
| tablet_replica_.get(), txn_id, type, kDummyCommitTimestamp, &resp)); |
| SCOPED_TRACE(SecureShortDebugString(resp)); |
| ASSERT_TRUE(resp.has_error()); |
| ASSERT_TRUE(resp.error().has_status()); |
| ASSERT_EQ(AppStatusPB::ILLEGAL_STATE, resp.error().status().code()); |
| ASSERT_EQ(TabletServerErrorPB::TXN_OP_ALREADY_APPLIED, resp.error().code()); |
| if (expect_timestamp) { |
| ASSERT_TRUE(resp.has_timestamp()); |
| } else { |
| ASSERT_FALSE(resp.has_timestamp()); |
| } |
| } |
| }; |
| // Once we've begun the transaction, we can't finalize without beginning to |
| // commit. |
| NO_FATALS(check_valid_op(ParticipantOpPB::BEGIN_TXN, kTxnId)); |
| NO_FATALS(check_bad_ops({ ParticipantOpPB::FINALIZE_COMMIT }, kTxnId)); |
| ASSERT_EQ(vector<TxnParticipant::TxnEntry>({ |
| { kTxnId, kOpen, -1 }, |
| }), txn_participant()->GetTxnsForTests()); |
| |
| // Once we begin committing, we can't start the transaction again. |
| NO_FATALS(check_valid_op(ParticipantOpPB::BEGIN_COMMIT, kTxnId)); |
| NO_FATALS(check_already_applied({ ParticipantOpPB::BEGIN_COMMIT }, kTxnId, true)); |
| NO_FATALS(check_bad_ops({ ParticipantOpPB::BEGIN_TXN }, kTxnId)); |
| ASSERT_EQ(vector<TxnParticipant::TxnEntry>({ |
| { kTxnId, kCommitInProgress, -1 }, |
| }), txn_participant()->GetTxnsForTests()); |
| |
| // Once we've begun finalizing, we can't do anything. |
| NO_FATALS(check_valid_op(ParticipantOpPB::FINALIZE_COMMIT, kTxnId)); |
| NO_FATALS(check_already_applied({ ParticipantOpPB::BEGIN_COMMIT }, kTxnId, true)); |
| NO_FATALS(check_already_applied({ ParticipantOpPB::FINALIZE_COMMIT }, kTxnId, false)); |
| NO_FATALS(check_bad_ops({ ParticipantOpPB::BEGIN_TXN, |
| ParticipantOpPB::ABORT_TXN }, kTxnId)); |
| ASSERT_EQ(vector<TxnParticipant::TxnEntry>({ |
| { kTxnId, kCommitted, kDummyCommitTimestamp }, |
| }), txn_participant()->GetTxnsForTests()); |
| |
| // Once we've aborted, we can't do anything. |
| const int64_t kAbortedTxnId = 2; |
| NO_FATALS(check_valid_op(ParticipantOpPB::BEGIN_TXN, kAbortedTxnId)); |
| NO_FATALS(check_valid_op(ParticipantOpPB::ABORT_TXN, kAbortedTxnId)); |
| NO_FATALS(check_bad_ops({ ParticipantOpPB::BEGIN_TXN, |
| ParticipantOpPB::BEGIN_COMMIT, |
| ParticipantOpPB::FINALIZE_COMMIT }, kAbortedTxnId)); |
| ASSERT_EQ(vector<TxnParticipant::TxnEntry>({ |
| { kTxnId, kCommitted, kDummyCommitTimestamp }, |
| { kAbortedTxnId, kAborted, -1 }, |
| }), txn_participant()->GetTxnsForTests()); |
| } |
| |
| // Test that we have no trouble operating on separate transactions. |
| TEST_F(TxnParticipantTest, TestConcurrentTransactions) { |
| // Disable the partition lock as there are concurrent transactions. |
| // TODO(awong): update this when implementing finer grained locking. |
| FLAGS_enable_txn_partition_lock = false; |
| const int kNumTxns = 10; |
| vector<thread> threads; |
| Status statuses[kNumTxns]; |
| for (int i = 0; i < kNumTxns; i++) { |
| threads.emplace_back([&, i] { |
| for (const auto& type : kCommitSequence) { |
| ParticipantResponsePB resp; |
| Status s = CallParticipantOp( |
| tablet_replica_.get(), i, type, kDummyCommitTimestamp, &resp); |
| if (s.ok() && resp.has_error()) { |
| s = StatusFromPB(resp.error().status()); |
| } |
| statuses[i] = s; |
| } |
| }); |
| } |
| std::for_each(threads.begin(), threads.end(), [] (thread& t) { t.join(); }); |
| for (const auto& s : statuses) { |
| EXPECT_OK(s); |
| } |
| const auto& txns = txn_participant()->GetTxnsForTests(); |
| for (int i = 0; i < kNumTxns; i++) { |
| ASSERT_EQ(TxnParticipant::TxnEntry({ i, kCommitted, kDummyCommitTimestamp }), txns[i]); |
| } |
| } |
| |
| TEST_F(TxnParticipantTest, TestConcurrentWrites) { |
| constexpr const auto kNumRows = 10; |
| constexpr const auto kTxnId = 0; |
| ParticipantResponsePB resp; |
| ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kTxnId, |
| ParticipantOpPB::BEGIN_TXN, -1, &resp)); |
| vector<thread> threads; |
| Status statuses[kNumRows]; |
| for (int i = 0; i < kNumRows; i++) { |
| threads.emplace_back([&, i] { |
| statuses[i] = Write(i, kTxnId); |
| }); |
| } |
| std::for_each(threads.begin(), threads.end(), [] (thread& t) { t.join(); }); |
| for (const auto& s : statuses) { |
| EXPECT_OK(s); |
| } |
| } |
| |
| // Concurrently try to apply every op and test, based on the results, that some |
| // invariants are maintained. |
| TEST_F(TxnParticipantTest, TestConcurrentOps) { |
| const map<ParticipantOpPB::ParticipantOpType, int> kIndexByOps = { |
| { ParticipantOpPB::BEGIN_TXN, 0 }, |
| { ParticipantOpPB::BEGIN_COMMIT, 1}, |
| { ParticipantOpPB::FINALIZE_COMMIT, 2}, |
| { ParticipantOpPB::ABORT_TXN, 3}, |
| }; |
| vector<thread> threads; |
| vector<Status> statuses(kIndexByOps.size(), Status::Incomplete("")); |
| for (const auto& op_and_idx : kIndexByOps) { |
| const auto& op_type = op_and_idx.first; |
| const auto& idx = op_and_idx.second; |
| threads.emplace_back([&, op_type, idx] { |
| ParticipantResponsePB resp; |
| Status s = CallParticipantOp( |
| tablet_replica_.get(), kTxnId, op_type, kDummyCommitTimestamp, &resp); |
| if (s.ok() && resp.has_error()) { |
| s = StatusFromPB(resp.error().status()); |
| } |
| statuses[idx] = s; |
| }); |
| } |
| std::for_each(threads.begin(), threads.end(), [] (thread& t) { t.join(); }); |
| const auto status_for_op = [&] (ParticipantOpPB::ParticipantOpType type) { |
| return statuses[FindOrDie(kIndexByOps, type)]; |
| }; |
| // The only way we could have failed to begin a transaction is if we |
| // replicated an ABORT_TXN first. |
| ASSERT_TRUE(status_for_op(ParticipantOpPB::BEGIN_TXN).ok() || |
| status_for_op(ParticipantOpPB::ABORT_TXN).ok()) << |
| Substitute("BEGIN_TXN error: $0, ABORT_TXN error: $1", |
| status_for_op(ParticipantOpPB::BEGIN_TXN).ToString(), |
| status_for_op(ParticipantOpPB::ABORT_TXN).ToString()); |
| |
| // If we finalized the commit, we must not have been able to abort. |
| if (status_for_op(ParticipantOpPB::FINALIZE_COMMIT).ok()) { |
| ASSERT_EQ(vector<TxnParticipant::TxnEntry>({ |
| { kTxnId, kCommitted, kDummyCommitTimestamp }, |
| }), txn_participant()->GetTxnsForTests()); |
| ASSERT_FALSE(status_for_op(ParticipantOpPB::ABORT_TXN).ok()); |
| |
| // If we aborted the commit, we could not have finalized the commit. |
| } else if (status_for_op(ParticipantOpPB::ABORT_TXN).ok()) { |
| ASSERT_EQ(vector<TxnParticipant::TxnEntry>({ |
| { kTxnId, kAborted, -1 }, |
| }), txn_participant()->GetTxnsForTests()); |
| ASSERT_FALSE(status_for_op(ParticipantOpPB::FINALIZE_COMMIT).ok()); |
| |
| // If we neither aborted nor finalized, but we began to commit, we should be |
| // left with the commit in progress. |
| } else if (status_for_op(ParticipantOpPB::BEGIN_COMMIT).ok()) { |
| ASSERT_EQ(vector<TxnParticipant::TxnEntry>({ |
| { kTxnId, kCommitInProgress, -1 }, |
| }), txn_participant()->GetTxnsForTests()); |
| |
| // Finally, if nothing else succeeded, at least we should have been able to |
| // start the transaction. |
| } else { |
| ASSERT_EQ(vector<TxnParticipant::TxnEntry>({ |
| { kTxnId, kOpen, -1 }, |
| }), txn_participant()->GetTxnsForTests()); |
| } |
| } |
| |
| TEST_F(TxnParticipantTest, TestReplayParticipantOps) { |
| for (const auto& type : kCommitSequence) { |
| ParticipantResponsePB resp; |
| ASSERT_OK(CallParticipantOp( |
| tablet_replica_.get(), kTxnId, type, kDummyCommitTimestamp, &resp)); |
| SCOPED_TRACE(SecureShortDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()); |
| ASSERT_TRUE(resp.has_timestamp()); |
| } |
| ASSERT_EQ(vector<TxnParticipant::TxnEntry>({ |
| { kTxnId, kCommitted, kDummyCommitTimestamp } |
| }), txn_participant()->GetTxnsForTests()); |
| ASSERT_OK(RestartReplica(/*reset_tablet*/true)); |
| ASSERT_EQ(vector<TxnParticipant::TxnEntry>({ |
| { kTxnId, kCommitted, kDummyCommitTimestamp } |
| }), txn_participant()->GetTxnsForTests()); |
| } |
| |
| // Test that each transaction has a single anchor that gets updated as |
| // participant ops land. |
| TEST_F(TxnParticipantTest, TestAllOpsRegisterAnchors) { |
| int64_t expected_index = 1; |
| // Validates that each op in the given sequence updates the single anchor |
| // maintained for the transaction. |
| const auto check_participant_ops_are_anchored = |
| [&] (int64_t txn_id, const vector<ParticipantOpPB::ParticipantOpType>& ops) { |
| for (const auto& op : ops) { |
| ASSERT_OK(CallParticipantOpCheckResp(txn_id, op, kDummyCommitTimestamp)); |
| ASSERT_OK(tablet_replica_->tablet_metadata()->Flush()); |
| expected_index++; |
| if (op == ParticipantOpPB::BEGIN_COMMIT) { |
| ASSERT_EQ(1, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests()); |
| int64_t log_index = -1; |
| tablet_replica_->log_anchor_registry()->GetEarliestRegisteredLogIndex(&log_index); |
| ASSERT_EQ(expected_index, log_index); |
| } else { |
| ASSERT_EQ(0, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests()); |
| } |
| } |
| ASSERT_EQ(0, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests()); |
| }; |
| NO_FATALS(check_participant_ops_are_anchored(1, { |
| ParticipantOpPB::BEGIN_TXN, |
| ParticipantOpPB::BEGIN_COMMIT, |
| ParticipantOpPB::FINALIZE_COMMIT |
| })); |
| NO_FATALS(check_participant_ops_are_anchored(2, { |
| ParticipantOpPB::BEGIN_TXN, |
| ParticipantOpPB::BEGIN_COMMIT, |
| ParticipantOpPB::ABORT_TXN |
| })); |
| } |
| |
| TEST_F(TxnParticipantTest, TestTakePartitionLockOnRestart) { |
| SKIP_IF_SLOW_NOT_ALLOWED(); |
| |
| // Get to write some rows. |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_TXN, |
| kDummyCommitTimestamp)); |
| |
| // We should be able to at least start another transaction. |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::BEGIN_TXN, |
| kDummyCommitTimestamp)); |
| ASSERT_OK(Write(0, kTxnOne)); |
| const auto check_other_txns_cant_write = [&] { |
| // We'll try writing a couple times to make sure the act of writing doesn't |
| // somehow permit further writes to the transaction. |
| for (int i = 0; i < 2; i++) { |
| Status s = Write(0); |
| ASSERT_TRUE(s.IsAborted()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), "partition lock that is held by another"); |
| |
| s = Write(0, kTxnTwo); |
| ASSERT_TRUE(s.IsAborted()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), "partition lock that is held by another"); |
| } |
| }; |
| NO_FATALS(check_other_txns_cant_write()); |
| |
| // We shouldn't be able to write even after restarting. |
| ASSERT_OK(RestartReplica(/*reset_tablet*/true)); |
| NO_FATALS(check_other_txns_cant_write()); |
| |
| // We should be able to write as a part of the transaction though. |
| ASSERT_OK(Write(1, kTxnOne)); |
| |
| // Once we begin committing, we still shouldn't be able to write, even after |
| // restarting. |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_COMMIT, |
| kDummyCommitTimestamp)); |
| NO_FATALS(check_other_txns_cant_write()); |
| ASSERT_OK(RestartReplica(/*reset_tablet*/true)); |
| NO_FATALS(check_other_txns_cant_write()); |
| // We also shouldn't be able to write as a part of the transaction, since |
| // it's not open for further writes. |
| Status s = Write(2, kTxnOne); |
| ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), "not open: COMMIT_IN_PROGRESS"); |
| |
| ASSERT_OK(RestartReplica(/*reset_tablet*/true)); |
| |
| s = Write(2, kTxnOne); |
| ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), "not open: COMMIT_IN_PROGRESS"); |
| |
| s = Write(0, kTxnTwo); |
| ASSERT_TRUE(s.IsAborted()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), "partition lock that is held by another"); |
| |
| // Once we finalize the commit, we should be able to write again. |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::FINALIZE_COMMIT, |
| kDummyCommitTimestamp)); |
| |
| // And we shouldn't be able to write to the same transaction once committed. |
| s = Write(2, kTxnOne); |
| ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), "not open"); |
| |
| // We should be able to write to the other transaction now that the partition |
| // lock isn't held. |
| ASSERT_OK(Write(2, kTxnTwo)); |
| |
| ASSERT_OK(RestartReplica(/*reset_tablet*/true)); |
| |
| s = Write(2, kTxnOne); |
| ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), "not open"); |
| |
| // We should be able to write to the other transaction now that the partition |
| // lock isn't held. |
| ASSERT_OK(Write(3, kTxnTwo)); |
| } |
| |
| TEST_F(TxnParticipantTest, TestGetAbortedTxnMetadata) { |
| TxnMetadataPB pb; |
| const auto& meta = tablet_replica_->tablet_metadata(); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::ABORT_TXN, |
| kDummyCommitTimestamp)); |
| ASSERT_TRUE(meta->GetTxnMetadataPB(kTxnId, &pb)); |
| ASSERT_TRUE(pb.aborted()); |
| ASSERT_FALSE(pb.has_commit_mvcc_op_timestamp()); |
| ASSERT_FALSE(pb.has_commit_timestamp()); |
| ASSERT_FALSE(pb.has_flushed_committed_mrs()); |
| } |
| |
| TEST_F(TxnParticipantTest, TestGetCommittedTxnMetadata) { |
| TxnMetadataPB pb; |
| const auto& meta = tablet_replica_->tablet_metadata(); |
| ASSERT_FALSE(meta->GetTxnMetadataPB(kTxnId, &pb)); |
| ASSERT_FALSE(pb.has_aborted()); |
| ASSERT_FALSE(pb.has_commit_mvcc_op_timestamp()); |
| ASSERT_FALSE(pb.has_commit_timestamp()); |
| ASSERT_FALSE(pb.has_flushed_committed_mrs()); |
| |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_TXN, |
| kDummyCommitTimestamp)); |
| // There will be metadata, but it will be empty since there is not |
| // commit/abort information yet. |
| ASSERT_TRUE(meta->GetTxnMetadataPB(kTxnId, &pb)); |
| ASSERT_FALSE(pb.has_aborted()); |
| ASSERT_FALSE(pb.has_commit_mvcc_op_timestamp()); |
| ASSERT_FALSE(pb.has_commit_timestamp()); |
| ASSERT_FALSE(pb.has_flushed_committed_mrs()); |
| |
| // Once we begin committing, we should see the BEGIN_COMMIT op timestamp. |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_COMMIT, |
| kDummyCommitTimestamp)); |
| ASSERT_TRUE(meta->GetTxnMetadataPB(kTxnId, &pb)); |
| ASSERT_FALSE(pb.has_aborted()); |
| ASSERT_TRUE(pb.has_commit_mvcc_op_timestamp()); |
| ASSERT_LT(0, pb.commit_mvcc_op_timestamp()); |
| ASSERT_FALSE(pb.has_commit_timestamp()); |
| ASSERT_FALSE(pb.has_flushed_committed_mrs()); |
| |
| // Once we finalize the commit, we should see a commit timestamp. |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::FINALIZE_COMMIT, |
| kDummyCommitTimestamp)); |
| ASSERT_TRUE(meta->GetTxnMetadataPB(kTxnId, &pb)); |
| ASSERT_FALSE(pb.has_aborted()); |
| ASSERT_TRUE(pb.has_commit_mvcc_op_timestamp()); |
| ASSERT_LT(0, pb.commit_mvcc_op_timestamp()); |
| ASSERT_TRUE(pb.has_commit_timestamp()); |
| ASSERT_EQ(kDummyCommitTimestamp, pb.commit_timestamp()); |
| ASSERT_FALSE(pb.has_flushed_committed_mrs()); |
| } |
| |
| // Test that participant ops result in tablet metadata updates that can survive |
| // restarts, and that the appropriate anchors are in place as we progress |
| // through a transaction's life cycle. |
| TEST_F(TxnParticipantTest, TestTxnMetadataSurvivesRestart) { |
| // Disable the partition lock as there are concurrent transactions. |
| // TODO(awong): update this when implementing finer grained locking. |
| FLAGS_enable_txn_partition_lock = false; |
| // First, do a sanity check that there's nothing GCable. |
| int64_t gcable_size; |
| ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size)); |
| ASSERT_EQ(0, gcable_size); |
| |
| // Perform some initial participant ops and roll the WAL segments so there |
| // are some candidates for WAL GC. |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_TXN, |
| kDummyCommitTimestamp)); |
| ASSERT_OK(tablet_replica_->tablet_metadata()->Flush()); |
| ASSERT_OK(tablet_replica_->log()->WaitUntilAllFlushed()); |
| ASSERT_OK(tablet_replica_->log()->AllocateSegmentAndRollOverForTests()); |
| ASSERT_EQ(0, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests()); |
| |
| // We can't GC down to 0 segments, so write some rows and roll onto new |
| // segments so we have a segment to GC. |
| int current_key = 0; |
| ASSERT_OK(WriteRolloverAndFlush(current_key++)); |
| ASSERT_OK(WriteRolloverAndFlush(current_key++)); |
| ASSERT_EQ(0, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests()); |
| |
| // We should be able to GC the WALs that have the BEGIN_TXN op. |
| ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size)); |
| ASSERT_GT(gcable_size, 0); |
| ASSERT_OK(tablet_replica_->RunLogGC()); |
| ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size)); |
| ASSERT_EQ(0, gcable_size); |
| |
| // At this point, we have a single segment with some writes in it. |
| // Write and flush a BEGIN_COMMIT op. Once we GC, our WAL will start on this |
| // op, and WALs should be anchored until the commit is finalized, regardless |
| // of whether there are more segments. |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_COMMIT, |
| kDummyCommitTimestamp)); |
| ASSERT_OK(tablet_replica_->log()->WaitUntilAllFlushed()); |
| ASSERT_OK(tablet_replica_->log()->AllocateSegmentAndRollOverForTests()); |
| // There should be two anchors for this op: one that is in place until the |
| // FINALIZE_COMMIT op, another until we flush. |
| ASSERT_EQ(2, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests()); |
| ASSERT_OK(tablet_replica_->tablet_metadata()->Flush()); |
| ASSERT_EQ(1, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests()); |
| ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size)); |
| ASSERT_GT(gcable_size, 0); |
| ASSERT_OK(tablet_replica_->RunLogGC()); |
| ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size)); |
| ASSERT_EQ(0, gcable_size); |
| |
| // Even if we write and roll over, we shouldn't be able to GC because of the |
| // anchor. |
| ASSERT_OK(WriteRolloverAndFlush(current_key++)); |
| ASSERT_OK(WriteRolloverAndFlush(current_key++)); |
| ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size)); |
| ASSERT_EQ(0, gcable_size); |
| ASSERT_EQ(1, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests()); |
| ASSERT_OK(RestartReplica(/*reset_tablet*/true)); |
| ASSERT_EQ(vector<TxnParticipant::TxnEntry>({ |
| { kTxnId, kCommitInProgress, -1 } |
| }), txn_participant()->GetTxnsForTests()); |
| |
| // Once we finalize the commit, the BEGIN_COMMIT anchor should be released. |
| ASSERT_EQ(1, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests()); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::FINALIZE_COMMIT, |
| kDummyCommitTimestamp)); |
| ASSERT_EQ(1, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests()); |
| ASSERT_OK(tablet_replica_->tablet_metadata()->Flush()); |
| ASSERT_EQ(0, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests()); |
| |
| // Because we rebuilt the WAL, we only have one segment and thus shouldn't be |
| // able to GC anything until we add more segments. |
| ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size)); |
| ASSERT_EQ(0, gcable_size); |
| ASSERT_OK(WriteRolloverAndFlush(current_key++)); |
| ASSERT_OK(WriteRolloverAndFlush(current_key++)); |
| ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size)); |
| ASSERT_GT(gcable_size, 0); |
| ASSERT_OK(tablet_replica_->RunLogGC()); |
| ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size)); |
| ASSERT_EQ(0, gcable_size); |
| |
| // Ensure the transaction bootstraps to the expected state. |
| ASSERT_OK(RestartReplica(/*reset_tablet*/true)); |
| ASSERT_EQ(vector<TxnParticipant::TxnEntry>({ |
| { kTxnId, kCommitted, kDummyCommitTimestamp } |
| }), txn_participant()->GetTxnsForTests()); |
| } |
| |
| // Test that we can replay BEGIN_COMMIT ops, given it anchors WALs until |
| // metadata flush _and_ until the transaction is finalized or aborted. |
| TEST_F(TxnParticipantTest, TestBeginCommitAnchorsOnFlush) { |
| // Start a transaction and begin committing. |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_TXN, kDummyCommitTimestamp)); |
| ASSERT_OK(tablet_replica_->tablet_metadata()->Flush()); |
| auto txn_meta = FindOrDie(tablet_replica_->tablet_metadata()->GetTxnMetadata(), kTxnId); |
| ASSERT_EQ(nullopt, txn_meta->commit_mvcc_op_timestamp()); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_COMMIT, |
| kDummyCommitTimestamp)); |
| // We should have two anchors: one that lasts until we flush, another that |
| // lasts until we finalize the commit. |
| ASSERT_EQ(2, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests()); |
| |
| // We should have an MVCC op timestamp in the metadata, even after |
| // restarting. |
| ASSERT_NE(nullopt, txn_meta->commit_mvcc_op_timestamp()); |
| const auto orig_mvcc_op_timestamp = *txn_meta->commit_mvcc_op_timestamp(); |
| txn_meta.reset(); |
| RestartReplica(/*reset_tablet*/true); |
| txn_meta = FindOrDie(tablet_replica_->tablet_metadata()->GetTxnMetadata(), kTxnId); |
| ASSERT_NE(nullopt, txn_meta->commit_mvcc_op_timestamp()); |
| ASSERT_EQ(orig_mvcc_op_timestamp, *txn_meta->commit_mvcc_op_timestamp()); |
| |
| // Once we flush, we should drop down to one anchor. |
| ASSERT_OK(tablet_replica_->tablet_metadata()->Flush()); |
| ASSERT_EQ(1, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests()); |
| ASSERT_OK(RestartReplica(/*reset_tablet*/true)); |
| ASSERT_EQ(1, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests()); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::FINALIZE_COMMIT, |
| kDummyCommitTimestamp)); |
| |
| // The anchor from the BEGIN_COMMIT op should be gone, but we should have |
| // another anchor for the FINALIZE_COMMIT op until we flush the metadata. |
| ASSERT_EQ(1, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests()); |
| ASSERT_OK(tablet_replica_->tablet_metadata()->Flush()); |
| ASSERT_EQ(0, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests()); |
| |
| // As another sanity check, we should still have metadata for the MVCC op |
| // after restarting. |
| ASSERT_NE(nullopt, txn_meta->commit_mvcc_op_timestamp()); |
| txn_meta.reset(); |
| RestartReplica(/*reset_tablet*/true); |
| txn_meta = FindOrDie(tablet_replica_->tablet_metadata()->GetTxnMetadata(), kTxnId); |
| ASSERT_NE(nullopt, txn_meta->commit_mvcc_op_timestamp()); |
| ASSERT_EQ(orig_mvcc_op_timestamp, *txn_meta->commit_mvcc_op_timestamp()); |
| } |
| |
| // Like the above test but finalizing the commit before flushing the metadata. |
| TEST_F(TxnParticipantTest, TestBeginCommitAnchorsOnFinalize) { |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_TXN, kDummyCommitTimestamp)); |
| auto txn_meta = FindOrDie(tablet_replica_->tablet_metadata()->GetTxnMetadata(), kTxnId); |
| ASSERT_EQ(nullopt, txn_meta->commit_mvcc_op_timestamp()); |
| ASSERT_OK(tablet_replica_->tablet_metadata()->Flush()); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_COMMIT, |
| kDummyCommitTimestamp)); |
| ASSERT_NE(nullopt, txn_meta->commit_mvcc_op_timestamp()); |
| const auto orig_mvcc_op_timestamp = *txn_meta->commit_mvcc_op_timestamp(); |
| |
| // Restarting shouldn't affect our metadata. |
| txn_meta.reset(); |
| RestartReplica(/*reset_tablet*/true); |
| txn_meta = FindOrDie(tablet_replica_->tablet_metadata()->GetTxnMetadata(), kTxnId); |
| ASSERT_NE(nullopt, txn_meta->commit_mvcc_op_timestamp()); |
| ASSERT_EQ(orig_mvcc_op_timestamp, *txn_meta->commit_mvcc_op_timestamp()); |
| |
| // We should have two anchors, one that lasts until we flush, another that |
| // lasts until we finalize. |
| ASSERT_EQ(2, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests()); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::FINALIZE_COMMIT, |
| kDummyCommitTimestamp)); |
| |
| // Finalizing the commit shouldn't affect our metadata. |
| txn_meta.reset(); |
| RestartReplica(/*reset_tablet*/true); |
| txn_meta = FindOrDie(tablet_replica_->tablet_metadata()->GetTxnMetadata(), kTxnId); |
| ASSERT_NE(nullopt, txn_meta->commit_mvcc_op_timestamp()); |
| ASSERT_EQ(orig_mvcc_op_timestamp, *txn_meta->commit_mvcc_op_timestamp()); |
| |
| // One anchor should be gone and another should be registered in its place |
| // that lasts until we flush the finalized metadata. |
| ASSERT_EQ(2, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests()); |
| ASSERT_OK(tablet_replica_->tablet_metadata()->Flush()); |
| ASSERT_EQ(0, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests()); |
| } |
| |
| class MetadataFlushTxnParticipantTest : public TxnParticipantTest, |
| public ::testing::WithParamInterface<bool> {}; |
| |
| // Test rebuilding transaction state from the WALs and metadata. |
| TEST_P(MetadataFlushTxnParticipantTest, TestRebuildTxnMetadata) { |
| const bool should_flush = GetParam(); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_TXN, kDummyCommitTimestamp)); |
| if (should_flush) { |
| ASSERT_OK(tablet_replica_->tablet_metadata()->Flush()); |
| } |
| |
| ASSERT_OK(RestartReplica(/*reset_tablet*/true)); |
| ASSERT_EQ(vector<TxnParticipant::TxnEntry>({ |
| { kTxnId, kOpen, -1 } |
| }), txn_participant()->GetTxnsForTests()); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_COMMIT, |
| kDummyCommitTimestamp)); |
| if (should_flush) { |
| ASSERT_OK(tablet_replica_->tablet_metadata()->Flush()); |
| } |
| |
| ASSERT_OK(RestartReplica(/*reset_tablet*/true)); |
| ASSERT_EQ(vector<TxnParticipant::TxnEntry>({ |
| { kTxnId, kCommitInProgress, -1 } |
| }), txn_participant()->GetTxnsForTests()); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::FINALIZE_COMMIT, |
| kDummyCommitTimestamp)); |
| if (should_flush) { |
| ASSERT_OK(tablet_replica_->tablet_metadata()->Flush()); |
| } |
| |
| ASSERT_OK(RestartReplica(/*reset_tablet*/true)); |
| ASSERT_EQ(vector<TxnParticipant::TxnEntry>({ |
| { kTxnId, kCommitted, kDummyCommitTimestamp } |
| }), txn_participant()->GetTxnsForTests()); |
| |
| // Now perform the same validation but for a transaction that gets aborted. |
| const int64_t kAbortedTxnId = 2; |
| ASSERT_OK(CallParticipantOpCheckResp(kAbortedTxnId, ParticipantOpPB::BEGIN_TXN, |
| kDummyCommitTimestamp)); |
| if (should_flush) { |
| ASSERT_OK(tablet_replica_->tablet_metadata()->Flush()); |
| } |
| ASSERT_OK(RestartReplica(/*reset_tablet*/true)); |
| ASSERT_EQ(vector<TxnParticipant::TxnEntry>({ |
| { kTxnId, kCommitted, kDummyCommitTimestamp }, |
| { kAbortedTxnId, kOpen, -1 } |
| }), txn_participant()->GetTxnsForTests()); |
| ASSERT_OK(CallParticipantOpCheckResp(kAbortedTxnId, ParticipantOpPB::ABORT_TXN, |
| kDummyCommitTimestamp)); |
| if (should_flush) { |
| ASSERT_OK(tablet_replica_->tablet_metadata()->Flush()); |
| } |
| ASSERT_OK(RestartReplica(/*reset_tablet*/true)); |
| ASSERT_EQ(vector<TxnParticipant::TxnEntry>({ |
| { kTxnId, kCommitted, kDummyCommitTimestamp }, |
| { kAbortedTxnId, kAborted, -1 } |
| }), txn_participant()->GetTxnsForTests()); |
| } |
| |
| // Test rebuilding transaction state, including writes, from WALs and metadata. |
| TEST_P(MetadataFlushTxnParticipantTest, TestReplayTransactionalInserts) { |
| // Disable the partition lock as there are concurrent transactions. |
| // TODO(awong): update this when implementing finer grained locking. |
| FLAGS_enable_txn_partition_lock = false; |
| const bool should_flush = GetParam(); |
| constexpr const int64_t kAbortedTxnId = 2; |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_TXN, -1)); |
| ASSERT_OK(CallParticipantOpCheckResp(kAbortedTxnId, ParticipantOpPB::BEGIN_TXN, -1)); |
| ASSERT_OK(Write(0, kTxnId)); |
| ASSERT_OK(Write(0, kAbortedTxnId)); |
| if (should_flush) { |
| ASSERT_OK(tablet_replica_->tablet_metadata()->Flush()); |
| } |
| |
| // As long as we haven't finalized the transaction, we shouldn't be able to |
| // iterate through its mutations, even across restarts. |
| vector<string> rows; |
| ASSERT_OK(IterateToStrings(&rows)); |
| ASSERT_EQ(0, rows.size()); |
| ASSERT_OK(RestartReplica(/*reset_tablet*/true)); |
| ASSERT_OK(IterateToStrings(&rows)); |
| ASSERT_EQ(0, rows.size()); |
| |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_COMMIT, -1)); |
| ASSERT_OK(CallParticipantOpCheckResp(kAbortedTxnId, ParticipantOpPB::BEGIN_COMMIT, -1)); |
| if (should_flush) { |
| ASSERT_OK(tablet_replica_->tablet_metadata()->Flush()); |
| } |
| ASSERT_OK(IterateToStrings(&rows)); |
| ASSERT_EQ(0, rows.size()); |
| ASSERT_OK(RestartReplica(/*reset_tablet*/true)); |
| ASSERT_OK(IterateToStrings(&rows)); |
| ASSERT_EQ(0, rows.size()); |
| ASSERT_OK(CallParticipantOpCheckResp(kAbortedTxnId, ParticipantOpPB::ABORT_TXN, |
| clock()->Now().value())); |
| ASSERT_OK(IterateToStrings(&rows)); |
| ASSERT_EQ(0, rows.size()); |
| |
| // Once we committed the transaction, we should see the rows. |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::FINALIZE_COMMIT, |
| clock()->Now().value())); |
| if (should_flush) { |
| ASSERT_OK(tablet_replica_->tablet_metadata()->Flush()); |
| } |
| ASSERT_OK(IterateToStrings(&rows)); |
| ASSERT_EQ(1, rows.size()); |
| ASSERT_OK(RestartReplica(/*reset_tablet*/true)); |
| ASSERT_OK(IterateToStrings(&rows)); |
| ASSERT_EQ(1, rows.size()); |
| } |
| |
| // Test replaying mutations to transactional MRSs. |
| TEST_P(MetadataFlushTxnParticipantTest, TestReplayUpdatesToTransactionalMRS) { |
| const bool should_flush = GetParam(); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_TXN, -1)); |
| ASSERT_OK(Write(0, kTxnId)); |
| ASSERT_OK(Write(1, kTxnId)); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_COMMIT, -1)); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::FINALIZE_COMMIT, |
| clock()->Now().value())); |
| if (should_flush) { |
| ASSERT_OK(tablet_replica_->tablet_metadata()->Flush()); |
| } |
| ASSERT_OK(Delete(0)); |
| vector<string> rows; |
| ASSERT_OK(IterateToStrings(&rows)); |
| ASSERT_EQ(1, rows.size()); |
| |
| ASSERT_OK(RestartReplica(/*reset_tablet*/true)); |
| ASSERT_OK(IterateToStrings(&rows)); |
| ASSERT_EQ(1, rows.size()); |
| } |
| |
| INSTANTIATE_TEST_SUITE_P(ShouldFlushMetadata, MetadataFlushTxnParticipantTest, |
| ::testing::Values(true, false)); |
| |
| // Similar to the above test, but checking that in-flight ops anchor the WALs. |
| TEST_F(TxnParticipantTest, TestActiveParticipantOpsAnchorWALs) { |
| // Disable the partition lock as there are concurrent transactions. |
| // TODO(awong): update this when implementing finer grained locking. |
| FLAGS_enable_txn_partition_lock = false; |
| ParticipantRequestPB req; |
| ParticipantResponsePB resp; |
| auto op_state = NewParticipantOp(tablet_replica_.get(), kTxnId, ParticipantOpPB::BEGIN_TXN, |
| kDummyCommitTimestamp, &req, &resp); |
| CountDownLatch latch(1); |
| CountDownLatch apply_start(1); |
| CountDownLatch apply_continue(1); |
| op_state->set_completion_callback(std::unique_ptr<OpCompletionCallback>( |
| new LatchOpCompletionCallback<tserver::ParticipantResponsePB>(&latch, &resp))); |
| |
| scoped_refptr<OpDriver> driver; |
| unique_ptr<DelayedParticipantOp> op( |
| new DelayedParticipantOp(&apply_start, &apply_continue, std::move(op_state))); |
| ASSERT_OK(tablet_replica_->NewLeaderOpDriver(std::move(op), &driver)); |
| driver->ExecuteAsync(); |
| // Wait for the apply to start, indicating that we have persisted and |
| // replicated but not yet Raft committed the participant op. |
| apply_start.Wait(); |
| ASSERT_TRUE(driver->GetOpId().IsInitialized()); |
| ASSERT_EQ(1, tablet_replica_->op_tracker()->GetNumPendingForTests()); |
| |
| // Create some WAL segments to ensure some would-be-GC-able segments. |
| int current_key = 0; |
| ASSERT_OK(WriteRolloverAndFlush(current_key++)); |
| ASSERT_OK(WriteRolloverAndFlush(current_key++)); |
| |
| // Our participant op is still pending, and nothing should be GC-able. |
| ASSERT_EQ(1, tablet_replica_->op_tracker()->GetNumPendingForTests()); |
| int64_t gcable_size; |
| ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size)); |
| ASSERT_EQ(0, gcable_size); |
| |
| // Finish applying the participant op and proceed to completion. |
| // Even with more segments added, the WAL should be anchored until we flush |
| // the tablet metadata to include the transaction. |
| apply_continue.CountDown(); |
| latch.Wait(); |
| ASSERT_FALSE(resp.has_error()); |
| ASSERT_EQ(1, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests()); |
| ASSERT_OK(tablet_replica_->tablet_metadata()->Flush()); |
| ASSERT_EQ(0, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests()); |
| |
| // Add some segments to ensure there are enough segments to GC. |
| ASSERT_OK(WriteRolloverAndFlush(current_key++)); |
| ASSERT_OK(WriteRolloverAndFlush(current_key++)); |
| ASSERT_EQ(0, gcable_size); |
| ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size)); |
| ASSERT_GT(gcable_size, 0); |
| |
| // Now that we've completed the op, we can get rid of the WAL segments that |
| // had the participant op. |
| ASSERT_OK(tablet_replica_->RunLogGC()); |
| ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size)); |
| ASSERT_EQ(0, gcable_size); |
| |
| // As a sanity check, ensure we get to the expected state if we reboot. |
| ASSERT_OK(RestartReplica(/*reset_tablet*/true)); |
| ASSERT_EQ(vector<TxnParticipant::TxnEntry>({ |
| { kTxnId, kOpen, -1 } |
| }), txn_participant()->GetTxnsForTests()); |
| } |
| |
| // Test that we can only write to transactions if they are open. |
| TEST_F(TxnParticipantTest, TestWriteToOpenTransactionsOnly) { |
| constexpr const int64_t kAbortedTxnId = 2; |
| Status s = Write(0, kTxnId); |
| ASSERT_TRUE(s.IsNotFound()) << s.ToString(); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_TXN, -1)); |
| ASSERT_OK(Write(0, kTxnId)); |
| |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_COMMIT, -1)); |
| // Even if the row already exists, we shouldn't get an AlreadyPresent error; |
| // the transaction's state is checked much earlier than the presence check. |
| s = Write(0, kTxnId); |
| ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString(); |
| s = Write(1, kTxnId); |
| ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString(); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::FINALIZE_COMMIT, |
| kDummyCommitTimestamp)); |
| s = Write(0, kTxnId); |
| ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString(); |
| s = Write(1, kTxnId); |
| ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString(); |
| |
| ASSERT_OK(CallParticipantOpCheckResp(kAbortedTxnId, ParticipantOpPB::BEGIN_TXN, -1)); |
| ASSERT_OK(Write(2, kAbortedTxnId)); |
| ASSERT_OK(CallParticipantOpCheckResp(kAbortedTxnId, ParticipantOpPB::ABORT_TXN, -1)); |
| s = Write(2, kAbortedTxnId); |
| ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString(); |
| s = Write(3, kAbortedTxnId); |
| ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString(); |
| } |
| |
| // Test that we get an appropriate error when attempting transactional ops that |
| // are not supported. |
| TEST_F(TxnParticipantTest, TestUnsupportedOps) { |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_TXN, -1)); |
| Status s = Write(0, kTxnId, RowOperationsPB::UPSERT); |
| ASSERT_TRUE(s.IsNotSupported()) << s.ToString(); |
| s = Write(0, kTxnId, RowOperationsPB::UPSERT_IGNORE); |
| ASSERT_TRUE(s.IsNotSupported()) << s.ToString(); |
| s = Write(0, kTxnId, RowOperationsPB::UPDATE); |
| ASSERT_TRUE(s.IsNotSupported()) << s.ToString(); |
| s = Write(0, kTxnId, RowOperationsPB::UPDATE_IGNORE); |
| ASSERT_TRUE(s.IsNotSupported()) << s.ToString(); |
| |
| // None of the ops should have done anything. |
| ASSERT_EQ(0, tablet_replica_->CountLiveRowsNoFail()); |
| |
| s = Write(0, kTxnId, RowOperationsPB::DELETE); |
| ASSERT_TRUE(s.IsNotSupported()) << s.ToString(); |
| s = Write(0, kTxnId, RowOperationsPB::DELETE_IGNORE); |
| ASSERT_TRUE(s.IsNotSupported()) << s.ToString(); |
| } |
| |
| // Test that rows inserted to transactional stores only show up when the |
| // transactions complete. |
| TEST_F(TxnParticipantTest, TestInsertToTransactionMRS) { |
| // Disable the partition lock as there are concurrent transactions. |
| // TODO(awong): update this when implementing finer grained locking. |
| FLAGS_enable_txn_partition_lock = false; |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_TXN, -1)); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::BEGIN_TXN, -1)); |
| ASSERT_OK(Write(0, kTxnOne)); |
| ASSERT_OK(Write(1, kTxnTwo)); |
| ASSERT_OK(Write(2, kTxnTwo)); |
| |
| vector<string> rows; |
| ASSERT_OK(IterateToStrings(&rows)); |
| ASSERT_EQ(0, rows.size()); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_COMMIT, -1)); |
| ASSERT_OK(IterateToStrings(&rows)); |
| ASSERT_EQ(0, rows.size()); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::FINALIZE_COMMIT, |
| clock()->Now().value())); |
| // Only after we finalize a transaction's commit should we see its rows. |
| ASSERT_OK(IterateToStrings(&rows)); |
| ASSERT_EQ(1, rows.size()); |
| |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::BEGIN_COMMIT, -1)); |
| ASSERT_OK(IterateToStrings(&rows)); |
| ASSERT_EQ(1, rows.size()); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::FINALIZE_COMMIT, |
| clock()->Now().value())); |
| ASSERT_OK(IterateToStrings(&rows)); |
| ASSERT_EQ(3, rows.size()); |
| ASSERT_OK(tablet_replica_->tablet()->Flush()); |
| ASSERT_OK(IterateToStrings(&rows)); |
| ASSERT_EQ(3, rows.size()); |
| } |
| |
| // Test that rows inserted to transactional stores don't show up if the |
| // transaction is aborted. |
| TEST_F(TxnParticipantTest, TestDontReadAbortedInserts) { |
| // Disable the partition lock as there are concurrent transactions. |
| // TODO(awong): update this when implementing finer grained locking. |
| FLAGS_enable_txn_partition_lock = false; |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_TXN, -1)); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::BEGIN_TXN, -1)); |
| ASSERT_OK(Write(0, kTxnOne)); |
| ASSERT_OK(Write(1, kTxnTwo)); |
| |
| vector<string> rows; |
| ASSERT_OK(IterateToStrings(&rows)); |
| ASSERT_EQ(0, rows.size()); |
| |
| // Even if we begin committing, if the transaction is ultimately aborted, we |
| // should see nothing. |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_COMMIT, -1)); |
| ASSERT_OK(IterateToStrings(&rows)); |
| ASSERT_EQ(0, rows.size()); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::ABORT_TXN, -1)); |
| ASSERT_OK(IterateToStrings(&rows)); |
| ASSERT_EQ(0, rows.size()); |
| |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::ABORT_TXN, -1)); |
| ASSERT_OK(IterateToStrings(&rows)); |
| ASSERT_EQ(0, rows.size()); |
| |
| ASSERT_OK(tablet_replica_->tablet()->Flush()); |
| ASSERT_OK(IterateToStrings(&rows)); |
| ASSERT_EQ(0, rows.size()); |
| } |
| |
| // Test that rows inserted as a part of a transaction cannot be updated if the |
| // transaction is aborted. |
| TEST_F(TxnParticipantTest, TestUpdateAfterAborting) { |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_TXN, -1)); |
| ASSERT_OK(Write(0, kTxnId)); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::ABORT_TXN, -1)); |
| Status s = Write(0, nullopt, RowOperationsPB::UPDATE); |
| ASSERT_TRUE(s.IsNotFound()) << s.ToString(); |
| ASSERT_OK(Write(0, nullopt, RowOperationsPB::UPDATE_IGNORE)); |
| ASSERT_EQ(1, tablet_replica_->tablet()->metrics()->update_ignore_errors->value()); |
| ASSERT_TRUE(s.IsNotFound()) << s.ToString(); |
| ASSERT_EQ(0, tablet_replica_->CountLiveRowsNoFail()); |
| |
| s = Write(0, nullopt, RowOperationsPB::DELETE); |
| ASSERT_TRUE(s.IsNotFound()) << s.ToString(); |
| ASSERT_OK(Write(0, nullopt, RowOperationsPB::DELETE_IGNORE)); |
| ASSERT_EQ(1, tablet_replica_->tablet()->metrics()->delete_ignore_errors->value()); |
| ASSERT_TRUE(s.IsNotFound()) << s.ToString(); |
| ASSERT_EQ(0, tablet_replica_->CountLiveRowsNoFail()); |
| |
| ASSERT_OK(Write(0, nullopt, RowOperationsPB::UPSERT)); |
| ASSERT_EQ(1, tablet_replica_->CountLiveRowsNoFail()); |
| } |
| |
| // Test that we can update rows that were inserted and committed as a part of a |
| // transaction. |
| TEST_F(TxnParticipantTest, TestUpdateCommittedTransactionMRS) { |
| // Disable the partition lock as there are concurrent transactions. |
| // TODO(awong): update this when implementing finer grained locking. |
| FLAGS_enable_txn_partition_lock = false; |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_TXN, -1)); |
| ASSERT_OK(Write(0, kTxnId)); |
| |
| // Since we haven't committed yet, we should see no rows. |
| vector<string> rows; |
| ASSERT_OK(IterateToStrings(&rows)); |
| ASSERT_EQ(0, rows.size()); |
| Status s = Delete(0); |
| ASSERT_TRUE(s.IsNotFound()) << s.ToString(); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_COMMIT, -1)); |
| |
| // We still haven't finished committing, so we should see no rows. |
| ASSERT_OK(IterateToStrings(&rows)); |
| ASSERT_EQ(0, rows.size()); |
| s = Delete(0); |
| ASSERT_TRUE(s.IsNotFound()) << s.ToString(); |
| |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::FINALIZE_COMMIT, |
| clock()->Now().value())); |
| ASSERT_OK(IterateToStrings(&rows)); |
| ASSERT_EQ(1, rows.size()); |
| |
| // We should be able to update committed, transactional stores. |
| ASSERT_OK(Delete(0)); |
| ASSERT_OK(IterateToStrings(&rows)); |
| ASSERT_EQ(0, rows.size()); |
| |
| // We should be able to re-insert the deleted row, even if to a row written |
| // during a transaction. |
| ASSERT_OK(Write(0)); |
| ASSERT_OK(IterateToStrings(&rows)); |
| ASSERT_EQ(1, rows.size()); |
| |
| ASSERT_OK(tablet_replica_->tablet()->Flush()); |
| ASSERT_OK(IterateToStrings(&rows)); |
| ASSERT_EQ(1, rows.size()); |
| } |
| |
| // Test that we can flush multiple MRSs, and that when restarting, ops are |
| // replayed (or not) as appropriate. |
| TEST_F(TxnParticipantTest, TestFlushMultipleMRSs) { |
| // Disable the partition lock as there are concurrent transactions. |
| // TODO(awong): update this when implementing finer grained locking. |
| FLAGS_enable_txn_partition_lock = false; |
| const int kNumTxns = 3; |
| const int kNumRowsPerTxn = 100; |
| vector<string> rows; |
| Tablet* tablet = tablet_replica_->tablet(); |
| scoped_refptr<TabletComponents> comps; |
| for (int t = 0; t < kNumTxns; t++) { |
| ASSERT_OK(CallParticipantOpCheckResp(t, ParticipantOpPB::BEGIN_TXN, kDummyCommitTimestamp)); |
| |
| // Since we haven't committed anything, the tablet components shouldn't |
| // have any transactional MRSs. |
| tablet->GetComponents(&comps); |
| ASSERT_TRUE(comps->txn_memrowsets.empty()); |
| } |
| for (int t = 0; t < kNumTxns; t++) { |
| for (int r = 0; r < kNumRowsPerTxn; r++) { |
| ASSERT_OK(Write(t * kNumRowsPerTxn + r, t)); |
| } |
| ASSERT_OK(CallParticipantOpCheckResp(t, ParticipantOpPB::BEGIN_COMMIT, kDummyCommitTimestamp)); |
| ASSERT_OK(CallParticipantOpCheckResp(t, ParticipantOpPB::FINALIZE_COMMIT, |
| clock()->Now().value())); |
| ASSERT_OK(IterateToStrings(&rows)); |
| ASSERT_EQ((t + 1) * kNumRowsPerTxn, rows.size()); |
| tablet->GetComponents(&comps); |
| ASSERT_EQ(t + 1, comps->txn_memrowsets.size()); |
| } |
| // After restarting, we should have the same number of rows and MRSs. |
| ASSERT_OK(RestartReplica(/*reset_tablet*/true)); |
| tablet = tablet_replica_->tablet(); |
| |
| ASSERT_OK(IterateToStrings(&rows)); |
| ASSERT_EQ(kNumTxns * kNumRowsPerTxn, rows.size()); |
| tablet->GetComponents(&comps); |
| ASSERT_EQ(kNumTxns, comps->txn_memrowsets.size()); |
| |
| // Once flushed, we should have the same number of rows, but no txn MRSs. |
| ASSERT_OK(tablet_replica_->tablet()->Flush()); |
| ASSERT_OK(IterateToStrings(&rows)); |
| ASSERT_EQ(kNumTxns * kNumRowsPerTxn, rows.size()); |
| tablet->GetComponents(&comps); |
| ASSERT_TRUE(comps->txn_memrowsets.empty()); |
| |
| // The verifications should hold after restarting the replica after flushing. |
| ASSERT_OK(RestartReplica(/*reset_tablet*/true)); |
| tablet = tablet_replica_->tablet(); |
| |
| ASSERT_OK(IterateToStrings(&rows)); |
| ASSERT_EQ(kNumTxns * kNumRowsPerTxn, rows.size()); |
| tablet->GetComponents(&comps); |
| ASSERT_TRUE(comps->txn_memrowsets.empty()); |
| } |
| |
| // Test that INSERT_IGNORE ops work when the row exists in the transactional |
| // MRS. |
| TEST_F(TxnParticipantTest, TestInsertIgnoreInTransactionMRS) { |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_TXN, -1)); |
| |
| // Insert into a new transactional MRS, and then INSERT_IGNORE as a part of a |
| // transaction. |
| vector<string> rows; |
| ASSERT_OK(Write(0, kTxnId)); |
| ASSERT_OK(IterateToStrings(&rows)); |
| ASSERT_TRUE(rows.empty()); |
| |
| Status s = Write(0, kTxnId); |
| ASSERT_TRUE(s.IsAlreadyPresent()) << s.ToString(); |
| ASSERT_EQ(0, tablet_replica_->tablet()->metrics()->insert_ignore_errors->value()); |
| |
| ASSERT_OK(Write(0, kTxnId, RowOperationsPB::INSERT_IGNORE)); |
| ASSERT_EQ(1, tablet_replica_->tablet()->metrics()->insert_ignore_errors->value()); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_COMMIT, -1)); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::FINALIZE_COMMIT, |
| clock()->Now().value())); |
| ASSERT_OK(IterateToStrings(&rows)); |
| ASSERT_EQ(1, rows.size()); |
| } |
| |
| // Test that INSERT_IGNORE ops work when the row exists in the main MRS. |
| TEST_F(TxnParticipantTest, TestInsertIgnoreInMainMRS) { |
| // Disable the partition lock as there are concurrent transactions. |
| // TODO(awong): update this when implementing finer grained locking. |
| FLAGS_enable_txn_partition_lock = false; |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_TXN, -1)); |
| // Insert into the main MRS, and then INSERT_IGNORE as a part of a |
| // transaction. |
| vector<string> rows; |
| ASSERT_OK(Write(0)); |
| ASSERT_OK(IterateToStrings(&rows)); |
| ASSERT_EQ(1, rows.size()); |
| |
| Status s = Write(0, kTxnId); |
| ASSERT_TRUE(s.IsAlreadyPresent()) << s.ToString(); |
| ASSERT_EQ(0, tablet_replica_->tablet()->metrics()->insert_ignore_errors->value()); |
| |
| ASSERT_OK(Write(0, kTxnId, RowOperationsPB::INSERT_IGNORE)); |
| ASSERT_EQ(1, tablet_replica_->tablet()->metrics()->insert_ignore_errors->value()); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_COMMIT, -1)); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::FINALIZE_COMMIT, |
| clock()->Now().value())); |
| ASSERT_OK(IterateToStrings(&rows)); |
| ASSERT_EQ(1, rows.size()); |
| } |
| |
| // Test that the live row count accounts for transactional MRSs. |
| TEST_F(TxnParticipantTest, TestLiveRowCountAccountsForTransactionalMRSs) { |
| // Disable the partition lock as there are concurrent transactions. |
| // TODO(awong): update this when implementing finer grained locking. |
| FLAGS_enable_txn_partition_lock = false; |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_TXN, -1)); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::BEGIN_TXN, -1)); |
| ASSERT_OK(Write(0)); |
| ASSERT_OK(Write(1, kTxnOne)); |
| ASSERT_OK(Write(2, kTxnTwo)); |
| ASSERT_EQ(1, tablet_replica_->CountLiveRowsNoFail()); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_COMMIT, -1)); |
| ASSERT_EQ(1, tablet_replica_->CountLiveRowsNoFail()); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::FINALIZE_COMMIT, |
| clock()->Now().value())); |
| ASSERT_EQ(2, tablet_replica_->CountLiveRowsNoFail()); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::BEGIN_COMMIT, -1)); |
| ASSERT_EQ(2, tablet_replica_->CountLiveRowsNoFail()); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::FINALIZE_COMMIT, |
| clock()->Now().value())); |
| ASSERT_EQ(3, tablet_replica_->CountLiveRowsNoFail()); |
| ASSERT_OK(Delete(1)); |
| ASSERT_OK(Delete(2)); |
| ASSERT_EQ(1, tablet_replica_->CountLiveRowsNoFail()); |
| } |
| |
| // Test that the MRS size metrics account for transactional MRSs. |
| TEST_F(TxnParticipantTest, TestSizeAccountsForTransactionalMRS) { |
| // Disable the partition lock as there are concurrent transactions. |
| // TODO(awong): update this when implementing finer grained locking. |
| FLAGS_enable_txn_partition_lock = false; |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_TXN, -1)); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::BEGIN_TXN, -1)); |
| ASSERT_TRUE(tablet_replica_->tablet()->MemRowSetEmpty()); |
| |
| auto* tablet = tablet_replica_->tablet(); |
| auto mrs_size_with_empty = tablet->MemRowSetSize(); |
| |
| ASSERT_OK(Write(1, kTxnOne)); |
| ASSERT_OK(Write(2, kTxnTwo)); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_COMMIT, -1)); |
| ASSERT_TRUE(tablet->MemRowSetEmpty()); |
| |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::FINALIZE_COMMIT, |
| clock()->Now().value())); |
| auto mrs_size_with_one = tablet->MemRowSetSize(); |
| ASSERT_GT(mrs_size_with_one, mrs_size_with_empty); |
| ASSERT_FALSE(tablet->MemRowSetEmpty()); |
| |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::BEGIN_COMMIT, -1)); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::FINALIZE_COMMIT, |
| clock()->Now().value())); |
| auto mrs_size_with_two = tablet->MemRowSetSize(); |
| ASSERT_GT(mrs_size_with_two, mrs_size_with_one); |
| |
| // The MRSs shouldn't be considered empty even if their rows are deleted, |
| // since they still contain mutations. |
| ASSERT_OK(Delete(1)); |
| ASSERT_OK(Delete(2)); |
| ASSERT_FALSE(tablet_replica_->tablet()->MemRowSetEmpty()); |
| } |
| |
| // Test that the MRS anchored WALs metric accounts for transactional MRSs. |
| TEST_F(TxnParticipantTest, TestWALsAnchoredAccountsForTransactionalMRS) { |
| // Disable the partition lock as there are concurrent transactions. |
| // TODO(awong): update this when implementing finer grained locking. |
| FLAGS_enable_txn_partition_lock = false; |
| const auto mrs_wal_size = [&] { |
| map<int64_t, int64_t> replay_size_map; |
| CHECK_OK(tablet_replica_->GetReplaySizeMap(&replay_size_map)); |
| return tablet_replica_->tablet()->MemRowSetLogReplaySize(replay_size_map); |
| }; |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_TXN, -1)); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::BEGIN_TXN, -1)); |
| |
| // Write a row and roll over onto a new WAL segment so there are bytes to GC. |
| ASSERT_OK(Write(0, kTxnOne)); |
| ASSERT_OK(tablet_replica_->log()->WaitUntilAllFlushed()); |
| ASSERT_OK(tablet_replica_->log()->AllocateSegmentAndRollOverForTests()); |
| |
| // Nothing should be considered anchored, as the transaction hasn't been |
| // committed -- it thus wouldn't make sense to perform maintenance ops based |
| // on WAL segments for the uncommitted write. |
| ASSERT_EQ(0, mrs_wal_size()); |
| |
| // Once we commit, we should see some GCable bytes. |
| ASSERT_OK(Write(1, kTxnOne)); |
| ASSERT_OK(tablet_replica_->log()->WaitUntilAllFlushed()); |
| ASSERT_OK(tablet_replica_->log()->AllocateSegmentAndRollOverForTests()); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_COMMIT, -1)); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::FINALIZE_COMMIT, |
| clock()->Now().value())); |
| auto mrs_wal_size_with_first_committed = mrs_wal_size(); |
| ASSERT_GT(mrs_wal_size_with_first_committed, 0); |
| |
| ASSERT_OK(Write(2, kTxnTwo)); |
| ASSERT_OK(tablet_replica_->log()->WaitUntilAllFlushed()); |
| ASSERT_OK(tablet_replica_->log()->AllocateSegmentAndRollOverForTests()); |
| auto mrs_wal_size_with_both_written = mrs_wal_size(); |
| |
| // Despite not having committed the second transaction, we still wrote new |
| // WAL segments, and that's enough to bump the MRS WALs anchored value. |
| ASSERT_GT(mrs_wal_size_with_both_written, mrs_wal_size_with_first_committed); |
| |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::BEGIN_COMMIT, -1)); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::FINALIZE_COMMIT, |
| clock()->Now().value())); |
| |
| auto mrs_wal_size_with_both_committed = mrs_wal_size(); |
| ASSERT_EQ(mrs_wal_size_with_both_committed, mrs_wal_size_with_both_written); |
| } |
| |
| // Test racing writes with commits, ensuring that we cease writing once |
| // beginning to commit. |
| TEST_F(TxnParticipantTest, TestRacingCommitAndWrite) { |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_TXN, -1)); |
| int first_error_row = -1; |
| CountDownLatch first_write(1); |
| thread t([&] { |
| for (int row = 0 ;; row++) { |
| Status s = Write(row, kTxnId); |
| if (!s.ok()) { |
| first_error_row = row; |
| break; |
| } |
| first_write.CountDown(); |
| } |
| }); |
| first_write.Wait(); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_COMMIT, -1)); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::FINALIZE_COMMIT, |
| clock()->Now().value())); |
| t.join(); |
| ASSERT_GT(first_error_row, 0); |
| ASSERT_EQ(first_error_row, tablet_replica_->CountLiveRowsNoFail()); |
| } |
| |
| // Test that the write metrics account for transactional rowsets. |
| TEST_F(TxnParticipantTest, TestMRSLookupsMetric) { |
| // Disable the partition lock as there are concurrent transactions. |
| // TODO(awong): update this when implementing finer grained locking. |
| FLAGS_enable_txn_partition_lock = false; |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_TXN, -1)); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::BEGIN_TXN, -1)); |
| |
| // A non-transactional write should not check any MRSs -- it should just be |
| // inserted into the main MRS. |
| ASSERT_OK(Write(0)); |
| ASSERT_EQ(0, tablet_replica_->tablet()->metrics()->mrs_lookups->value()); |
| |
| // A transactional write will check the main MRS before trying to insert to |
| // the transactional MRS. |
| ASSERT_OK(Write(1, kTxnOne)); |
| ASSERT_EQ(1, tablet_replica_->tablet()->metrics()->mrs_lookups->value()); |
| ASSERT_OK(Write(2, kTxnTwo)); |
| ASSERT_EQ(2, tablet_replica_->tablet()->metrics()->mrs_lookups->value()); |
| |
| // Once a transaction is committed, its MRS and the main MRS will be checked |
| // for new transactional writes. |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_COMMIT, -1)); |
| ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::FINALIZE_COMMIT, |
| clock()->Now().value())); |
| ASSERT_OK(Write(3, kTxnTwo)); |
| ASSERT_EQ(4, tablet_replica_->tablet()->metrics()->mrs_lookups->value()); |
| |
| // Non-transactional writes will only check the committed transactional MRS, |
| // before attempting to insert to the main MRS. |
| ASSERT_OK(Write(4)); |
| ASSERT_EQ(5, tablet_replica_->tablet()->metrics()->mrs_lookups->value()); |
| |
| // Trying to delete a row that doesn't exist will consult just the committed |
| // transactional MRS before attempting to delete from the main MRS. |
| Status s = Delete(10); |
| ASSERT_TRUE(s.IsNotFound()) << s.ToString(); |
| ASSERT_EQ(6, tablet_replica_->tablet()->metrics()->mrs_lookups->value()); |
| |
| // Deleting a row that exists in a MRS, the committed transactional MRS is |
| // checked, and the successful deletion from the MRS increments the lookup |
| // value, regardless of which MRS the row is in. |
| ASSERT_OK(Delete(0)); |
| ASSERT_EQ(8, tablet_replica_->tablet()->metrics()->mrs_lookups->value()); |
| ASSERT_OK(Delete(1)); |
| ASSERT_EQ(10, tablet_replica_->tablet()->metrics()->mrs_lookups->value()); |
| } |
| |
| struct ConcurrencyParams { |
| int num_txns; |
| int num_rows_per_thread; |
| }; |
| class TxnParticipantConcurrencyTest : public TxnParticipantTest, |
| public ::testing::WithParamInterface<ConcurrencyParams> {}; |
| |
| // Test inserting into multiple transactions from multiple threads. |
| TEST_P(TxnParticipantConcurrencyTest, TestConcurrentDisjointInsertsTxn) { |
| // Disable the partition lock as there are concurrent transactions. |
| // TODO(awong): update this when implementing finer grained locking. |
| FLAGS_enable_txn_partition_lock = false; |
| const auto& params = GetParam(); |
| const auto& num_txns = params.num_txns; |
| const int kNumThreads = 10; |
| const auto& rows_per_thread = params.num_rows_per_thread; |
| for (int txn_id = 0; txn_id < num_txns; txn_id++) { |
| ASSERT_OK(CallParticipantOpCheckResp(txn_id, ParticipantOpPB::BEGIN_TXN, -1)); |
| } |
| // Insert to multiple transactions concurrently. |
| vector<thread> threads; |
| for (int i = 0; i < kNumThreads; i++) { |
| threads.emplace_back([&, i] { |
| for (int r = 0; r < rows_per_thread; r++) { |
| int row = i * rows_per_thread + r; |
| ASSERT_OK(Write(row, row % num_txns)); |
| } |
| }); |
| } |
| for (auto& t : threads) { |
| t.join(); |
| } |
| vector<string> rows; |
| for (int txn_id = 0; txn_id < num_txns; txn_id++) { |
| ASSERT_OK(CallParticipantOpCheckResp(txn_id, ParticipantOpPB::BEGIN_COMMIT, -1)); |
| } |
| ASSERT_OK(IterateToStrings(&rows)); |
| ASSERT_EQ(0, rows.size()); |
| |
| // As we commit our transactions, we should see more and more rows show up. |
| for (int txn_id = 0; txn_id < num_txns; txn_id++) { |
| ASSERT_OK(CallParticipantOpCheckResp(txn_id, ParticipantOpPB::FINALIZE_COMMIT, |
| clock()->Now().value())); |
| ASSERT_OK(IterateToStrings(&rows)); |
| ASSERT_EQ(kNumThreads * rows_per_thread * (txn_id + 1) / num_txns, rows.size()); |
| } |
| } |
| INSTANTIATE_TEST_SUITE_P(ConcurrencyParams, TxnParticipantConcurrencyTest, |
| ::testing::Values( |
| ConcurrencyParams{ /*num_txns*/1, /*num_rows_per_thread*/1 }, |
| ConcurrencyParams{ /*num_txns*/10, /*num_rows_per_thread*/1 }, |
| ConcurrencyParams{ /*num_txns*/1, /*num_rows_per_thread*/10 } |
| )); |
| |
| } // namespace tablet |
| } // namespace kudu |