// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "kudu/transactions/txn_status_manager.h"

#include <algorithm>
#include <cstdint>
#include <initializer_list>
#include <map>
#include <memory>
#include <mutex>
#include <numeric>
#include <random>
#include <set>
#include <string>
#include <thread>
#include <unordered_set>
#include <utility>
#include <vector>

#include <gflags/gflags.h>
#include <glog/logging.h>
#include <gtest/gtest.h>

#include "kudu/consensus/raft_consensus.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/tablet-test-util.h"
#include "kudu/tablet/tablet_replica-test-base.h"
#include "kudu/tablet/txn_coordinator.h"
#include "kudu/transactions/transactions.pb.h"
#include "kudu/transactions/txn_status_tablet.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/util/barrier.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/locks.h"
#include "kudu/util/metrics.h"
#include "kudu/util/random.h"
#include "kudu/util/random_util.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"

using kudu::consensus::ConsensusBootstrapInfo;
using kudu::tablet::ParticipantIdsByTxnId;
using kudu::tablet::TabletReplicaTestBase;
using kudu::transactions::TxnStatePB;
using kudu::transactions::TxnStatusEntryPB;
using kudu::tserver::TabletServerErrorPB;
using std::string;
using std::thread;
using std::unique_ptr;
using std::unordered_set;
using std::vector;

DECLARE_uint32(txn_keepalive_interval_ms);
DECLARE_uint32(txn_staleness_tracker_interval_ms);
METRIC_DECLARE_entity(tablet);

namespace kudu {
namespace transactions {

namespace {
const char* kOwner = "gru";
const char* kParticipant = "minion";
string ParticipantId(int i) {
  return Substitute("$0$1", kParticipant, i);
}
} // anonymous namespace

class TxnStatusManagerTest : public TabletReplicaTestBase {
 public:
  TxnStatusManagerTest()
      : TabletReplicaTestBase(TxnStatusTablet::GetSchemaWithoutIds()) {}

  void SetUp() override {
    // Using shorter intervals for transaction staleness tracking to speed up
    // test scenarios verifying related functionality.
    FLAGS_txn_keepalive_interval_ms = 200;
    FLAGS_txn_staleness_tracker_interval_ms = 50;

    NO_FATALS(TabletReplicaTestBase::SetUp());
    ConsensusBootstrapInfo info;
    ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
    ASSERT_OK(ResetTxnStatusManager());
  }

  Status ResetTxnStatusManager() {
    txn_manager_.reset(new TxnStatusManager(tablet_replica_.get()));
    return txn_manager_->LoadFromTablet();
  }

