blob: 31d922c471875bec322295db6335d3482bdd3e71 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tablet/mvcc.h"
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <thread>
#include <vector>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/clock/hybrid_clock.h"
#include "kudu/clock/logical_clock.h"
#include "kudu/common/timestamp.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/tablet/txn_metadata.h"
#include "kudu/util/barrier.h"
#include "kudu/util/locks.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
using std::thread;
using std::unique_ptr;
using std::vector;
METRIC_DECLARE_entity(server);
namespace kudu {
namespace tablet {
class MvccTest : public KuduTest {
public:
MvccTest()
: clock_(Timestamp::kInitialTimestamp) {
}
void WaitForSnapshotAtTSThread(MvccManager* mgr, Timestamp ts) {
MvccSnapshot s;
CHECK_OK(mgr->WaitForSnapshotWithAllApplied(ts, &s, MonoTime::Max()));
CHECK(s.is_clean()) << "verifying postcondition";
std::lock_guard<simple_spinlock> lock(lock_);
result_snapshot_.reset(new MvccSnapshot(s));
}
bool HasResultSnapshot() {
std::lock_guard<simple_spinlock> lock(lock_);
return result_snapshot_ != nullptr;
}
protected:
clock::LogicalClock clock_;
simple_spinlock lock_;
unique_ptr<MvccSnapshot> result_snapshot_;
};
// Walks a single op through its lifecycle (started -> applying -> applied)
// and checks how a freshly taken snapshot of the manager reflects each stage.
TEST_F(MvccTest, TestMvccBasic) {
  MvccManager mgr;
  MvccSnapshot snap;

  // Initial state should not have any applied ops.
  snap = MvccSnapshot(mgr);
  ASSERT_EQ("MvccSnapshot[applied={T|T < 1}]", snap.ToString());
  ASSERT_FALSE(snap.IsApplied(Timestamp(1)));
  ASSERT_FALSE(snap.IsApplied(Timestamp(2)));

  // Start timestamp 1.
  Timestamp t = clock_.Now();
  ASSERT_EQ(1, t.value());
  ScopedOp op(&mgr, t);

  // State should still have no applied ops, since 1 is in-flight.
  snap = MvccSnapshot(mgr);
  ASSERT_EQ("MvccSnapshot[applied={T|T < 1}]", snap.ToString());
  ASSERT_FALSE(snap.IsApplied(Timestamp(1)));
  ASSERT_FALSE(snap.IsApplied(Timestamp(2)));

  // Mark timestamp 1 as "applying".
  op.StartApplying();

  // This should not change the set of applied ops. Take a fresh snapshot so
  // the check observes the manager's state after StartApplying(); the
  // previous snapshot was copied before the transition and would pass
  // trivially.
  snap = MvccSnapshot(mgr);
  ASSERT_FALSE(snap.IsApplied(Timestamp(1)));

  // Apply timestamp 1.
  op.FinishApplying();

  // State should show 1 as applied, 2 as nonapplied.
  snap = MvccSnapshot(mgr);
  ASSERT_EQ("MvccSnapshot[applied={T|T < 1 or (T in {1})}]", snap.ToString());
  ASSERT_TRUE(snap.IsApplied(Timestamp(1)));
  ASSERT_FALSE(snap.IsApplied(Timestamp(2)));
}
// Tracks several concurrently in-flight ops that are applied out of timestamp
// order, checking the applied set reported by fresh snapshots at each step.
TEST_F(MvccTest, TestMvccMultipleInFlight) {
  MvccManager mgr;
  MvccSnapshot snap;
  Timestamp t1 = clock_.Now();
  ASSERT_EQ(1, t1.value());
  ScopedOp op1(&mgr, t1);
  Timestamp t2 = clock_.Now();
  ASSERT_EQ(2, t2.value());
  ScopedOp op2(&mgr, t2);

  // State should still have no applied ops, since both are in-flight.
  snap = MvccSnapshot(mgr);
  ASSERT_EQ("MvccSnapshot[applied={T|T < 1}]", snap.ToString());
  ASSERT_FALSE(snap.IsApplied(t1));
  ASSERT_FALSE(snap.IsApplied(t2));

  // Apply timestamp 2.
  op2.StartApplying();
  op2.FinishApplying();

  // State should show 2 as applied, 1 as nonapplied.
  snap = MvccSnapshot(mgr);
  ASSERT_EQ("MvccSnapshot[applied="
            "{T|T < 1 or (T in {2})}]",
            snap.ToString());
  ASSERT_FALSE(snap.IsApplied(t1));
  ASSERT_TRUE(snap.IsApplied(t2));

  // Start another op. This gets timestamp 3.
  Timestamp t3 = clock_.Now();
  ASSERT_EQ(3, t3.value());
  ScopedOp op3(&mgr, t3);

  // State should show 2 as applied, 1 and 3 as nonapplied.
  snap = MvccSnapshot(mgr);
  ASSERT_EQ("MvccSnapshot[applied="
            "{T|T < 1 or (T in {2})}]",
            snap.ToString());
  ASSERT_FALSE(snap.IsApplied(t1));
  ASSERT_TRUE(snap.IsApplied(t2));
  ASSERT_FALSE(snap.IsApplied(t3));

  // Apply 3.
  op3.StartApplying();
  op3.FinishApplying();

  // 2 and 3 applied, 1 still nonapplied.
  snap = MvccSnapshot(mgr);
  ASSERT_EQ("MvccSnapshot[applied="
            "{T|T < 1 or (T in {2,3})}]",
            snap.ToString());
  ASSERT_FALSE(snap.IsApplied(t1));
  ASSERT_TRUE(snap.IsApplied(t2));
  ASSERT_TRUE(snap.IsApplied(t3));

  // Apply 1.
  op1.StartApplying();
  op1.FinishApplying();

  // All ops are applied; adjust the new op lower bound so the snapshot's
  // "all applied before" watermark can move up to it.
  mgr.AdjustNewOpLowerBound(t3);

  // All applied.
  snap = MvccSnapshot(mgr);
  ASSERT_EQ("MvccSnapshot[applied={T|T < 3 or (T in {3})}]", snap.ToString());
  ASSERT_TRUE(snap.IsApplied(t1));
  ASSERT_TRUE(snap.IsApplied(t2));
  ASSERT_TRUE(snap.IsApplied(t3));
}
// Tests snapshot tracking when one op is started with a future timestamp (as
// a commit-wait op would be) while other ops start and apply around it.
TEST_F(MvccTest, TestOutOfOrderOps) {
  MetricRegistry metric_registry;
  auto metric_entity(METRIC_ENTITY_server.Instantiate(&metric_registry, "mvcc-test"));
  clock::HybridClock hybrid_clock(metric_entity);
  ASSERT_OK(hybrid_clock.Init());
  MvccManager mgr;

  // Start a normal non-commit-wait op.
  Timestamp first_ts = hybrid_clock.Now();
  ScopedOp first_op(&mgr, first_ts);

  // Take a snapshot that contains no applied ops.
  MvccSnapshot snap_with_nothing_applied(mgr);

  // Start an op as if it were using commit-wait (i.e. started in the future).
  Timestamp cw_ts = hybrid_clock.NowLatest();
  ScopedOp cw_op(&mgr, cw_ts);

  // Apply the original op.
  first_op.StartApplying();
  first_op.FinishApplying();

  // Start a new op.
  Timestamp second_ts = hybrid_clock.Now();
  ScopedOp second_op(&mgr, second_ts);

  // The old snapshot should not have either op.
  EXPECT_FALSE(snap_with_nothing_applied.IsApplied(first_ts));
  EXPECT_FALSE(snap_with_nothing_applied.IsApplied(second_ts));

  // A new snapshot should have only the first op.
  MvccSnapshot snap_with_first_applied(mgr);
  EXPECT_TRUE(snap_with_first_applied.IsApplied(first_ts));
  EXPECT_FALSE(snap_with_first_applied.IsApplied(second_ts));

  // Apply the commit-wait one once it is time.
  ASSERT_OK(hybrid_clock.WaitUntilAfter(cw_ts, MonoTime::Max()));
  cw_op.StartApplying();
  cw_op.FinishApplying();

  // A new snapshot at this point should still think that 'second_op' is
  // nonapplied: it was started but never applied.
  MvccSnapshot snap_with_cw_applied(mgr);
  EXPECT_FALSE(snap_with_cw_applied.IsApplied(second_ts));
}
// Tests starting ops at a point-in-time in the past and applying them while
// adjusting the new op timestamp lower bound.
TEST_F(MvccTest, TestSafeTimeWithOutOfOrderOps) {
  MvccManager mgr;

  // Set the clock to some time in the "future".
  ASSERT_OK(clock_.Update(Timestamp(100)));

  // Start an op in the "past".
  Timestamp ts_in_the_past(50);
  ScopedOp op_in_the_past(&mgr, ts_in_the_past);
  op_in_the_past.StartApplying();

  ASSERT_EQ(Timestamp::kInitialTimestamp, mgr.GetCleanTimestamp());

  // Applying 'op_in_the_past' should not advance the new op lower
  // bound or the clean time.
  op_in_the_past.FinishApplying();

  // Now take a snapshot.
  MvccSnapshot snap_with_first_op(mgr);

  // Because we did not advance the new op lower bound or clean time, even
  // though the only in-flight op was applied at time 50, an op at time 40
  // should still be considered nonapplied.
  ASSERT_FALSE(snap_with_first_op.IsApplied(Timestamp(40)));

  // Now advance both the clean and new op lower bound watermarks to the last
  // applied op.
  mgr.AdjustNewOpLowerBound(Timestamp(50));
  ASSERT_EQ(ts_in_the_past, mgr.GetCleanTimestamp());

  MvccSnapshot snap_with_adjusted_clean_time(mgr);
  ASSERT_TRUE(snap_with_adjusted_clean_time.IsApplied(Timestamp(40)));
}
// Exercises ScopedOp's scope-based cleanup: an op that leaves scope without
// being applied is aborted (not applied), and a ScopedOp left in the APPLYING
// state must not crash on destruction after the MvccManager is closed.
TEST_F(MvccTest, TestScopedOp) {
MvccManager mgr;
MvccSnapshot snap;
{
ScopedOp t1(&mgr, clock_.Now());
ScopedOp t2(&mgr, clock_.Now());
ASSERT_EQ(1, t1.timestamp().value());
ASSERT_EQ(2, t2.timestamp().value());
// Apply only t1; t2 stays in flight until the end of this scope.
t1.StartApplying();
t1.FinishApplying();
snap = MvccSnapshot(mgr);
ASSERT_TRUE(snap.IsApplied(t1.timestamp()));
ASSERT_FALSE(snap.IsApplied(t2.timestamp()));
}
// t2 going out of scope aborts it.
snap = MvccSnapshot(mgr);
ASSERT_TRUE(snap.IsApplied(Timestamp(1)));
ASSERT_FALSE(snap.IsApplied(Timestamp(2)));
// Test that an applying scoped op does not crash if it goes out of
// scope while the MvccManager is closed.
mgr.Close();
{
ScopedOp t(&mgr, clock_.Now());
NO_FATALS(t.StartApplying());
}
}
// A snapshot built from a bare timestamp treats every timestamp strictly
// below it as applied, and that timestamp and everything after as nonapplied.
TEST_F(MvccTest, TestPointInTimeSnapshot) {
  MvccSnapshot snap_at_ten(Timestamp(10));

  // Below the snapshot timestamp: applied.
  ASSERT_TRUE(snap_at_ten.IsApplied(Timestamp(1)));
  ASSERT_TRUE(snap_at_ten.IsApplied(Timestamp(9)));

  // At or above the snapshot timestamp: not applied.
  ASSERT_FALSE(snap_at_ten.IsApplied(Timestamp(10)));
  ASSERT_FALSE(snap_at_ten.IsApplied(Timestamp(11)));
}
// Exercises MvccSnapshot::MayHaveAppliedOpsAtOrAfter() on a hand-built
// snapshot, on the "all ops" and "no ops" snapshots, and on a clean
// point-in-time snapshot.
TEST_F(MvccTest, TestMayHaveAppliedOpsAtOrAfter) {
  // Everything before 10 is applied, as are 11 and 13; nothing at or after 14.
  MvccSnapshot partial;
  partial.all_applied_before_ = Timestamp(10);
  partial.applied_timestamps_.push_back(11);
  partial.applied_timestamps_.push_back(13);
  partial.none_applied_at_or_after_ = Timestamp(14);

  ASSERT_TRUE(partial.MayHaveAppliedOpsAtOrAfter(Timestamp(9)));
  ASSERT_TRUE(partial.MayHaveAppliedOpsAtOrAfter(Timestamp(10)));
  ASSERT_TRUE(partial.MayHaveAppliedOpsAtOrAfter(Timestamp(12)));
  ASSERT_TRUE(partial.MayHaveAppliedOpsAtOrAfter(Timestamp(13)));
  ASSERT_FALSE(partial.MayHaveAppliedOpsAtOrAfter(Timestamp(14)));
  ASSERT_FALSE(partial.MayHaveAppliedOpsAtOrAfter(Timestamp(15)));

  // The "all applied" snapshot reports possible applied ops everywhere.
  MvccSnapshot everything = MvccSnapshot::CreateSnapshotIncludingAllOps();
  ASSERT_TRUE(everything.MayHaveAppliedOpsAtOrAfter(Timestamp(1)));
  ASSERT_TRUE(everything.MayHaveAppliedOpsAtOrAfter(Timestamp(12345)));

  // The "none applied" snapshot reports no applied ops anywhere.
  MvccSnapshot nothing = MvccSnapshot::CreateSnapshotIncludingNoOps();
  ASSERT_FALSE(nothing.MayHaveAppliedOpsAtOrAfter(Timestamp(1)));
  ASSERT_FALSE(nothing.MayHaveAppliedOpsAtOrAfter(Timestamp(12345)));

  // A clean snapshot at 10 can only have applied ops below 10.
  MvccSnapshot clean_at_ten(Timestamp(10));
  ASSERT_TRUE(clean_at_ten.MayHaveAppliedOpsAtOrAfter(Timestamp(9)));
  ASSERT_FALSE(clean_at_ten.MayHaveAppliedOpsAtOrAfter(Timestamp(10)));
}
// Exercises MvccSnapshot::MayHaveNonAppliedOpsAtOrBefore() on a hand-built
// snapshot, on the "all ops" and "no ops" snapshots, on a clean point-in-time
// snapshot, and on a snapshot whose single applied op sits exactly on the
// all-applied-before watermark.
TEST_F(MvccTest, TestMayHaveNonAppliedOpsBefore) {
  // Everything before 10 is applied, as are 11 and 13; nothing at or after 14.
  MvccSnapshot partial;
  partial.all_applied_before_ = Timestamp(10);
  partial.applied_timestamps_.push_back(11);
  partial.applied_timestamps_.push_back(13);
  partial.none_applied_at_or_after_ = Timestamp(14);

  ASSERT_FALSE(partial.MayHaveNonAppliedOpsAtOrBefore(Timestamp(9)));
  ASSERT_TRUE(partial.MayHaveNonAppliedOpsAtOrBefore(Timestamp(10)));
  ASSERT_TRUE(partial.MayHaveNonAppliedOpsAtOrBefore(Timestamp(11)));
  ASSERT_TRUE(partial.MayHaveNonAppliedOpsAtOrBefore(Timestamp(13)));
  ASSERT_TRUE(partial.MayHaveNonAppliedOpsAtOrBefore(Timestamp(14)));
  ASSERT_TRUE(partial.MayHaveNonAppliedOpsAtOrBefore(Timestamp(15)));

  // The "all applied" snapshot never has nonapplied ops.
  MvccSnapshot everything = MvccSnapshot::CreateSnapshotIncludingAllOps();
  ASSERT_FALSE(everything.MayHaveNonAppliedOpsAtOrBefore(Timestamp(1)));
  ASSERT_FALSE(everything.MayHaveNonAppliedOpsAtOrBefore(Timestamp(12345)));

  // The "none applied" snapshot may have nonapplied ops anywhere.
  MvccSnapshot nothing = MvccSnapshot::CreateSnapshotIncludingNoOps();
  ASSERT_TRUE(nothing.MayHaveNonAppliedOpsAtOrBefore(Timestamp(1)));
  ASSERT_TRUE(nothing.MayHaveNonAppliedOpsAtOrBefore(Timestamp(12345)));

  // A clean snapshot at 10: nothing nonapplied below 10, possibly at 10.
  MvccSnapshot clean_at_ten(Timestamp(10));
  ASSERT_FALSE(clean_at_ten.MayHaveNonAppliedOpsAtOrBefore(Timestamp(9)));
  ASSERT_TRUE(clean_at_ten.MayHaveNonAppliedOpsAtOrBefore(Timestamp(10)));

  // A single in-flight op that was also the earliest one: all_applied_before_
  // equals its timestamp, and once it is applied there is no later op to move
  // that watermark to. The snapshot must still report that nothing at or
  // before that timestamp is nonapplied.
  MvccSnapshot single_op_snap;
  single_op_snap.all_applied_before_ = Timestamp(10);
  single_op_snap.applied_timestamps_.push_back(10);
  ASSERT_FALSE(single_op_snap.MayHaveNonAppliedOpsAtOrBefore(Timestamp(10)));
}
// Applies three in-flight ops out of order and checks
// MvccManager::AreAllOpsAppliedForTests() for each op's timestamp after every
// step.
TEST_F(MvccTest, TestAreAllOpsAppliedForTests) {
  MvccManager mgr;

  // Start several ops (timestamps 1, 2 and 3).
  Timestamp ts1 = clock_.Now();
  ScopedOp op1(&mgr, ts1);
  Timestamp ts2 = clock_.Now();
  ScopedOp op2(&mgr, ts2);
  Timestamp ts3 = clock_.Now();
  ScopedOp op3(&mgr, ts3);
  mgr.AdjustNewOpLowerBound(clock_.Now());

  ASSERT_FALSE(mgr.AreAllOpsAppliedForTests(Timestamp(1)));
  ASSERT_FALSE(mgr.AreAllOpsAppliedForTests(Timestamp(2)));
  ASSERT_FALSE(mgr.AreAllOpsAppliedForTests(Timestamp(3)));

  // Apply op3. All timestamps should still report as having nonapplied ops,
  // since op1 and op2 are still in flight.
  op3.StartApplying();
  op3.FinishApplying();
  ASSERT_FALSE(mgr.AreAllOpsAppliedForTests(Timestamp(1)));
  ASSERT_FALSE(mgr.AreAllOpsAppliedForTests(Timestamp(2)));
  ASSERT_FALSE(mgr.AreAllOpsAppliedForTests(Timestamp(3)));

  // Apply op1. Timestamp 1 should now report as all applied, while
  // timestamps 2 and 3 still have a nonapplied op (op2).
  op1.StartApplying();
  op1.FinishApplying();
  ASSERT_TRUE(mgr.AreAllOpsAppliedForTests(Timestamp(1)));
  ASSERT_FALSE(mgr.AreAllOpsAppliedForTests(Timestamp(2)));
  ASSERT_FALSE(mgr.AreAllOpsAppliedForTests(Timestamp(3)));

  // Now they should all report as all applied.
  op2.StartApplying();
  op2.FinishApplying();
  ASSERT_TRUE(mgr.AreAllOpsAppliedForTests(Timestamp(1)));
  ASSERT_TRUE(mgr.AreAllOpsAppliedForTests(Timestamp(2)));
  ASSERT_TRUE(mgr.AreAllOpsAppliedForTests(Timestamp(3)));
}
// With no ops in flight and the new op lower bound moved past the target
// timestamp, a waiter for that timestamp should be satisfied without any
// further ops being applied.
TEST_F(MvccTest, WaitForCleanSnapshotSnapWithNoInflights) {
  MvccManager mgr;
  Timestamp to_wait_for = clock_.Now();
  mgr.AdjustNewOpLowerBound(clock_.Now());

  // Nothing is pending, so the waiter can be joined right away.
  thread waiter(&MvccTest::WaitForSnapshotAtTSThread, this, &mgr, to_wait_for);
  waiter.join();

  ASSERT_TRUE(HasResultSnapshot());
}
// A waiter for a timestamp that lies below the new op lower bound must block
// until every in-flight op at or before that timestamp has been applied.
TEST_F(MvccTest, WaitForCleanSnapshotSnapBeforeSafeTimeWithInFlights) {
  MvccManager mgr;

  Timestamp ts1 = clock_.Now();
  ScopedOp op1(&mgr, ts1);
  Timestamp ts2 = clock_.Now();
  ScopedOp op2(&mgr, ts2);
  mgr.AdjustNewOpLowerBound(ts2);
  Timestamp to_wait_for = clock_.Now();

  // Select a new op timestamp lower bound that is after all ops and after the
  // timestamp we'll wait for. This will cause "clean time" to move when op1
  // and op2 finish applying.
  Timestamp future_ts = clock_.Now();
  mgr.AdjustNewOpLowerBound(future_ts);

  thread waiting_thread = thread(&MvccTest::WaitForSnapshotAtTSThread, this, &mgr, to_wait_for);

  ASSERT_FALSE(HasResultSnapshot());
  op1.StartApplying();
  op1.FinishApplying();
  ASSERT_FALSE(HasResultSnapshot());
  op2.StartApplying();
  op2.FinishApplying();
  waiting_thread.join();
  ASSERT_TRUE(HasResultSnapshot());
}
// Starts two ops, moves one of them into APPLYING, and checks that a
// background caller of MvccManager::WaitForApplyingOpsToApply() stays
// registered as a waiter until that op finishes applying, is unaffected by
// the other op being aborted, and returns an OK status.
// NOTE(review): despite its name, this test does not wait for a clean
// snapshot; its body mirrors TestWaitForApplyingOpsToApply, with the waiter's
// Status additionally checked. Consider renaming.
TEST_F(MvccTest, WaitForCleanSnapshotSnapAfterSafeTimeWithInFlights) {
MvccManager mgr;
Timestamp ts1 = clock_.Now();
ScopedOp op1(&mgr, ts1);
Timestamp ts2 = clock_.Now();
ScopedOp op2(&mgr, ts2);
mgr.AdjustNewOpLowerBound(ts2);
// Wait should return immediately, since we have no ops "applying" yet.
ASSERT_OK(mgr.WaitForApplyingOpsToApply());
op1.StartApplying();
// 's' is written by the waiter thread and only read after join().
Status s;
thread waiting_thread = thread([&] {
s = mgr.WaitForApplyingOpsToApply();
});
// Poll until the waiter has registered with the manager.
while (mgr.GetNumWaitersForTests() == 0) {
SleepFor(MonoDelta::FromMilliseconds(5));
}
ASSERT_EQ(mgr.GetNumWaitersForTests(), 1);
// Aborting the other op shouldn't affect our waiter.
op2.Abort();
ASSERT_EQ(mgr.GetNumWaitersForTests(), 1);
// Applying our op should wake the waiter.
op1.FinishApplying();
ASSERT_EQ(mgr.GetNumWaitersForTests(), 0);
waiting_thread.join();
ASSERT_OK(s);
}
// A waiter for a timestamp equal to an in-flight op's timestamp must keep
// waiting until that op and all earlier ops are applied AND the new op lower
// bound has been advanced; applying ops alone is not enough here.
TEST_F(MvccTest, WaitForCleanSnapshotSnapAtTimestampWithInFlights) {
MvccManager mgr;
// Ops with timestamp 1 through 3
Timestamp ts1 = clock_.Now();
ScopedOp op1(&mgr, ts1);
Timestamp ts2 = clock_.Now();
ScopedOp op2(&mgr, ts2);
Timestamp ts3 = clock_.Now();
ScopedOp op3(&mgr, ts3);
// Start a thread waiting for ops with ts <= 2 to finish applying
thread waiting_thread = thread(&MvccTest::WaitForSnapshotAtTSThread, this, &mgr, ts2);
ASSERT_FALSE(HasResultSnapshot());
// Apply op 1 - thread should still wait.
op1.StartApplying();
op1.FinishApplying();
// Short sleeps give the waiter a chance to (incorrectly) wake before we check.
SleepFor(MonoDelta::FromMilliseconds(1));
ASSERT_FALSE(HasResultSnapshot());
// Apply op 3 - thread should still wait.
op3.StartApplying();
op3.FinishApplying();
SleepFor(MonoDelta::FromMilliseconds(1));
ASSERT_FALSE(HasResultSnapshot());
// Apply op 2 - thread should still wait, because the new op lower bound has
// not been adjusted yet, so the clean time cannot move.
op2.StartApplying();
op2.FinishApplying();
ASSERT_FALSE(HasResultSnapshot());
// Advance new op lower bound and the clean time, thread should continue.
mgr.AdjustNewOpLowerBound(ts3);
waiting_thread.join();
ASSERT_TRUE(HasResultSnapshot());
}
// Checks that MvccManager::WaitForApplyingOpsToApply() returns immediately
// when no op is APPLYING, and otherwise keeps its caller registered as a
// waiter until the applying op finishes (aborting an unrelated RESERVED op
// must not wake it).
TEST_F(MvccTest, TestWaitForApplyingOpsToApply) {
MvccManager mgr;
Timestamp ts1 = clock_.Now();
ScopedOp op1(&mgr, ts1);
Timestamp ts2 = clock_.Now();
ScopedOp op2(&mgr, ts2);
mgr.AdjustNewOpLowerBound(ts2);
// Wait should return immediately, since we have no ops "applying" yet.
ASSERT_OK(mgr.WaitForApplyingOpsToApply());
op1.StartApplying();
// The waiter's returned Status is discarded by std::thread.
thread waiting_thread = thread(&MvccManager::WaitForApplyingOpsToApply, &mgr);
// Poll until the waiter has registered with the manager.
while (mgr.GetNumWaitersForTests() == 0) {
SleepFor(MonoDelta::FromMilliseconds(5));
}
ASSERT_EQ(mgr.GetNumWaitersForTests(), 1);
// Aborting the other op shouldn't affect our waiter.
op2.Abort();
ASSERT_EQ(mgr.GetNumWaitersForTests(), 1);
// Applying our op should wake the waiter.
op1.FinishApplying();
ASSERT_EQ(mgr.GetNumWaitersForTests(), 0);
waiting_thread.join();
}
// Test to ensure that after MVCC has been closed, it will not Wait and will
// instead return an error: an existing waiter is woken with an Aborted status
// mentioning "closed", and any new waiter gets the same status immediately.
TEST_F(MvccTest, TestDontWaitAfterClose) {
MvccManager mgr;
Timestamp ts1 = clock_.Now();
ScopedOp op1(&mgr, ts1);
mgr.AdjustNewOpLowerBound(ts1);
op1.StartApplying();
// Spin up a thread to wait on the applying op.
// Lock the changing status. This is only necessary in this test to read the
// status from the main thread, showing that, regardless of where, closing
// MVCC will cause waiters to abort mid-wait.
// NOTE(review): the main thread only reads 's' after join(), and never takes
// 'status_lock' itself; confirm whether this lock is still needed.
Status s;
simple_spinlock status_lock;
thread waiting_thread = thread([&] {
std::lock_guard<simple_spinlock> l(status_lock);
s = mgr.WaitForApplyingOpsToApply();
});
// Wait until the waiter actually gets registered.
while (mgr.GetNumWaitersForTests() == 0) {
SleepFor(MonoDelta::FromMilliseconds(5));
}
// Set that the mgr is closing. This should cause waiters to abort.
mgr.Close();
waiting_thread.join();
ASSERT_STR_CONTAINS(s.ToString(), "closed");
ASSERT_TRUE(s.IsAborted()) << s.ToString();
// New waiters should abort immediately.
s = mgr.WaitForApplyingOpsToApply();
ASSERT_STR_CONTAINS(s.ToString(), "closed");
ASSERT_TRUE(s.IsAborted()) << s.ToString();
}
// Test that if we abort an op we don't advance the new op lower bound and
// don't add the op to the applied set.
TEST_F(MvccTest, TestOpAbort) {
MvccManager mgr;
// Ops with timestamps 1 through 3
Timestamp ts1 = clock_.Now();
ScopedOp op1(&mgr, ts1);
Timestamp ts2 = clock_.Now();
ScopedOp op2(&mgr, ts2);
Timestamp ts3 = clock_.Now();
ScopedOp op3(&mgr, ts3);
mgr.AdjustNewOpLowerBound(ts3);
// Now abort op1, this shouldn't move the clean time and the op shouldn't be
// reported as applied.
op1.Abort();
ASSERT_EQ(Timestamp::kInitialTimestamp, mgr.GetCleanTimestamp());
ASSERT_FALSE(mgr.cur_snap_.IsApplied(ts1));
// Applying op3 shouldn't advance the clean time since it is not the earliest
// in-flight op. 'new_op_timestamp_exc_lower_bound_' should remain at 3 (the
// value set by AdjustNewOpLowerBound() above).
op3.StartApplying();
op3.FinishApplying();
ASSERT_TRUE(mgr.cur_snap_.IsApplied(ts3));
ASSERT_EQ(ts3, mgr.new_op_timestamp_exc_lower_bound_);
// Applying op2 should advance the clean time to 3.
op2.StartApplying();
op2.FinishApplying();
ASSERT_TRUE(mgr.cur_snap_.IsApplied(ts2));
ASSERT_EQ(ts3, mgr.GetCleanTimestamp());
}
// This tests for a bug we were observing, where a clean snapshot would not
// coalesce to the latest timestamp.
TEST_F(MvccTest, TestAutomaticCleanTimeMoveToSafeTimeOnApply) {
  MvccManager mgr;
  // Check the clock update rather than silently dropping its Status.
  ASSERT_OK(clock_.Update(Timestamp(20)));

  ScopedOp op1(&mgr, Timestamp(10));
  ScopedOp op2(&mgr, Timestamp(15));
  mgr.AdjustNewOpLowerBound(Timestamp(15));

  // Apply the later op first; the clean time can only coalesce once op1,
  // the earliest in-flight op, is applied too.
  op2.StartApplying();
  op2.FinishApplying();

  op1.StartApplying();
  op1.FinishApplying();
  ASSERT_EQ(mgr.cur_snap_.ToString(), "MvccSnapshot[applied={T|T < 15 or (T in {15})}]");
}
// Various death tests which ensure that we can only transition in one of the following
// valid ways:
//
// - Start() -> StartApplying() -> FinishApplying()
// - Start() -> Abort()
//
// Any other transition should fire a CHECK failure.
TEST_F(MvccTest, TestIllegalStateTransitionsCrash) {
  MvccManager mgr;

  EXPECT_DEATH({
      mgr.StartApplyingOp(Timestamp(1));
    }, "Cannot mark timestamp 1 as APPLYING: not in the in-flight map");

  // Depending on whether this is a DEBUG or RELEASE build, the error message
  // could be different for this case -- the "future timestamp" check is only
  // run in DEBUG builds.
  EXPECT_DEATH({
      mgr.FinishApplyingOp(Timestamp(1));
    },
    "Trying to apply an op with a future timestamp|"
    "Trying to remove timestamp which isn't in the in-flight set: 1");

  // Move the clock to 20 so timestamp 1 is no longer in the future; the next
  // two ops below get timestamps 21 and 22, which the expected messages use.
  ASSERT_OK(clock_.Update(Timestamp(20)));

  EXPECT_DEATH({
      mgr.FinishApplyingOp(Timestamp(1));
    }, "Trying to remove timestamp which isn't in the in-flight set: 1");

  // Start an op, and try applying it without having moved to "Applying"
  // state.
  Timestamp t = clock_.Now();
  mgr.StartOp(t);
  EXPECT_DEATH({
      mgr.FinishApplyingOp(t);
    }, "Trying to apply an op which never entered APPLYING state");

  // Aborting should succeed, since we never moved to Applying.
  mgr.AbortOp(t);

  // Aborting a second time should fail.
  EXPECT_DEATH({
      mgr.AbortOp(t);
    }, "Trying to remove timestamp which isn't in the in-flight set: 21");

  // Start a new op. This time, mark it as Applying.
  t = clock_.Now();
  mgr.StartOp(t);
  mgr.AdjustNewOpLowerBound(t);
  mgr.StartApplyingOp(t);

  // Can only call StartApplying once.
  EXPECT_DEATH({
      mgr.StartApplyingOp(t);
    }, "Cannot mark timestamp 22 as APPLYING: wrong state: 1");

  // Cannot Abort() an op once we start applying it.
  EXPECT_DEATH({
      mgr.AbortOp(t);
    }, "op with timestamp 22 cannot be aborted in state 1");

  // We can apply it successfully.
  mgr.FinishApplyingOp(t);
}
// Waiting for a snapshot at an op's timestamp while that op is never applied
// must give up at the deadline with a TimedOut status.
TEST_F(MvccTest, TestWaitUntilCleanDeadline) {
  MvccManager mgr;

  // A single op at timestamp 1 that is left unapplied.
  Timestamp ts1 = clock_.Now();
  ScopedOp op1(&mgr, ts1);

  // 'ts1' can't become clean while 'op1' is pending, so this wait should run
  // into its 10ms deadline.
  MvccSnapshot unused_snap;
  Status wait_status = mgr.WaitForSnapshotWithAllApplied(
      ts1, &unused_snap, MonoTime::Now() + MonoDelta::FromMilliseconds(10));
  ASSERT_TRUE(wait_status.IsTimedOut()) << wait_status.ToString();
}
// Test for a bug related to the initialization of the MvccManager without any
// pending ops, i.e. when there are only calls to
// AdjustNewOpLowerBound().
//
// Prior to the fix we would advance clean time but not the
// 'none_applied_at_or_after_' watermark, meaning the latter would become lower
// than clean time. This had the effect on compaction of culling delta files
// even though they shouldn't be culled.
// This test makes sure that watermarks are advanced correctly and that delta
// files are culled correctly.
TEST_F(MvccTest, TestCorrectInitWithNoOps) {
  MvccManager mgr;

  MvccSnapshot snap(mgr);
  EXPECT_EQ(snap.all_applied_before_, Timestamp::kInitialTimestamp);
  EXPECT_EQ(snap.none_applied_at_or_after_, Timestamp::kInitialTimestamp);
  // Use empty() to avoid a signed/unsigned comparison against a literal 0.
  EXPECT_TRUE(snap.applied_timestamps_.empty());

  // Read the clock a few times to advance the timestamp.
  for (int i = 0; i < 10; i++) {
    clock_.Now();
  }

  // Advance the new op lower bound.
  Timestamp new_ts_lower_bound = clock_.Now();
  mgr.AdjustNewOpLowerBound(new_ts_lower_bound);

  // Test that the snapshot reports that a timestamp lower than the new op
  // lower bound may have applied ops after that timestamp. Conversely, test
  // that the snapshot reports that there are no applied ops after the new
  // lower bound.
  MvccSnapshot snap2(mgr);
  Timestamp before_lb(new_ts_lower_bound.value() - 1);
  Timestamp after_lb(new_ts_lower_bound.value() + 1);
  EXPECT_TRUE(snap2.MayHaveAppliedOpsAtOrAfter(before_lb));
  EXPECT_FALSE(snap2.MayHaveAppliedOpsAtOrAfter(after_lb));

  // Both watermarks must have moved together to the new lower bound.
  EXPECT_EQ(snap2.all_applied_before_, new_ts_lower_bound);
  EXPECT_EQ(snap2.none_applied_at_or_after_, new_ts_lower_bound);
  EXPECT_TRUE(snap2.applied_timestamps_.empty());
}
// Fixture with helpers that drive a TxnMetadata through commit or abort,
// using MVCC ops on the given manager to track the commit where applicable.
class TransactionMvccTest : public MvccTest {
 public:
  // Simulates successfully committing the given transaction by starting an MVCC
  // op to track the commit, and setting a higher commit timestamp while that
  // op is applying. Returns the commit timestamp.
  Timestamp Commit(MvccManager* mgr, TxnMetadata* txn_meta) {
    // Start an op to begin committing.
    Timestamp ts = clock_.Now();
    ScopedOp op(mgr, ts);
    txn_meta->set_commit_mvcc_op_timestamp(ts);

    // Finalize the commit with some future timestamp (10 past the op's).
    op.StartApplying();
    Timestamp commit_ts(ts.value() + 10);
    txn_meta->set_commit_timestamp(commit_ts);
    op.FinishApplying();
    return commit_ts;
  }

