// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tablet/txn_participant.h"
#include <algorithm>
#include <cstdint>
#include <map>
#include <memory>
#include <ostream>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/common/common.pb.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/row_operations.h"
#include "kudu/common/schema.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/log.h"
#include "kudu/consensus/log_anchor_registry.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/raft_consensus.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/tablet/ops/op.h"
#include "kudu/tablet/ops/op_driver.h"
#include "kudu/tablet/ops/op_tracker.h"
#include "kudu/tablet/ops/participant_op.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_replica-test-base.h"
#include "kudu/tablet/tablet_replica.h"
#include "kudu/tablet/txn_participant-test-util.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/tserver/tserver_admin.pb.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
using kudu::consensus::CommitMsg;
using kudu::consensus::ConsensusBootstrapInfo;
using kudu::pb_util::SecureShortDebugString;
using kudu::tserver::ParticipantOpPB;
using kudu::tserver::ParticipantRequestPB;
using kudu::tserver::ParticipantResponsePB;
using kudu::tserver::WriteRequestPB;
using std::map;
using std::thread;
using std::unique_ptr;
using std::vector;
DECLARE_bool(enable_maintenance_manager);
DECLARE_bool(log_preallocate_segments);
DECLARE_bool(log_async_preallocate_segments);
namespace kudu {
namespace tablet {
namespace {
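// Returns a simple schema with a single INT32 key column, used for the test
// tablet.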
Schema GetTestSchema() {
return Schema({ ColumnSchema("key", INT32) }, 1);
}
// A participant op that waits to start and finish applying based on input
// latches.
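//
// A typical use, mirroring TestActiveParticipantOpsAnchorWALs below (sketch):
//
//   CountDownLatch apply_start(1);
//   CountDownLatch apply_continue(1);
//   unique_ptr<DelayedParticipantOp> op(
//       new DelayedParticipantOp(&apply_start, &apply_continue, std::move(op_state)));
//   ASSERT_OK(tablet_replica_->NewLeaderOpDriver(std::move(op), &driver));
//   ASSERT_OK(driver->ExecuteAsync());
//   apply_start.Wait();          // the op is now blocked inside Apply()
//   // ... assert on WAL/anchor state while the op is in flight ...
//   apply_continue.CountDown();  // let Apply() finish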
class DelayedParticipantOp : public ParticipantOp {
public:
DelayedParticipantOp(CountDownLatch* apply_started,
CountDownLatch* apply_continue,
unique_ptr<ParticipantOpState> state)
: ParticipantOp(std::move(state), consensus::LEADER),
apply_started_(apply_started),
apply_continue_(apply_continue) {}
Status Apply(CommitMsg** commit_msg) override {
apply_started_->CountDown();
LOG(INFO) << "Delaying apply...";
apply_continue_->Wait();
return ParticipantOp::Apply(commit_msg);
}
private:
CountDownLatch* apply_started_;
CountDownLatch* apply_continue_;
};
} // anonymous namespace
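// Test fixture that starts a single tablet replica, waits for it to become
// leader, and provides helpers for issuing writes and inspecting the tablet's
// transaction participant state.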
class TxnParticipantTest : public TabletReplicaTestBase {
public:
TxnParticipantTest()
: TabletReplicaTestBase(GetTestSchema()) {}
void SetUp() override {
// Some of these tests exercise the durability semantics of participants. So
// that we have finer-grained control over on-disk state, disable anything
// that might write to disk in the background.
FLAGS_enable_maintenance_manager = false;
FLAGS_log_preallocate_segments = false;
FLAGS_log_async_preallocate_segments = false;
NO_FATALS(TabletReplicaTestBase::SetUp());
ConsensusBootstrapInfo info;
ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
}
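// Inserts a single row with the given key into the test tablet.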
Status Write(int key) {
WriteRequestPB req;
req.set_tablet_id(tablet_replica_->tablet_id());
const auto& schema = GetTestSchema();
RETURN_NOT_OK(SchemaToPB(schema, req.mutable_schema()));
KuduPartialRow row(&schema);
RETURN_NOT_OK(row.SetInt32(0, key));
RowOperationsPBEncoder enc(req.mutable_row_operations());
enc.Add(RowOperationsPB::INSERT, row);
return ExecuteWrite(tablet_replica_.get(), req);
}
// Writes an op to the WAL, rolls over onto a new WAL segment, and flushes
// the MRS, leaving us with a new WAL segment that should be GC-able unless
// previous WAL segments are anchored.
Status WriteRolloverAndFlush(int* current_key) {
RETURN_NOT_OK(Write((*current_key)++));
RETURN_NOT_OK(tablet_replica_->log()->WaitUntilAllFlushed());
RETURN_NOT_OK(tablet_replica_->log()->AllocateSegmentAndRollOverForTests());
return tablet_replica_->tablet()->Flush();
}
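// Returns the transaction participant of the underlying tablet.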
TxnParticipant* txn_participant() {
return tablet_replica_->tablet()->txn_participant();
}
};
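// Test that valid sequences of participant ops succeed and leave each
// transaction in the expected terminal state.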
TEST_F(TxnParticipantTest, TestSuccessfulSequences) {
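// Runs the given ops against the given transaction, expecting each one to
// succeed.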
const auto check_valid_sequence = [&] (const vector<ParticipantOpPB::ParticipantOpType>& ops,
int64_t txn_id) {
for (const auto& type : ops) {
ParticipantResponsePB resp;
ASSERT_OK(CallParticipantOp(
tablet_replica_.get(), txn_id, type, kDummyCommitTimestamp, &resp));
SCOPED_TRACE(SecureShortDebugString(resp));
ASSERT_FALSE(resp.has_error());
ASSERT_TRUE(resp.has_timestamp());
}
};
// Check the happy path where the transaction is committed.
NO_FATALS(check_valid_sequence({
ParticipantOpPB::BEGIN_TXN,
ParticipantOpPB::BEGIN_COMMIT,
ParticipantOpPB::FINALIZE_COMMIT,
}, 0));
// Check the case where a transaction is aborted after beginning to commit.
NO_FATALS(check_valid_sequence({
ParticipantOpPB::BEGIN_TXN,
ParticipantOpPB::BEGIN_COMMIT,
ParticipantOpPB::ABORT_TXN,
}, 1));
// Check the case where a transaction is aborted after starting but before
// committing.
NO_FATALS(check_valid_sequence({
ParticipantOpPB::BEGIN_TXN,
ParticipantOpPB::ABORT_TXN,
}, 2));
ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
{ 0, Txn::kCommitted, kDummyCommitTimestamp },
{ 1, Txn::kAborted, -1 },
{ 2, Txn::kAborted, -1 },
}), txn_participant()->GetTxnsForTests());
}
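// Test that ops other than BEGIN_TXN return NOT_FOUND errors when the
// transaction doesn't exist, and that no transaction state is created as a
// side effect.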
TEST_F(TxnParticipantTest, TestTransactionNotFound) {
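// Runs the given ops against the given transaction, expecting each one to
// fail with a NOT_FOUND error.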
const auto check_bad_ops = [&] (const vector<ParticipantOpPB::ParticipantOpType>& ops,
int64_t txn_id) {
for (const auto& type : ops) {
ParticipantResponsePB resp;
ASSERT_OK(CallParticipantOp(
tablet_replica_.get(), txn_id, type, kDummyCommitTimestamp, &resp));
SCOPED_TRACE(SecureShortDebugString(resp));
ASSERT_TRUE(resp.has_error());
ASSERT_TRUE(resp.error().has_status());
ASSERT_EQ(AppStatusPB::NOT_FOUND, resp.error().status().code());
ASSERT_FALSE(resp.has_timestamp());
}
};
NO_FATALS(check_bad_ops({
ParticipantOpPB::BEGIN_COMMIT,
ParticipantOpPB::FINALIZE_COMMIT,
ParticipantOpPB::ABORT_TXN,
}, 1));
ASSERT_TRUE(txn_participant()->GetTxnsForTests().empty());
}
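// Test that ops that are illegal given the transaction's current state return
// ILLEGAL_STATE errors and leave the transaction's state unchanged.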
TEST_F(TxnParticipantTest, TestIllegalTransitions) {
const int64_t kTxnId = 1;
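// Runs the given op against the given transaction, expecting it to succeed.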
const auto check_valid_op = [&] (const ParticipantOpPB::ParticipantOpType& type, int64_t txn_id) {
ParticipantResponsePB resp;
ASSERT_OK(CallParticipantOp(
tablet_replica_.get(), txn_id, type, kDummyCommitTimestamp, &resp));
SCOPED_TRACE(SecureShortDebugString(resp));
ASSERT_FALSE(resp.has_error());
ASSERT_TRUE(resp.has_timestamp());
};
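// Runs the given ops against the given transaction, expecting each one to
// fail with an ILLEGAL_STATE error.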
const auto check_bad_ops = [&] (const vector<ParticipantOpPB::ParticipantOpType>& ops,
int64_t txn_id) {
for (const auto& type : ops) {
ParticipantResponsePB resp;
ASSERT_OK(CallParticipantOp(
tablet_replica_.get(), txn_id, type, kDummyCommitTimestamp, &resp));
SCOPED_TRACE(SecureShortDebugString(resp));
ASSERT_TRUE(resp.has_error());
ASSERT_TRUE(resp.error().has_status());
ASSERT_EQ(AppStatusPB::ILLEGAL_STATE, resp.error().status().code());
ASSERT_FALSE(resp.has_timestamp());
}
};
// Once we've begun the transaction, we can't finalize without beginning to
// commit.
NO_FATALS(check_valid_op(ParticipantOpPB::BEGIN_TXN, kTxnId));
NO_FATALS(check_bad_ops({ ParticipantOpPB::FINALIZE_COMMIT }, kTxnId));
ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
{ kTxnId, Txn::kOpen, -1 },
}), txn_participant()->GetTxnsForTests());
// Once we begin committing, we can't start the transaction again.
NO_FATALS(check_valid_op(ParticipantOpPB::BEGIN_COMMIT, kTxnId));
NO_FATALS(check_bad_ops({ ParticipantOpPB::BEGIN_TXN }, kTxnId));
ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
{ kTxnId, Txn::kCommitInProgress, -1 },
}), txn_participant()->GetTxnsForTests());
// Once we've begun finalizing, we can't do anything.
NO_FATALS(check_valid_op(ParticipantOpPB::FINALIZE_COMMIT, kTxnId));
NO_FATALS(check_bad_ops({ ParticipantOpPB::BEGIN_TXN,
ParticipantOpPB::BEGIN_COMMIT,
ParticipantOpPB::ABORT_TXN }, kTxnId));
ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
{ kTxnId, Txn::kCommitted, kDummyCommitTimestamp },
}), txn_participant()->GetTxnsForTests());
// Once we've aborted, we can't do anything.
const int64_t kAbortedTxnId = 2;
NO_FATALS(check_valid_op(ParticipantOpPB::BEGIN_TXN, kAbortedTxnId));
NO_FATALS(check_valid_op(ParticipantOpPB::ABORT_TXN, kAbortedTxnId));
NO_FATALS(check_bad_ops({ ParticipantOpPB::BEGIN_TXN,
ParticipantOpPB::BEGIN_COMMIT,
ParticipantOpPB::FINALIZE_COMMIT }, kAbortedTxnId));
ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
{ kTxnId, Txn::kCommitted, kDummyCommitTimestamp },
{ kAbortedTxnId, Txn::kAborted, -1 },
}), txn_participant()->GetTxnsForTests());
}
// Test that we have no trouble operating on separate transactions.
TEST_F(TxnParticipantTest, TestConcurrentTransactions) {
const int kNumTxns = 10;
vector<thread> threads;
Status statuses[kNumTxns];
for (int i = 0; i < kNumTxns; i++) {
threads.emplace_back([&, i] {
for (const auto& type : kCommitSequence) {
ParticipantResponsePB resp;
Status s = CallParticipantOp(
tablet_replica_.get(), i, type, kDummyCommitTimestamp, &resp);
if (s.ok() && resp.has_error()) {
s = StatusFromPB(resp.error().status());
}
statuses[i] = s;
}
});
}
std::for_each(threads.begin(), threads.end(), [] (thread& t) { t.join(); });
for (const auto& s : statuses) {
EXPECT_OK(s);
}
const auto& txns = txn_participant()->GetTxnsForTests();
for (int i = 0; i < kNumTxns; i++) {
ASSERT_EQ(TxnParticipant::TxnEntry({ i, Txn::kCommitted, kDummyCommitTimestamp }), txns[i]);
}
}
// Concurrently submit every type of participant op for a single transaction
// and, based on which ops succeeded, verify that the expected state-machine
// invariants hold.
TEST_F(TxnParticipantTest, TestConcurrentOps) {
const int64_t kTxnId = 1;
const map<ParticipantOpPB::ParticipantOpType, int> kIndexByOps = {
{ ParticipantOpPB::BEGIN_TXN, 0 },
{ ParticipantOpPB::BEGIN_COMMIT, 1 },
{ ParticipantOpPB::FINALIZE_COMMIT, 2 },
{ ParticipantOpPB::ABORT_TXN, 3 },
};
vector<thread> threads;
vector<Status> statuses(kIndexByOps.size(), Status::Incomplete(""));
for (const auto& op_and_idx : kIndexByOps) {
const auto& op_type = op_and_idx.first;
const auto& idx = op_and_idx.second;
threads.emplace_back([&, op_type, idx] {
ParticipantResponsePB resp;
Status s = CallParticipantOp(
tablet_replica_.get(), kTxnId, op_type, kDummyCommitTimestamp, &resp);
if (s.ok() && resp.has_error()) {
s = StatusFromPB(resp.error().status());
}
statuses[idx] = s;
});
}
std::for_each(threads.begin(), threads.end(), [] (thread& t) { t.join(); });
const auto status_for_op = [&] (ParticipantOpPB::ParticipantOpType type) {
return statuses[FindOrDie(kIndexByOps, type)];
};
// Regardless of order, we should have been able to begin the transaction.
ASSERT_OK(status_for_op(ParticipantOpPB::BEGIN_TXN));
// If we finalized the commit, we should have begun committing, and we must
// not have been able to abort.
if (status_for_op(ParticipantOpPB::FINALIZE_COMMIT).ok()) {
ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
{ kTxnId, Txn::kCommitted, kDummyCommitTimestamp },
}), txn_participant()->GetTxnsForTests());
ASSERT_OK(statuses[FindOrDie(kIndexByOps, ParticipantOpPB::BEGIN_COMMIT)]);
ASSERT_FALSE(statuses[FindOrDie(kIndexByOps, ParticipantOpPB::ABORT_TXN)].ok());
// If we aborted the transaction, we could not have finalized the commit.
} else if (status_for_op(ParticipantOpPB::ABORT_TXN).ok()) {
ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
{ kTxnId, Txn::kAborted, -1 },
}), txn_participant()->GetTxnsForTests());
ASSERT_FALSE(statuses[FindOrDie(kIndexByOps, ParticipantOpPB::FINALIZE_COMMIT)].ok());
// If we neither aborted nor finalized, but we began to commit, we should be
// left with the commit in progress.
} else if (status_for_op(ParticipantOpPB::BEGIN_COMMIT).ok()) {
ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
{ kTxnId, Txn::kCommitInProgress, -1 },
}), txn_participant()->GetTxnsForTests());
// Finally, if nothing else succeeded, at least we should have been able to
// start the transaction.
} else {
ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
{ kTxnId, Txn::kOpen, -1 },
}), txn_participant()->GetTxnsForTests());
}
}
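// Test that participant ops are persisted in the WAL and replayed at
// bootstrap, restoring the transaction's state after a restart.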
TEST_F(TxnParticipantTest, TestReplayParticipantOps) {
constexpr int64_t kTxnId = 1;
for (const auto& type : kCommitSequence) {
ParticipantResponsePB resp;
ASSERT_OK(CallParticipantOp(
tablet_replica_.get(), kTxnId, type, kDummyCommitTimestamp, &resp));
SCOPED_TRACE(SecureShortDebugString(resp));
ASSERT_FALSE(resp.has_error());
ASSERT_TRUE(resp.has_timestamp());
}
ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
{ kTxnId, Txn::kCommitted, kDummyCommitTimestamp }
}), txn_participant()->GetTxnsForTests());
ASSERT_OK(RestartReplica());
ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
{ kTxnId, Txn::kCommitted, kDummyCommitTimestamp }
}), txn_participant()->GetTxnsForTests());
}
// Test that each transaction has a single anchor that gets updated as
// participant ops land.
TEST_F(TxnParticipantTest, TestAllOpsRegisterAnchors) {
int64_t expected_index = 1;
// Validates that each op in the given sequence updates the single anchor
// maintained for the transaction.
const auto check_participant_ops_are_anchored =
[&] (int64_t txn_id, const vector<ParticipantOpPB::ParticipantOpType>& ops) {
for (const auto& op : ops) {
ParticipantResponsePB resp;
ASSERT_OK(CallParticipantOp(tablet_replica_.get(), txn_id, op,
kDummyCommitTimestamp, &resp));
ASSERT_EQ(1, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests());
int64_t log_index = -1;
ASSERT_OK(tablet_replica_->log_anchor_registry()->GetEarliestRegisteredLogIndex(&log_index));
ASSERT_EQ(++expected_index, log_index);
}
ASSERT_TRUE(txn_participant()->ClearIfCompleteForTests(txn_id));
ASSERT_EQ(0, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests());
};
NO_FATALS(check_participant_ops_are_anchored(1, {
ParticipantOpPB::BEGIN_TXN,
ParticipantOpPB::BEGIN_COMMIT,
ParticipantOpPB::FINALIZE_COMMIT
}));
NO_FATALS(check_participant_ops_are_anchored(2, {
ParticipantOpPB::BEGIN_TXN,
ParticipantOpPB::BEGIN_COMMIT,
ParticipantOpPB::ABORT_TXN
}));
}
// Test that participant ops anchor the WALs, and that the anchors are updated
// as the transaction's state changes.
TEST_F(TxnParticipantTest, TestParticipantOpsAnchorWALs) {
const int64_t kTxnId = 1;
// First, perform some initial participant ops and roll the WAL segments so
// there are some candidates for WAL GC.
ParticipantResponsePB resp;
ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kTxnId, ParticipantOpPB::BEGIN_TXN,
kDummyCommitTimestamp, &resp));
ASSERT_FALSE(resp.has_error());
ASSERT_OK(tablet_replica_->log()->WaitUntilAllFlushed());
ASSERT_OK(tablet_replica_->log()->AllocateSegmentAndRollOverForTests());
// Write and flush some ops that would otherwise lead to GC-able WAL
// segments. Since there is an anchored participant op in the WAL before
// these writes, the earlier segments should not be GC-able.
int current_key = 0;
ASSERT_OK(WriteRolloverAndFlush(&current_key));
ASSERT_OK(WriteRolloverAndFlush(&current_key));
int64_t gcable_size;
ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size));
ASSERT_EQ(0, gcable_size);
// Once the transaction finishes committing, WAL GC should be able to clear
// out the ops for both the transaction and the inserts.
ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kTxnId, ParticipantOpPB::BEGIN_COMMIT,
kDummyCommitTimestamp, &resp));
ASSERT_FALSE(resp.has_error());
ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kTxnId, ParticipantOpPB::FINALIZE_COMMIT,
kDummyCommitTimestamp, &resp));
ASSERT_FALSE(resp.has_error());
ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size));
ASSERT_GT(gcable_size, 0);
ASSERT_OK(tablet_replica_->RunLogGC());
ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size));
ASSERT_EQ(0, gcable_size);
// Ensure the transaction bootstraps to the expected state.
// NOTE: we need to reset the tablet here to reset the TxnParticipant.
// Otherwise, we might start the replica with a LogAnchor already registered.
ASSERT_OK(RestartReplica(/*reset_tablet*/true));
ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
{ kTxnId, Txn::kCommitted, kDummyCommitTimestamp }
}), txn_participant()->GetTxnsForTests());
// Roll onto new WAL segments and add more segments so we can get to a state
// without any transaction ops in the WALs.
ASSERT_OK(WriteRolloverAndFlush(&current_key));
ASSERT_OK(WriteRolloverAndFlush(&current_key));
// While the transaction still exists, we shouldn't GC anything.
ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size));
ASSERT_EQ(0, gcable_size);
// Once we cull the transaction state in memory, we should be left with no
// trace of the transaction.
ASSERT_TRUE(txn_participant()->ClearIfCompleteForTests(kTxnId));
ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size));
ASSERT_GT(gcable_size, 0);
ASSERT_OK(tablet_replica_->RunLogGC());
ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size));
ASSERT_EQ(0, gcable_size);
// Do a final check that we bootstrap to the expected state (i.e. the
// transaction is culled).
ASSERT_OK(RestartReplica(/*reset_tablet*/true));
ASSERT_TRUE(txn_participant()->GetTxnsForTests().empty());
}
// Similar to the above test, but checking that in-flight ops anchor the WALs.
TEST_F(TxnParticipantTest, TestActiveParticipantOpsAnchorWALs) {
const int64_t kTxnId = 1;
ParticipantRequestPB req;
ParticipantResponsePB resp;
auto op_state = NewParticipantOp(tablet_replica_.get(), kTxnId, ParticipantOpPB::BEGIN_TXN,
kDummyCommitTimestamp, &req, &resp);
CountDownLatch latch(1);
CountDownLatch apply_start(1);
CountDownLatch apply_continue(1);
op_state->set_completion_callback(std::unique_ptr<OpCompletionCallback>(
new LatchOpCompletionCallback<tserver::ParticipantResponsePB>(&latch, &resp)));
scoped_refptr<OpDriver> driver;
unique_ptr<DelayedParticipantOp> op(
new DelayedParticipantOp(&apply_start, &apply_continue, std::move(op_state)));
ASSERT_OK(tablet_replica_->NewLeaderOpDriver(std::move(op), &driver));
ASSERT_OK(driver->ExecuteAsync());
// Wait for the apply to start, indicating that we have persisted and
// replicated but not yet Raft committed the participant op.
apply_start.Wait();
ASSERT_TRUE(driver->GetOpId().IsInitialized());
ASSERT_EQ(1, tablet_replica_->op_tracker()->GetNumPendingForTests());
// Create some WAL segments so there are segments that would be GC-able were
// it not for the pending participant op.
int current_key = 0;
ASSERT_OK(WriteRolloverAndFlush(&current_key));
ASSERT_OK(WriteRolloverAndFlush(&current_key));
// Our participant op is still pending, and nothing should be GC-able.
ASSERT_EQ(1, tablet_replica_->op_tracker()->GetNumPendingForTests());
int64_t gcable_size;
ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size));
ASSERT_EQ(0, gcable_size);
// Finish applying the participant op and proceed to completion.
apply_continue.CountDown();
latch.Wait();
// Even though we've completed the op, the replicate message should still be
// anchored while the in-memory transaction state exists on this participant.
ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size));
ASSERT_EQ(0, gcable_size);
ASSERT_OK(WriteRolloverAndFlush(&current_key));
ASSERT_OK(WriteRolloverAndFlush(&current_key));
ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size));
ASSERT_EQ(0, gcable_size);
// The moment we update the in-memory state, we should be able to GC.
ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kTxnId, ParticipantOpPB::BEGIN_COMMIT,
kDummyCommitTimestamp, &resp));
ASSERT_FALSE(resp.has_error());
ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size));
ASSERT_GT(gcable_size, 0);
// As a sanity check, ensure we get to the expected state if we reboot.
ASSERT_OK(RestartReplica(/*reset_tablet*/true));
ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
{ kTxnId, Txn::kCommitInProgress, -1 }
}), txn_participant()->GetTxnsForTests());
}
} // namespace tablet
} // namespace kudu