| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/transactions/txn_status_tablet.h" |
| |
| #include <algorithm> |
| #include <cstdint> |
| #include <iostream> |
| #include <memory> |
| #include <string> |
| #include <thread> |
| #include <type_traits> |
| #include <utility> |
| #include <vector> |
| |
| #include <gtest/gtest.h> |
| |
| #include "kudu/consensus/raft_consensus.h" |
| #include "kudu/gutil/ref_counted.h" |
| #include "kudu/gutil/strings/join.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/tablet/tablet-test-util.h" |
| #include "kudu/tablet/tablet_replica-test-base.h" |
| #include "kudu/transactions/transactions.pb.h" |
| #include "kudu/tserver/tserver.pb.h" |
| #include "kudu/util/pb_util.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/test_macros.h" |
| |
| using kudu::consensus::ConsensusBootstrapInfo; |
| using kudu::pb_util::SecureShortDebugString; |
| using kudu::tablet::TabletReplicaTestBase; |
| using kudu::tserver::TabletServerErrorPB; |
| using std::ostream; |
| using std::string; |
| using std::thread; |
| using std::unique_ptr; |
| using std::vector; |
| using strings::Substitute; |
| |
| namespace kudu { |
| namespace transactions { |
| |
| namespace { |
| |
| const char kOwner[] = "bojack"; |
| const char kParticipant[] = "peanutbutter"; |
| string ParticipantId(int i) { |
| return Substitute("$0$1", kParticipant, i); |
| } |
| constexpr const int64_t kFakeTime = 43110; |
| |
| // Simple representation of an entry in the transaction status tablet. |
| struct SimpleEntry { |
| int64_t txn_id; |
| TxnStatusEntryPB txn_pb; |
| vector<ParticipantIdAndPB> prt_pbs; |
| |
| // Convenience method to create a SimpleEntry. |
| static SimpleEntry Create(int64_t txn_id, const string& user, TxnStatePB txn_state_pb, |
| vector<std::pair<string, TxnStatePB>> participants) { |
| TxnStatusEntryPB txn_pb; |
| txn_pb.set_start_timestamp(kFakeTime); |
| txn_pb.set_last_transition_timestamp(kFakeTime); |
| txn_pb.set_state(txn_state_pb); |
| txn_pb.set_user(user); |
| vector<ParticipantIdAndPB> prt_pbs; |
| for (auto& id_and_state : participants) { |
| TxnParticipantEntryPB prt_pb; |
| prt_pb.set_state(id_and_state.second); |
| prt_pbs.emplace_back(std::make_pair(std::move(id_and_state.first), std::move(prt_pb))); |
| } |
| return { txn_id, std::move(txn_pb), std::move(prt_pbs) }; |
| } |
| |
| bool operator==(const SimpleEntry& other) const { |
| return ToString() == other.ToString(); |
| } |
| |
| friend ostream& operator<<(ostream& out, const SimpleEntry& e) { |
| out << e.ToString(); |
| return out; |
| } |
| |
| string ToString() const { |
| vector<string> prt_strs; |
| for (const auto& id_and_prt : prt_pbs) { |
| prt_strs.emplace_back(Substitute("($0, {$1})", id_and_prt.first, |
| SecureShortDebugString(id_and_prt.second))); |
| } |
| return Substitute("($0, {$1}, [$2])", txn_id, |
| SecureShortDebugString(txn_pb), JoinStrings(prt_strs, ",")); |
| } |
| }; |
| |
| class SimpleTransactionsVisitor : public TransactionsVisitor { |
| public: |
| void VisitTransactionEntries(int64_t txn_id, TxnStatusEntryPB status_entry_pb, |
| vector<ParticipantIdAndPB> participants) override { |
| entries_.emplace_back(SimpleEntry{ txn_id, std::move(status_entry_pb), |
| std::move(participants) }); |
| } |
| vector<SimpleEntry> ReleaseEntries() { |
| return std::move(entries_); |
| } |
| private: |
| vector<SimpleEntry> entries_; |
| }; |
| |
| } // anonymous namespace |
| |
| class TxnStatusTabletTest : public TabletReplicaTestBase { |
| public: |
| TxnStatusTabletTest() |
| : TabletReplicaTestBase(TxnStatusTablet::GetSchemaWithoutIds()) {} |
| |
| void SetUp() override { |
| NO_FATALS(TabletReplicaTestBase::SetUp()); |
| ConsensusBootstrapInfo info; |
| ASSERT_OK(StartReplicaAndWaitUntilLeader(info)); |
| status_tablet_.reset(new TxnStatusTablet(tablet_replica_.get())); |
| } |
| |
| protected: |
| unique_ptr<TxnStatusTablet> status_tablet_; |
| }; |
| |
| TEST_F(TxnStatusTabletTest, TestWriteTransactions) { |
| TabletServerErrorPB ts_error; |
| // We can make multiple calls to add a single transaction. This will only |
| // insert a single row to the table. |
| ASSERT_OK(status_tablet_->AddNewTransaction(1, kOwner, kFakeTime, &ts_error)); |
| ASSERT_OK(status_tablet_->AddNewTransaction(1, kOwner, kFakeTime, &ts_error)); |
| |
| // The storage abstraction doesn't prevent us from writing a new transaction |
| // entry for a lower transaction ID. |
| ASSERT_OK(status_tablet_->AddNewTransaction(5, kOwner, kFakeTime, &ts_error)); |
| ASSERT_OK(status_tablet_->AddNewTransaction(2, kOwner, kFakeTime, &ts_error)); |
| |
| // Also try updating the status of one of our transaction entries. |
| TxnStatusEntryPB status_entry_pb; |
| status_entry_pb.set_start_timestamp(kFakeTime); |
| status_entry_pb.set_last_transition_timestamp(kFakeTime); |
| status_entry_pb.set_user(kOwner); |
| status_entry_pb.set_state(TxnStatePB::ABORTED); |
| ASSERT_OK(status_tablet_->UpdateTransaction(2, status_entry_pb, &ts_error)); |
| status_entry_pb.set_state(TxnStatePB::COMMITTED); |
| ASSERT_OK(status_tablet_->UpdateTransaction(2, status_entry_pb, &ts_error)); |
| |
| // The stored entries should be sorted, de-duplicated, and have the latest |
| // values. |
| const vector<SimpleEntry> kExpectedEntries({ |
| SimpleEntry::Create(1, kOwner, TxnStatePB::OPEN, {}), |
| SimpleEntry::Create(2, kOwner, TxnStatePB::COMMITTED, {}), |
| SimpleEntry::Create(5, kOwner, TxnStatePB::OPEN, {}), |
| }); |
| |
| // Now iterate through the entries. |
| SimpleTransactionsVisitor visitor; |
| ASSERT_OK(status_tablet_->VisitTransactions(&visitor)); |
| vector<SimpleEntry> entries = visitor.ReleaseEntries(); |
| EXPECT_EQ(kExpectedEntries, entries); |
| } |
| |
| TEST_F(TxnStatusTabletTest, TestWriteParticipants) { |
| TabletServerErrorPB ts_error; |
| ASSERT_OK(status_tablet_->AddNewTransaction(1, kOwner, kFakeTime, &ts_error)); |
| |
| // Participants will be de-duplicated. |
| ASSERT_OK(status_tablet_->AddNewParticipant(1, ParticipantId(1), &ts_error)); |
| ASSERT_OK(status_tablet_->AddNewParticipant(1, ParticipantId(1), &ts_error)); |
| |
| // There aren't ordering constraints for registering participant IDs. |
| ASSERT_OK(status_tablet_->AddNewParticipant(1, ParticipantId(5), &ts_error)); |
| ASSERT_OK(status_tablet_->AddNewParticipant(1, ParticipantId(2), &ts_error)); |
| |
| // Try updating the status of one of our participant entries. |
| TxnParticipantEntryPB prt_entry_pb; |
| prt_entry_pb.set_state(TxnStatePB::ABORTED); |
| ASSERT_OK(status_tablet_->UpdateParticipant(1, ParticipantId(2), prt_entry_pb, &ts_error)); |
| prt_entry_pb.set_state(TxnStatePB::COMMITTED); |
| ASSERT_OK(status_tablet_->UpdateParticipant(1, ParticipantId(2), prt_entry_pb, &ts_error)); |
| |
| const vector<SimpleEntry> kExpectedEntries({ |
| SimpleEntry::Create(1, kOwner, TxnStatePB::OPEN, { |
| { ParticipantId(1), TxnStatePB::OPEN }, |
| { ParticipantId(2), TxnStatePB::COMMITTED }, |
| { ParticipantId(5), TxnStatePB::OPEN }, |
| }), |
| }); |
| SimpleTransactionsVisitor visitor; |
| ASSERT_OK(status_tablet_->VisitTransactions(&visitor)); |
| vector<SimpleEntry> entries = visitor.ReleaseEntries(); |
| EXPECT_EQ(kExpectedEntries, entries); |
| } |
| |
| // Test that a participant entry can't be visited without a corresponding |
| // status entry. |
| TEST_F(TxnStatusTabletTest, TestFailedVisitor) { |
| TabletServerErrorPB ts_error; |
| ASSERT_OK(status_tablet_->AddNewParticipant(1, ParticipantId(1), &ts_error)); |
| SimpleTransactionsVisitor visitor; |
| Status s = status_tablet_->VisitTransactions(&visitor); |
| ASSERT_TRUE(s.IsCorruption()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), "missing transaction status entry"); |
| |
| // Now try again but with the transaction ID written. |
| ASSERT_OK(status_tablet_->AddNewTransaction(1, kOwner, kFakeTime, &ts_error)); |
| ASSERT_OK(status_tablet_->VisitTransactions(&visitor)); |
| |
| // And again with a new transaction ID. |
| ASSERT_OK(status_tablet_->AddNewParticipant(2, ParticipantId(2), &ts_error)); |
| s = status_tablet_->VisitTransactions(&visitor); |
| ASSERT_TRUE(s.IsCorruption()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), "missing transaction status entry"); |
| } |
| |
| // Test that we can write in parallel and read in parallel from the transaction |
| // storage tablet. |
| TEST_F(TxnStatusTabletTest, TestMultithreadedAccess) { |
| const int kNumThreads = 10; |
| const int kNumParticipantsPerTransaction = 5; |
| vector<thread> threads; |
| vector<Status> statuses(kNumThreads); |
| #define RET_IF_NOT_OK(s) do { \ |
| Status _s = (s); \ |
| if (!_s.ok()) { \ |
| statuses[i] = _s; \ |
| return; \ |
| } \ |
| } while (0) |
| |
| // Start multiple threads that add a transaction and a bunch of participants, |
| // storing any errors we see. |
| for (int i = 0; i < kNumThreads; i++) { |
| threads.emplace_back([&, i] { |
| TabletServerErrorPB ts_error; |
| RET_IF_NOT_OK(status_tablet_->AddNewTransaction(i, kOwner, kFakeTime, &ts_error)); |
| for (int p = 0; p < kNumParticipantsPerTransaction; p++) { |
| RET_IF_NOT_OK(status_tablet_->AddNewParticipant(i, Substitute("prt-$0", p), &ts_error)); |
| } |
| }); |
| } |
| std::for_each(threads.begin(), threads.end(), [] (thread& t) { t.join(); }); |
| // There should have been no issues inserting. |
| for (const auto& s : statuses) { |
| EXPECT_OK(s); |
| } |
| threads.clear(); |
| |
| // Now try visiting the transaction status tablet from multiple threads, |
| // verifying we get back the correct number of transactions and participants. |
| for (int i = 0; i < kNumThreads; i++) { |
| threads.emplace_back([&, i] { |
| SimpleTransactionsVisitor visitor; |
| RET_IF_NOT_OK(status_tablet_->VisitTransactions(&visitor)); |
| Status s; |
| const auto entries = visitor.ReleaseEntries(); |
| if (entries.size() != kNumThreads) { |
| RET_IF_NOT_OK(Status::IllegalState(Substitute("got $0 transactions", entries.size()))); |
| } |
| for (const auto& e : entries) { |
| if (e.prt_pbs.size() != kNumParticipantsPerTransaction) { |
| RET_IF_NOT_OK(Status::IllegalState(Substitute("txn $0 had $1 participants", |
| e.txn_id, e.prt_pbs.size()))); |
| } |
| } |
| }); |
| } |
| std::for_each(threads.begin(), threads.end(), [] (thread& t) { t.join(); }); |
| for (const auto& s : statuses) { |
| EXPECT_OK(s); |
| } |
| } |
| |
| } // namespace transactions |
| } // namespace kudu |