 protected:
  unique_ptr<TxnStatusManager> txn_manager_;
};

// Test our ability to start transactions and register participants, with some
// corner cases.
TEST_F(TxnStatusManagerTest, TestStartTransactions) {
  const string kParticipant1 = ParticipantId(1);
  const string kParticipant2 = ParticipantId(2);
  const ParticipantIdsByTxnId expected_prts_by_txn_id = {
    { 1, {} },
    { 3, { kParticipant1, kParticipant2 } },
  };

  ASSERT_TRUE(txn_manager_->GetParticipantsByTxnIdForTests().empty());

  TabletServerErrorPB ts_error;
  for (const auto& txn_id_and_prts : expected_prts_by_txn_id) {
    const auto& txn_id = txn_id_and_prts.first;
    int64_t highest_seen_txn_id = -1;
    ASSERT_OK(txn_manager_->BeginTransaction(
        txn_id, kOwner, &highest_seen_txn_id, &ts_error));
    ASSERT_GE(highest_seen_txn_id, 0);
    ASSERT_EQ(highest_seen_txn_id, txn_id);
    for (const auto& prt : txn_id_and_prts.second) {
      ASSERT_OK(txn_manager_->RegisterParticipant(txn_id, prt, kOwner, &ts_error));
    }
  }
  // Registering a participant that's already open is harmless, presuming the
  // participant is still open.
  ASSERT_OK(txn_manager_->RegisterParticipant(3, kParticipant1, kOwner, &ts_error));

  // Starting a transaction that's already been started should result in an
  // error, even if it's not currently in flight.
  {
    int64_t highest_seen_txn_id = -1;
    auto s = txn_manager_->BeginTransaction(
        1, kOwner, &highest_seen_txn_id, &ts_error);
    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
    ASSERT_EQ(3, highest_seen_txn_id);
  }
  {
    int64_t highest_seen_txn_id = -1;
    auto s = txn_manager_->BeginTransaction(
        2, kOwner, &highest_seen_txn_id, &ts_error);
    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
    ASSERT_EQ(3, highest_seen_txn_id);
  }

  // Registering participants to transactions that don't exist should also
  // result in errors.
  auto s = txn_manager_->RegisterParticipant(2, kParticipant1, kOwner, &ts_error);
  ASSERT_TRUE(s.IsNotFound()) << s.ToString();

  // The underlying participants map should only reflect the successful
  // operations.
  ASSERT_EQ(expected_prts_by_txn_id,
            txn_manager_->GetParticipantsByTxnIdForTests());
  ASSERT_EQ(3, txn_manager_->highest_txn_id());
  {
    // Reload the TxnStatusManager from disk and verify the state.
    TxnStatusManager txn_manager_reloaded(tablet_replica_.get());
    ASSERT_OK(txn_manager_reloaded.LoadFromTablet());
    ASSERT_EQ(expected_prts_by_txn_id,
              txn_manager_reloaded.GetParticipantsByTxnIdForTests());
    ASSERT_EQ(3, txn_manager_reloaded.highest_txn_id());
  }

  // Now rebuild the underlying replica and rebuild the TxnStatusManager.
  ASSERT_OK(RestartReplica());
  NO_FATALS(ResetTxnStatusManager());
  ASSERT_EQ(expected_prts_by_txn_id,
            txn_manager_->GetParticipantsByTxnIdForTests());
  ASSERT_EQ(3, txn_manager_->highest_txn_id());

  // Verify that TxnStatusManager methods return Status::ServiceUnavailable()
  // if the transaction status tablet's data is not loaded yet.
  ASSERT_OK(RestartReplica());
  {
    TxnStatusManager tsm(tablet_replica_.get());
    // Check for the special value of the highest_txn_id when the data from
    // the transaction status tablet isn't loaded yet.
    ASSERT_EQ(-2, tsm.highest_txn_id());

    // Regardless of transaction identifiers and the records stored in the
    // transaction status tablet, all relevant methods should return
    // Status::ServiceUnavailable().
    const string kErrMsg = "transaction status data is not loaded";
    TabletServerErrorPB ts_error;
    for (int64_t txn_id : { 0, 1, 3, 4 }) {
      auto s = tsm.BeginTransaction(txn_id, kOwner, nullptr, &ts_error);
      ASSERT_TRUE(s.IsServiceUnavailable());
      ASSERT_STR_CONTAINS(s.ToString(), kErrMsg);

      s = tsm.BeginCommitTransaction(txn_id, kOwner, &ts_error);
      ASSERT_TRUE(s.IsServiceUnavailable());
      ASSERT_STR_CONTAINS(s.ToString(), kErrMsg);

      s = tsm.FinalizeCommitTransaction(txn_id, &ts_error);
      ASSERT_TRUE(s.IsServiceUnavailable());
      ASSERT_STR_CONTAINS(s.ToString(), kErrMsg);

      s = tsm.AbortTransaction(txn_id, kOwner, &ts_error);
      ASSERT_TRUE(s.IsServiceUnavailable());
      ASSERT_STR_CONTAINS(s.ToString(), kErrMsg);

      TxnStatusEntryPB txn_status;
      s = tsm.GetTransactionStatus(txn_id, kOwner, &txn_status, &ts_error);
      ASSERT_TRUE(s.IsServiceUnavailable());
      ASSERT_STR_CONTAINS(s.ToString(), kErrMsg);

      s = tsm.KeepTransactionAlive(txn_id, kOwner, &ts_error);
      ASSERT_TRUE(s.IsServiceUnavailable());
      ASSERT_STR_CONTAINS(s.ToString(), kErrMsg);

      s = tsm.RegisterParticipant(txn_id, kParticipant1, kOwner, &ts_error);
      ASSERT_TRUE(s.IsServiceUnavailable());
      ASSERT_STR_CONTAINS(s.ToString(), kErrMsg);
    }
  }
}

TEST_F(TxnStatusManagerTest, TestStartTransactionsConcurrently) {
  simple_spinlock lock;
  const int kParallelTxnsPerBatch = 10;
  const int kBatchesToStart = 10;
  vector<int64_t> successful_txn_ids;
  successful_txn_ids.reserve(kParallelTxnsPerBatch * kBatchesToStart);

  // Put together the batches of transaction IDs we're going to start.
  vector<vector<int64_t>> txns_to_insert;
  for (int i = 0; i < kBatchesToStart; i++) {
    vector<int64_t> txns_in_batch(kParallelTxnsPerBatch);
    std::iota(txns_in_batch.begin(), txns_in_batch.end(), i * kParallelTxnsPerBatch);
    std::mt19937 gen(SeedRandom());
    std::shuffle(txns_in_batch.begin(), txns_in_batch.end(), gen);
    txns_to_insert.emplace_back(std::move(txns_in_batch));
  }

  // From multiple threads, begin txns and record any that return with a
  // success.
  vector<thread> threads;
  vector<std::unique_ptr<Barrier>> barriers;
  threads.reserve(kParallelTxnsPerBatch);
  barriers.reserve(kBatchesToStart);
  for (int b = 0; b < kBatchesToStart; b++) {
    // NOTE: we allocate these on the heap since we disallow assignment of
    // barriers.
    barriers.emplace_back(new Barrier(kParallelTxnsPerBatch));
  }
  for (int i = 0; i < kParallelTxnsPerBatch; i++) {
    threads.emplace_back([&, i] {
      for (int b = 0; b < kBatchesToStart; b++) {
        // Synchronize the threads so we're inserting to a single range at a
        // time.
        barriers[b]->Wait();
        auto txn_id = txns_to_insert[b][i];
        TabletServerErrorPB ts_error;
        int64_t highest_seen_txn_id = -1;
        auto s = txn_manager_->BeginTransaction(
            txn_id, kOwner, &highest_seen_txn_id, &ts_error);
        if (s.ok()) {
          std::lock_guard<simple_spinlock> l(lock);
          successful_txn_ids.emplace_back(txn_id);
          CHECK_GE(highest_seen_txn_id, txn_id);
        } else {
          // In case of a failure to start a transaction, the only expected
          // failure case here is a conflict in transaction identifier. If so,
          // the assertion on the highest_see_txn_id can be made even stronger.
          CHECK_GT(highest_seen_txn_id, txn_id);
        }
      }
    });
  }
  for (auto& t : threads) {
    t.join();
  }

  // Verify that only txns that returned success ended up in the
  // TxnStatusManager
  ParticipantIdsByTxnId prts_by_txn_id = txn_manager_->GetParticipantsByTxnIdForTests();
  EXPECT_EQ(successful_txn_ids.size(), prts_by_txn_id.size());
  for (const auto& txn_id : successful_txn_ids) {
    EXPECT_TRUE(ContainsKey(prts_by_txn_id, txn_id));
  }
  // As a sanity check, there should have been at least one success per batch,
  // though there may have been multiple failures if the threads raced for the
  // highest transaction ID.
  ASSERT_GE(successful_txn_ids.size(), kBatchesToStart);
}

TEST_F(TxnStatusManagerTest, TestRegisterParticipantsConcurrently) {
  const int kParticipantsInParallel = 10;
  const int kUniqueParticipantIds = 5;
  simple_spinlock lock;
  vector<string> successful_participants;
  successful_participants.reserve(kParticipantsInParallel);

  const int64_t kTxnId = 1;
  vector<thread> threads;
  CountDownLatch begun_txn(1);
  threads.reserve(1 + kParticipantsInParallel);
  threads.emplace_back([&] {
    TabletServerErrorPB ts_error;
    CHECK_OK(txn_manager_->BeginTransaction(kTxnId, kOwner, nullptr, &ts_error));
    begun_txn.CountDown();
  });

  // Register a bunch of participants in parallel, including some duplicates,
  // keeping track of the ones that yielded success.
  for (int i = 0; i < kParticipantsInParallel; i++) {
    threads.emplace_back([&, i] {
      if (i % 2) {
        // In some threads, wait for the transaction to have begun, to ensure
        // at least some of the participant registrations succeed.
        begun_txn.Wait();
      }
      string prt = ParticipantId(i % kUniqueParticipantIds);
      TabletServerErrorPB ts_error;
      Status s = txn_manager_->RegisterParticipant(kTxnId, prt, kOwner, &ts_error);
      if (s.ok()) {
        std::lock_guard<simple_spinlock> l(lock);
        successful_participants.emplace_back(std::move(prt));
      }
    });
  }
  for (auto& t : threads) {
    t.join();
  }

  // Verify that only participant registrations that returned success ended up
  // in the TxnStatusManager.
  ParticipantIdsByTxnId prts_by_txn_id = txn_manager_->GetParticipantsByTxnIdForTests();
  ASSERT_EQ(1, prts_by_txn_id.size());
  const auto& txn_id_and_prts = *prts_by_txn_id.begin();
  ASSERT_EQ(kTxnId, txn_id_and_prts.first);

  const auto& participants = txn_id_and_prts.second;
  unordered_set<string> successful_prts(
      successful_participants.begin(), successful_participants.end());
  EXPECT_EQ(successful_prts.size(), participants.size());

  for (const auto& prt : participants) {
    EXPECT_TRUE(ContainsKey(successful_prts, prt));
  }
  ASSERT_GT(successful_prts.size(), 0);
}

TEST_F(TxnStatusManagerTest, TestUpdateStateConcurrently) {
  const int kNumTransactions = 10;
  const int kNumUpdatesInParallel = 20;
  for (int i = 0; i < kNumTransactions; i++) {
    TabletServerErrorPB ts_error;
    ASSERT_OK(txn_manager_->BeginTransaction(i, kOwner, nullptr, &ts_error));
  }
  typedef std::pair<int64_t, TxnStatePB> IdAndUpdate;
  vector<IdAndUpdate> all_updates;
  for (int i = 0; i < kNumTransactions; i++) {
    all_updates.emplace_back(std::make_pair(i, TxnStatePB::ABORTED));
    all_updates.emplace_back(std::make_pair(i, TxnStatePB::COMMIT_IN_PROGRESS));
    all_updates.emplace_back(std::make_pair(i, TxnStatePB::COMMITTED));
  }
  ThreadSafeRandom rng(SeedRandom());
  vector<IdAndUpdate> updates;
  ReservoirSample(all_updates, kNumUpdatesInParallel, std::set<IdAndUpdate>(), &rng, &updates);
  vector<Status> statuses(kNumUpdatesInParallel);
  vector<thread> threads;
  threads.reserve(kNumUpdatesInParallel);
  // Start a bunch of threads that update transaction states.
  for (int i = 0; i < kNumUpdatesInParallel; i++) {
    threads.emplace_back([&, i] {
      const auto& txn_id = updates[i].first;
      TabletServerErrorPB ts_error;
      switch (updates[i].second) {
        case TxnStatePB::ABORTED:
          statuses[i] = txn_manager_->AbortTransaction(txn_id, kOwner, &ts_error);
          break;
        case TxnStatePB::COMMIT_IN_PROGRESS:
          statuses[i] = txn_manager_->BeginCommitTransaction(txn_id, kOwner, &ts_error);
          break;
        case TxnStatePB::COMMITTED:
          statuses[i] = txn_manager_->FinalizeCommitTransaction(txn_id, &ts_error);
          break;
        default:
          FAIL() << "bad update";
      }
    });
  }
  for (auto& t : threads) {
    t.join();
  }

  // Collect the transaction IDs per successful update.
  unordered_set<int64_t> txns_with_abort;
  unordered_set<int64_t> txns_with_begin_commit;
  unordered_set<int64_t> txns_with_finalize_commit;
  for (int i = 0; i < kNumUpdatesInParallel; i++) {
    const auto& txn_id = updates[i].first;
    if (!statuses[i].ok()) {
      continue;
    }
    switch (updates[i].second) {
      case TxnStatePB::ABORTED:
        EmplaceIfNotPresent(&txns_with_abort, txn_id);
        break;
      case TxnStatePB::COMMIT_IN_PROGRESS:
        EmplaceIfNotPresent(&txns_with_begin_commit, txn_id);
        break;
      case TxnStatePB::COMMITTED:
        EmplaceIfNotPresent(&txns_with_finalize_commit, txn_id);
        break;
      default:
        FAIL() << "bad update";
    }
  }
  for (int i = 0; i < kNumTransactions; i++) {
    // If there's a finalize commit and an abort commit, only one can succeed.
    if (ContainsKey(txns_with_abort, i)) {
      ASSERT_FALSE(ContainsKey(txns_with_finalize_commit, i));
    }
    // If there's a finalize commit, it can only succeed if there's also been a
    // successful request to begin the commit.
    if (ContainsKey(txns_with_finalize_commit, i)) {
      ASSERT_TRUE(ContainsKey(txns_with_begin_commit, i));
      ASSERT_FALSE(ContainsKey(txns_with_abort, i));
    }
  }
}

// This test scenario verifies basic functionality of the
// TxnStatusManager::GetTransactionStatus() method.
TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
  {
    TxnStatusEntryPB txn_status;
    TabletServerErrorPB ts_error;
    auto s = txn_manager_->GetTransactionStatus(
        1, kOwner, &txn_status, &ts_error);
    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
    ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 1 not found");
    ASSERT_FALSE(txn_status.has_state());
    ASSERT_FALSE(txn_status.has_user());
  }
  {
    TabletServerErrorPB ts_error;
    ASSERT_OK(txn_manager_->BeginTransaction(1, kOwner, nullptr, &ts_error));

    TxnStatusEntryPB txn_status;
    ASSERT_OK(txn_manager_->GetTransactionStatus(
        1, kOwner, &txn_status, &ts_error));
    ASSERT_TRUE(txn_status.has_state());
    ASSERT_EQ(TxnStatePB::OPEN, txn_status.state());
    ASSERT_TRUE(txn_status.has_user());
    ASSERT_EQ(kOwner, txn_status.user());

    ASSERT_OK(txn_manager_->BeginCommitTransaction(1, kOwner, &ts_error));
    ASSERT_OK(txn_manager_->GetTransactionStatus(
        1, kOwner, &txn_status, &ts_error));
    ASSERT_TRUE(txn_status.has_state());
    ASSERT_EQ(TxnStatePB::COMMIT_IN_PROGRESS, txn_status.state());
    ASSERT_TRUE(txn_status.has_user());
    ASSERT_EQ(kOwner, txn_status.user());

    ASSERT_OK(txn_manager_->FinalizeCommitTransaction(1, &ts_error));
    ASSERT_OK(txn_manager_->GetTransactionStatus(
        1, kOwner, &txn_status, &ts_error));
    ASSERT_TRUE(txn_status.has_state());
    ASSERT_EQ(TxnStatePB::COMMITTED, txn_status.state());
    ASSERT_TRUE(txn_status.has_user());
    ASSERT_EQ(kOwner, txn_status.user());
  }