  // Simulates aborting a transaction by beginning to commit, and then aborting.
  void AbortAfterBeginCommit(MvccManager* mgr, TxnMetadata* txn_meta) {
    // Start an op to begin committing.
    Timestamp ts = clock_.Now();
    ScopedOp op(mgr, ts);
    txn_meta->set_commit_mvcc_op_timestamp(ts);

    // Abort the commit.
    txn_meta->set_aborted();
    op.Abort();
  }

  // Simulates aborting without starting any ops.
  static void AbortBeforeBeginCommit(TxnMetadata* txn_meta) {
    txn_meta->set_aborted();
  }
};
// Test that timestamp snapshots before and after committing correctly
// determine whether transactions are committed.
TEST_F(TransactionMvccTest, TestTimestampSnapshot) {
  MvccManager mgr;
  scoped_refptr<TxnMetadata> committed_txn_meta(new TxnMetadata);
  scoped_refptr<TxnMetadata> aborted_before_begin_commit_txn_meta(new TxnMetadata);
  scoped_refptr<TxnMetadata> aborted_after_begin_commit_txn_meta(new TxnMetadata);

  Timestamp initial_ts = clock_.Now();
  MvccSnapshot initial_snap(initial_ts);
  ASSERT_FALSE(initial_snap.IsCommitted(*committed_txn_meta.get()));
  ASSERT_FALSE(initial_snap.IsCommitted(*aborted_before_begin_commit_txn_meta.get()));
  ASSERT_FALSE(initial_snap.IsCommitted(*aborted_after_begin_commit_txn_meta.get()));

  // Finalize each transaction the way its name says. Previously the
  // "before begin commit" transaction was also aborted *after* beginning to
  // commit, so the abort-without-commit-op path was never exercised.
  AbortBeforeBeginCommit(aborted_before_begin_commit_txn_meta.get());
  AbortAfterBeginCommit(&mgr, aborted_after_begin_commit_txn_meta.get());
  Timestamp commit_ts = Commit(&mgr, committed_txn_meta.get());

  // Snapshots taken right before the commit timestamp should not return that
  // the transaction was committed.
  MvccSnapshot before_commit_snap(commit_ts);
  ASSERT_FALSE(before_commit_snap.IsCommitted(*committed_txn_meta.get()));
  ASSERT_FALSE(before_commit_snap.IsCommitted(*aborted_before_begin_commit_txn_meta.get()));
  ASSERT_FALSE(before_commit_snap.IsCommitted(*aborted_after_begin_commit_txn_meta.get()));

  // Snapshots taken right after the commit timestamp shouldn't return that
  // aborted transactions were committed.
  MvccSnapshot after_commit_snap(Timestamp(commit_ts.value() + 1));
  ASSERT_TRUE(after_commit_snap.IsCommitted(*committed_txn_meta.get()));
  ASSERT_FALSE(after_commit_snap.IsCommitted(*aborted_before_begin_commit_txn_meta.get()));
  ASSERT_FALSE(after_commit_snap.IsCommitted(*aborted_after_begin_commit_txn_meta.get()));
}
// Test that the latest snapshots before and after committing or aborting
// correctly determine whether transactions are committed.
// Each scope uses a fresh transaction and covers one way of finalizing it:
// commit, abort after beginning to commit, and abort before beginning to
// commit. In every case the snapshot taken beforehand is re-checked afterward
// to show its answer did not change.
TEST_F(TransactionMvccTest, TestLatestSnapshot) {
MvccManager mgr;
{
scoped_refptr<TxnMetadata> txn_meta(new TxnMetadata);
MvccSnapshot latest_before_commit(mgr);
ASSERT_FALSE(latest_before_commit.IsCommitted(*txn_meta.get()));
Commit(&mgr, txn_meta.get());
MvccSnapshot latest_after_commit(mgr);
// Intentionally re-check the pre-commit snapshot after the commit.
ASSERT_FALSE(latest_before_commit.IsCommitted(*txn_meta.get()));
ASSERT_TRUE(latest_after_commit.IsCommitted(*txn_meta.get()));
}
{
scoped_refptr<TxnMetadata> txn_meta(new TxnMetadata);
MvccSnapshot latest_before_abort(mgr);
ASSERT_FALSE(latest_before_abort.IsCommitted(*txn_meta.get()));
AbortAfterBeginCommit(&mgr, txn_meta.get());
MvccSnapshot latest_after_abort(mgr);
ASSERT_FALSE(latest_before_abort.IsCommitted(*txn_meta.get()));
ASSERT_FALSE(latest_after_abort.IsCommitted(*txn_meta.get()));
}
{
scoped_refptr<TxnMetadata> txn_meta(new TxnMetadata);
MvccSnapshot latest_before_abort(mgr);
ASSERT_FALSE(latest_before_abort.IsCommitted(*txn_meta.get()));
AbortBeforeBeginCommit(txn_meta.get());
MvccSnapshot latest_after_abort(mgr);
ASSERT_FALSE(latest_before_abort.IsCommitted(*txn_meta.get()));
ASSERT_FALSE(latest_after_abort.IsCommitted(*txn_meta.get()));
}
}
// How the transaction under test is finalized in the parameterized
// transaction tests.
enum OpType {
  // Begin committing and finish the commit.
  kCommit,
  // Begin committing, then abort.
  kAbortAfterBeginCommit,
  // Abort without starting any commit op.
  kAbortBeforeBeginCommit
};
// TransactionMvccTest parameterized by how the transaction is finalized
// (one of the OpType values).
class ParamedTransactionMvccTest
    : public TransactionMvccTest,
      public ::testing::WithParamInterface<OpType> {
};
// Test that snapshots taken concurrently with commit or abort do not change
// commit status.
// kNumThreads readers and the main thread rendezvous on a barrier; each reader
// then takes a snapshot and records IsCommitted() while the main thread
// finalizes the transaction according to GetParam(). Afterwards each snapshot
// is re-evaluated and must give the same answer it gave at capture time.
TEST_P(ParamedTransactionMvccTest, TestConcurrentLatestSnapshots) {
constexpr const int kNumThreads = 10;
// +1 so the main thread is released together with the readers.
Barrier b(kNumThreads + 1);
vector<thread> threads;
// Each reader writes only its own index of 'snaps' and 'is_committed'.
vector<MvccSnapshot> snaps(kNumThreads);
// NOTE: we really only care about bools, but vector<bool> isn't thread-safe.
vector<int> is_committed(kNumThreads);
scoped_refptr<TxnMetadata> txn_meta(new TxnMetadata);
MvccManager mgr;
for (int i = 0; i < kNumThreads; i++) {
threads.emplace_back([&, i] {
b.Wait();
// Take a snapshot and immediately check whether the transaction is
// committed.
snaps[i] = MvccSnapshot(mgr);
is_committed[i] = snaps[i].IsCommitted(*txn_meta.get());
});
}
b.Wait();
switch (GetParam()) {
case kCommit:
Commit(&mgr, txn_meta.get());
break;
case kAbortAfterBeginCommit:
AbortAfterBeginCommit(&mgr, txn_meta.get());
break;
case kAbortBeforeBeginCommit:
AbortBeforeBeginCommit(txn_meta.get());
break;
}
for (auto& t : threads) {
t.join();
}
// Take the collected snapshots and evaluate them again against the
// transaction metadata. The commit status should not have changed.
for (int i = 0; i < kNumThreads; i++) {
ASSERT_EQ(is_committed[i], snaps[i].IsCommitted(*txn_meta.get()));
}
}
// Instantiates each ParamedTransactionMvccTest test once per OpType value.
INSTANTIATE_TEST_SUITE_P(
    Op, ParamedTransactionMvccTest,
    ::testing::Values(kCommit, kAbortAfterBeginCommit, kAbortBeforeBeginCommit));
} // namespace tablet
} // namespace kudu