  {
    TabletServerErrorPB ts_error;
    ASSERT_OK(txn_manager_->BeginTransaction(2, kOwner, nullptr, &ts_error));
    ASSERT_OK(txn_manager_->AbortTransaction(2, kOwner, &ts_error));

    TxnStatusEntryPB txn_status;
    ASSERT_OK(txn_manager_->GetTransactionStatus(
        2, kOwner, &txn_status, &ts_error));
    ASSERT_TRUE(txn_status.has_state());
    ASSERT_EQ(TxnStatePB::ABORTED, txn_status.state());
    ASSERT_TRUE(txn_status.has_user());
    ASSERT_EQ(kOwner, txn_status.user());
  }

  // Start another transaction and start its commit phase.
  TabletServerErrorPB ts_error;
  ASSERT_OK(txn_manager_->BeginTransaction(3, kOwner, nullptr, &ts_error));
  ASSERT_OK(txn_manager_->BeginCommitTransaction(3, kOwner, &ts_error));

  // Start just another transaction.
  ASSERT_OK(txn_manager_->BeginTransaction(4, kOwner, nullptr, &ts_error));

  // Make the TxnStatusManager start from scratch.
  ASSERT_OK(RestartReplica());
  NO_FATALS(ResetTxnStatusManager());

  // Committed, aborted, and in-flight transactions should be known to the
  // TxnStatusManager even after restarting the underlying replica and
  // rebuilding the TxnStatusManager from scratch.
  {
    TxnStatusEntryPB txn_status;
    TabletServerErrorPB ts_error;
    ASSERT_OK(txn_manager_->GetTransactionStatus(
        1, kOwner, &txn_status, &ts_error));
    ASSERT_TRUE(txn_status.has_state());
    ASSERT_EQ(TxnStatePB::COMMITTED, txn_status.state());
    ASSERT_TRUE(txn_status.has_user());
    ASSERT_EQ(kOwner, txn_status.user());

    ASSERT_OK(txn_manager_->GetTransactionStatus(
        2, kOwner, &txn_status, &ts_error));
    ASSERT_TRUE(txn_status.has_state());
    ASSERT_EQ(TxnStatePB::ABORTED, txn_status.state());
    ASSERT_TRUE(txn_status.has_user());
    ASSERT_EQ(kOwner, txn_status.user());

    ASSERT_OK(txn_manager_->GetTransactionStatus(
        3, kOwner, &txn_status, &ts_error));
    ASSERT_TRUE(txn_status.has_state());
    ASSERT_EQ(TxnStatePB::COMMIT_IN_PROGRESS, txn_status.state());
    ASSERT_TRUE(txn_status.has_user());
    ASSERT_EQ(kOwner, txn_status.user());

    ASSERT_OK(txn_manager_->GetTransactionStatus(
        4, kOwner, &txn_status, &ts_error));
    ASSERT_TRUE(txn_status.has_state());
    ASSERT_EQ(TxnStatePB::OPEN, txn_status.state());
    ASSERT_TRUE(txn_status.has_user());
    ASSERT_EQ(kOwner, txn_status.user());
  }

  // Supplying wrong user.
  {
    TxnStatusEntryPB txn_status;
    TabletServerErrorPB ts_error;
    auto s = txn_manager_->GetTransactionStatus(
        1, "stranger", &txn_status, &ts_error);
    ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
  }

  // Supplying not-yet-used transaction ID.
  {
    TxnStatusEntryPB txn_status;
    TabletServerErrorPB ts_error;
    auto s = txn_manager_->GetTransactionStatus(
        0, kOwner, &txn_status, &ts_error);
    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
  }

  // Supplying wrong user and not-yet-used transaction ID.
  {
    TxnStatusEntryPB txn_status;
    TabletServerErrorPB ts_error;
    auto s = txn_manager_->GetTransactionStatus(
        0, "stranger", &txn_status, &ts_error);
    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
  }
}

// This test scenario verifies basic functionality of the
// TxnStatusManager::KeepTransactionAlive() method w.r.t. state of the
// transaction.
TEST_F(TxnStatusManagerTest, KeepTransactionAlive) {
  // Supplying not-yet-registered transaction ID.
  {
    TabletServerErrorPB ts_error;
    auto s = txn_manager_->KeepTransactionAlive(1, kOwner, &ts_error);
    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
    ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 1 not found");
  }

  // OPEN --> COMMIT_IN_PROGRESS --> COMMITTED
  {
    TabletServerErrorPB ts_error;
    ASSERT_OK(txn_manager_->BeginTransaction(1, kOwner, nullptr, &ts_error));
    ASSERT_OK(txn_manager_->KeepTransactionAlive(1, kOwner, &ts_error));
    // Supplying wrong user for transaction in OPEN state.
    {
      auto s = txn_manager_->KeepTransactionAlive(1, "stranger", &ts_error);
      ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
      ASSERT_STR_CONTAINS(s.ToString(),
                          "transaction ID 1 not owned by stranger");
    }

    ASSERT_OK(txn_manager_->BeginCommitTransaction(1, kOwner, &ts_error));
    auto s = txn_manager_->KeepTransactionAlive(1, kOwner, &ts_error);
    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
    ASSERT_STR_CONTAINS(s.ToString(),
                        "transaction ID 1 is in commit phase");
    // Supplying wrong user for transaction in COMMIT_IN_PROGRESS state.
    {
      auto s = txn_manager_->KeepTransactionAlive(1, "stranger", &ts_error);
      ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
      ASSERT_STR_CONTAINS(s.ToString(),
                          "transaction ID 1 not owned by stranger");
    }

    ASSERT_OK(txn_manager_->FinalizeCommitTransaction(1, &ts_error));
    s = txn_manager_->KeepTransactionAlive(1, kOwner, &ts_error);
    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
    ASSERT_STR_CONTAINS(s.ToString(),
                        "transaction ID 1 is already in terminal state");
    // Supplying wrong user for transaction in COMMITTED state.
    {
      auto s = txn_manager_->KeepTransactionAlive(1, "stranger", &ts_error);
      ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
      ASSERT_STR_CONTAINS(s.ToString(),
                          "transaction ID 1 not owned by stranger");
    }
  }

  // OPEN --> COMMIT_IN_PROGRESS --> ABORTED
  {
    TabletServerErrorPB ts_error;
    ASSERT_OK(txn_manager_->BeginTransaction(2, kOwner, nullptr, &ts_error));
    ASSERT_OK(txn_manager_->KeepTransactionAlive(2, kOwner, &ts_error));

    ASSERT_OK(txn_manager_->BeginCommitTransaction(2, kOwner, &ts_error));
    auto s = txn_manager_->KeepTransactionAlive(2, kOwner, &ts_error);
    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
    ASSERT_STR_CONTAINS(s.ToString(),
                        "transaction ID 2 is in commit phase");

    ASSERT_OK(txn_manager_->AbortTransaction(2, kOwner, &ts_error));
    s = txn_manager_->KeepTransactionAlive(2, kOwner, &ts_error);
    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
    ASSERT_STR_CONTAINS(s.ToString(),
                        "transaction ID 2 is already in terminal state");
    // Supplying wrong user for transaction in ABORTED state.
    {
      auto s = txn_manager_->KeepTransactionAlive(2, "stranger", &ts_error);
      ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
      ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 2 not owned by stranger");
    }
  }

  // OPEN --> ABORTED
  {
    TabletServerErrorPB ts_error;
    ASSERT_OK(txn_manager_->BeginTransaction(3, kOwner, nullptr, &ts_error));
    ASSERT_OK(txn_manager_->KeepTransactionAlive(3, kOwner, &ts_error));

    ASSERT_OK(txn_manager_->AbortTransaction(3, kOwner, &ts_error));
    auto s = txn_manager_->KeepTransactionAlive(3, kOwner, &ts_error);
    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
    ASSERT_STR_CONTAINS(s.ToString(),
                        "transaction ID 3 is already in terminal state");
  }

  // Open a new transaction just before restarting the TxnStatusManager.
  {
    TabletServerErrorPB ts_error;
    ASSERT_OK(txn_manager_->BeginTransaction(4, kOwner, nullptr, &ts_error));
  }

  // Make the TxnStatusManager start from scratch.
  ASSERT_OK(RestartReplica());
  NO_FATALS(ResetTxnStatusManager());

  // Committed, aborted, and in-flight transactions should be known to the
  // TxnStatusManager even after restarting the underlying replica and
  // rebuilding the TxnStatusManager from scratch, so KeepTransactionAlive()
  // should behave the same as if no restart has happened.
  {
    TabletServerErrorPB ts_error;
    auto s = txn_manager_->KeepTransactionAlive(1, kOwner, &ts_error);
    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
    ASSERT_STR_CONTAINS(s.ToString(),
                        "transaction ID 1 is already in terminal state");
    // Supplying wrong user for transaction in COMMITTED state.
    {
      auto s = txn_manager_->KeepTransactionAlive(1, "stranger", &ts_error);
      ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
      ASSERT_STR_CONTAINS(s.ToString(),
                          "transaction ID 1 not owned by stranger");
    }
  }
  {
    TabletServerErrorPB ts_error;
    auto s = txn_manager_->KeepTransactionAlive(2, kOwner, &ts_error);
    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
    ASSERT_STR_CONTAINS(s.ToString(),
                        "transaction ID 2 is already in terminal state");
    // Supplying wrong user for transaction in ABORTED state.
    {
      auto s = txn_manager_->KeepTransactionAlive(2, "stranger", &ts_error);
      ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
      ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 2 not owned by stranger");
    }
  }
  {
    TabletServerErrorPB ts_error;
    ASSERT_OK(txn_manager_->KeepTransactionAlive(4, kOwner, &ts_error));
    // Supplying wrong user for transaction in OPEN state.
    {
      auto s = txn_manager_->KeepTransactionAlive(4, "stranger", &ts_error);
      ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
      ASSERT_STR_CONTAINS(s.ToString(),
                          "transaction ID 4 not owned by stranger");
    }
  }
}

// Test that performing actions as the wrong user will return errors.
TEST_F(TxnStatusManagerTest, TestWrongUser) {
  const string kWrongUser = "stranger";
  int64_t highest_seen_txn_id = -1;
  TabletServerErrorPB ts_error;
  ASSERT_OK(txn_manager_->BeginTransaction(
      1, kOwner, &highest_seen_txn_id, &ts_error));
  ASSERT_EQ(1, highest_seen_txn_id);
  ASSERT_OK(txn_manager_->RegisterParticipant(1, ParticipantId(1), kOwner, &ts_error));

  // First, any other call to begin the transaction should be rejected,
  // regardless of user.
  highest_seen_txn_id = -1;
  Status s = txn_manager_->BeginTransaction(
      1, kWrongUser, &highest_seen_txn_id, &ts_error);
  ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
  ASSERT_EQ(1, highest_seen_txn_id);

  // All actions should be rejected if performed by the wrong user.
  s = txn_manager_->RegisterParticipant(1, ParticipantId(1), kWrongUser, &ts_error);
  ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
  s = txn_manager_->RegisterParticipant(1, ParticipantId(2), kWrongUser, &ts_error);
  ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
  s = txn_manager_->BeginCommitTransaction(1, kWrongUser, &ts_error);
  ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
  s = txn_manager_->AbortTransaction(1, kWrongUser, &ts_error);
  ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
  ParticipantIdsByTxnId prts_by_txn_id = txn_manager_->GetParticipantsByTxnIdForTests();
  ParticipantIdsByTxnId kExpectedPrtsByTxnId = { { 1, { ParticipantId(1) } } };
  ASSERT_EQ(kExpectedPrtsByTxnId, prts_by_txn_id);
}

// Test that we can only update a transaction's state when it's in an
// appropriate state.
TEST_F(TxnStatusManagerTest, TestUpdateTransactionState) {
  const int64_t kTxnId1 = 1;
  TabletServerErrorPB ts_error;
  ASSERT_OK(txn_manager_->BeginTransaction(kTxnId1, kOwner, nullptr, &ts_error));

  // Redundant calls are benign.
  ASSERT_OK(txn_manager_->BeginCommitTransaction(kTxnId1, kOwner, &ts_error));
  ASSERT_OK(txn_manager_->BeginCommitTransaction(kTxnId1, kOwner, &ts_error));
  ASSERT_OK(txn_manager_->AbortTransaction(kTxnId1, kOwner, &ts_error));
  ASSERT_OK(txn_manager_->AbortTransaction(kTxnId1, kOwner, &ts_error));

  // We can't begin or finalize a commit if we've aborted.
  Status s = txn_manager_->BeginCommitTransaction(kTxnId1, kOwner, &ts_error);
  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
  s = txn_manager_->FinalizeCommitTransaction(kTxnId1, &ts_error);
  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();

  // We can't finalize a commit that hasn't begun committing.
  const int64_t kTxnId2 = 2;
  ASSERT_OK(txn_manager_->BeginTransaction(kTxnId2, kOwner, nullptr, &ts_error));
  s = txn_manager_->FinalizeCommitTransaction(kTxnId2, &ts_error);
  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();

  // We can't abort a transaction that has finished committing.
  ASSERT_OK(txn_manager_->BeginCommitTransaction(kTxnId2, kOwner, &ts_error));
  ASSERT_OK(txn_manager_->FinalizeCommitTransaction(kTxnId2, &ts_error));
  s = txn_manager_->AbortTransaction(kTxnId2, kOwner, &ts_error);
  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();

  // Redundant finalize calls are also benign.
  ASSERT_OK(txn_manager_->FinalizeCommitTransaction(kTxnId2, &ts_error));

  // Calls to begin committing should return an error if we've already
  // finalized the commit.
  s = txn_manager_->BeginCommitTransaction(kTxnId2, kOwner, &ts_error);
  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
}

// Test that we can only add participants to a transaction when it's in an
// appropriate state.
TEST_F(TxnStatusManagerTest, TestRegisterParticipantsWithStates) {
  TabletServerErrorPB ts_error;
  const int64_t kTxnId1 = 1;

  // We can't register a participant to a transaction that hasn't started.
  Status s = txn_manager_->RegisterParticipant(kTxnId1, ParticipantId(1), kOwner, &ts_error);
  ASSERT_TRUE(s.IsNotFound()) << s.ToString();

  ASSERT_OK(txn_manager_->BeginTransaction(kTxnId1, kOwner, nullptr, &ts_error));
  ASSERT_OK(txn_manager_->RegisterParticipant(kTxnId1, ParticipantId(1), kOwner, &ts_error));

  // Registering the same participant is idempotent and benign.
  ASSERT_OK(txn_manager_->RegisterParticipant(kTxnId1, ParticipantId(1), kOwner, &ts_error));

  // We can't register participants when we've already begun committing.
  ASSERT_OK(txn_manager_->BeginCommitTransaction(kTxnId1, kOwner, &ts_error));
  s = txn_manager_->RegisterParticipant(kTxnId1, ParticipantId(2), kOwner, &ts_error);
  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();

  // We can't register participants when we've finished committnig.
  ASSERT_OK(txn_manager_->FinalizeCommitTransaction(kTxnId1, &ts_error));
  s = txn_manager_->RegisterParticipant(kTxnId1, ParticipantId(2), kOwner, &ts_error);
  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();

  // We can't register participants when we've aborted the transaction.
  const int64_t kTxnId2 = 2;
  ASSERT_OK(txn_manager_->BeginTransaction(kTxnId2, kOwner, nullptr, &ts_error));
  ASSERT_OK(txn_manager_->RegisterParticipant(kTxnId2, ParticipantId(1), kOwner, &ts_error));
  ASSERT_OK(txn_manager_->AbortTransaction(kTxnId2, kOwner, &ts_error));
  s = txn_manager_->RegisterParticipant(kTxnId2, ParticipantId(2), kOwner, &ts_error);
  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
}

} // namespace transactions
} // namespace kudu

