blob: 02a7b3e0a307f1b8b2fbbd0367a49fa4bb6f58ea [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <atomic>
#include <gflags/gflags.h>
#include "kudu/server/hybrid_clock.h"
#include "kudu/tablet/compaction.h"
#include "kudu/tablet/mvcc.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_metrics.h"
#include "kudu/tablet/tablet-test-base.h"
using kudu::server::HybridClock;
// Dumps the tablet via tablet()->DebugDump() and checks the dumped lines
// against a regex built from a fixed set of non-data line prefixes plus the
// caller-supplied row regex 'pattern'.
//
// Specify row regex to match on. Empty string means don't match anything
// (only the non-data prefixes are accepted in that case).
//
// Must be expanded where tablet() is callable, since it calls tablet()
// directly.
#define ASSERT_DEBUG_DUMP_ROWS_MATCH(pattern) do { \
const std::string& _pat = (pattern); \
vector<string> _rows; \
ASSERT_OK(tablet()->DebugDump(&_rows)); \
/* Ignore the non-data (formatting) lines in the output. */ \
std::string _base_pat = R"(^Dumping|^-|^MRS|^RowSet)"; \
if (!_pat.empty()) _base_pat += "|"; \
ASSERT_STRINGS_ALL_MATCH(_rows, _base_pat + _pat); \
} while (0)
namespace kudu {
namespace tablet {
// Test fixture for the tablet history GC tests. Builds on TabletTestBase with
// an INT64-keyed test setup and requests the HYBRID_CLOCK harness option.
class TabletHistoryGcTest : public TabletTestBase<IntKeyTestSetup<INT64>> {
typedef TabletTestBase<IntKeyTestSetup<INT64>> Superclass;
// Construct the superclass with the hybrid clock option and enable the mock
// wall clock flag.
: Superclass(TabletHarness::Options::HYBRID_CLOCK) {
FLAGS_use_mock_wall_clock = true;
virtual void SetUp() OVERRIDE {
// Mock clock defaults to 0 and this screws up the AHM (ancient history mark)
// calculation, which ends up negative.
enum ToFlush {
// Inserts the initial data set; defined out-of-line after the class.
void InsertOriginalRows(int num_rowsets, uint64_t rows_per_rowset);
// Takes the clock's current physical value in microseconds and adds 'delta'
// (also converted to microseconds) to it.
void AddTimeToHybridClock(MonoDelta delta) {
uint64_t now = HybridClock::GetPhysicalValueMicros(clock()->Now());
uint64_t new_time = now + delta.ToMicroseconds();
// Total number of rows in the initial data set: rowsets times rows per rowset.
int64_t TotalNumRows() const { return num_rowsets_ * rows_per_rowset_; }
// Returns a verifier that ignores the key and accepts a row only when its
// value equals 'expected_val'.
TestRowVerifier GenRowsEqualVerifier(int32_t expected_val) {
return [=](int32_t key, int32_t val) -> bool { return val == expected_val; };
// Ready-made verifiers for the values 0, 1 and 2 used throughout the tests.
const TestRowVerifier kRowsEqual0 = GenRowsEqualVerifier(0);
const TestRowVerifier kRowsEqual1 = GenRowsEqualVerifier(1);
const TestRowVerifier kRowsEqual2 = GenRowsEqualVerifier(2);
// Starting row passed to the row verification helpers.
const int kStartRow = 0;
// Default shape of the initial data set; some tests overwrite these before
// inserting.
int num_rowsets_ = 3;
int rows_per_rowset_ = 300;
// Inserts 'num_rowsets' consecutive batches of 'rows_per_rowset' rows each,
// batch N starting at row N * rows_per_rowset, and then asserts that the
// tablet reports exactly 'num_rowsets' rowsets.
//
// Uses ASSERT_* internally, so callers wrap the call in NO_FATALS().
void TabletHistoryGcTest::InsertOriginalRows(int num_rowsets, uint64_t rows_per_rowset) {
// Hand the total number of rows to ClampRowCount() (defined elsewhere).
ClampRowCount(num_rowsets * rows_per_rowset);
for (int rowset_id = 0; rowset_id < num_rowsets; rowset_id++) {
// The trailing 0 is presumably the value written to each row; the tests
// verify val == 0 right after calling this helper.
InsertTestRows(rowset_id * rows_per_rowset, rows_per_rowset, 0);
ASSERT_EQ(num_rowsets, tablet()->num_rowsets());
// Test that we do not generate undos for redo operations that are older than
// the AHM (ancient history mark) during major delta compaction.
TEST_F(TabletHistoryGcTest, TestNoGenerateUndoOnMajorDeltaCompaction) {
FLAGS_tablet_history_max_age_sec = 1; // Keep history for 1 second.
// Seed the tablet and confirm that every row currently reads val=0.
NO_FATALS(InsertOriginalRows(num_rowsets_, rows_per_rowset_));
NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), kRowsEqual0));
// Remember a point in time at which all rows still hold their inserted value.
Timestamp time_after_insert = clock()->Now();
// Timestamps recorded after each round of updates.
Timestamp post_update_ts[2];
// Mutate all of the rows, setting val=1. Then again for val=2.
LocalTabletWriter writer(tablet().get(), &client_schema_);
for (int val = 1; val <= 2; val++) {
for (int row_idx = 0; row_idx < TotalNumRows(); row_idx++) {
ASSERT_OK(UpdateTestRow(&writer, row_idx, val));
// We must flush the DMS before major compaction can operate on these REDOs.
for (int i = 0; i < num_rowsets_; i++) {
// Slot 0 records the time after the val=1 round, slot 1 after the val=2 round.
post_update_ts[val - 1] = clock()->Now();
// Move the AHM beyond our mutations, which are represented as REDOs.
// Current-time reads should give us 2, but reads from the past should give
// us 0 or 1.
NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, TotalNumRows(),
time_after_insert, kRowsEqual0));
NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, TotalNumRows(),
post_update_ts[0], kRowsEqual1));
NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, TotalNumRows(),
post_update_ts[1], kRowsEqual2));
NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), kRowsEqual2));
// Run major delta compaction.
for (int i = 0; i < num_rowsets_; i++) {
// Now, we should have base data = 2 with no other historical values.
// Major delta compaction will not remove UNDOs, so we expect a single UNDO DELETE as well.
// The pattern below requires: val=2, exactly one UNDO DELETE, and an empty REDO list.
ASSERT_DEBUG_DUMP_ROWS_MATCH(R"(int32 val=2\) Undos: \[@[[:digit:]]+\(DELETE\)\] Redos: \[\]$)");
// Test that major delta compaction works when run on a subset of columns:
// 1. Insert rows and flush to DiskRowSets.
// 2. Mutate two columns.
// 3. Move time forward.
// 4. Run major delta compaction on a single column.
// 5. Make sure we don't lose anything unexpected.
TEST_F(TabletHistoryGcTest, TestMajorDeltaCompactionOnSubsetOfColumns) {
FLAGS_tablet_history_max_age_sec = 100;
// Use a small data set: 3 rowsets of 20 rows each.
num_rowsets_ = 3;
rows_per_rowset_ = 20;
NO_FATALS(InsertOriginalRows(num_rowsets_, rows_per_rowset_));
NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), kRowsEqual0));
LocalTabletWriter writer(tablet().get(), &client_schema_);
for (int32_t row_key = 0; row_key < TotalNumRows(); row_key++) {
KuduPartialRow row(&client_schema_);
setup_.BuildRowKey(&row, row_key);
// Set column index 1 to 1 and column index 2 to 2. Per the dump pattern
// asserted below, these correspond to key_idx=1 and val=2.
ASSERT_OK_FAST(row.SetInt32(1, 1));
ASSERT_OK_FAST(row.SetInt32(2, 2));
for (int i = 0; i < num_rowsets_; i++) {
vector<std::shared_ptr<RowSet>> rowsets;
for (int i = 0; i < num_rowsets_; i++) {
DiskRowSet* drs = down_cast<DiskRowSet*>(rowsets[i].get());
// Only the column at schema index 2 is selected for compaction.
vector<ColumnId> col_ids_to_compact = { schema_.column_id(2) };
// Expected dump: val=2 appears in the row data, the original UNDO DELETE is
// kept, and the key_idx update is still present as a REDO.
ASSERT_DEBUG_DUMP_ROWS_MATCH(R"(int32 val=2\) Undos: \[@[[:digit:]]+\(DELETE\)\] )"
R"(Redos: \[@[[:digit:]]+\(SET key_idx=1\)\]$)");
vector<string> rows;
// No rows may have been lost.
ASSERT_EQ(TotalNumRows(), rows.size());
// Tests the following two MRS flush scenarios:
// 1. Verify that no UNDO is generated after inserting a row into the MRS,
// waiting for the AHM to pass, then flushing the MRS.
// 2. Same as #1 but delete the inserted row from the MRS before waiting for
// the AHM to pass.
// Note that the test body exercises the insert-then-delete scenario first and
// the insert-only scenario second.
TEST_F(TabletHistoryGcTest, TestNoGenerateUndoOnMRSFlush) {
FLAGS_tablet_history_max_age_sec = 100;
// Capture timestamps before the inserts, after the inserts, and after the
// deletes so each state can be read back below.
Timestamp time_before_insert = clock()->Now();
LocalTabletWriter writer(tablet().get(), &client_schema_);
for (int32_t i = kStartRow; i < TotalNumRows(); i++) {
ASSERT_OK(InsertTestRow(&writer, i, 0));
Timestamp time_after_insert = clock()->Now();
for (int32_t i = kStartRow; i < TotalNumRows(); i++) {
ASSERT_OK(DeleteTestRow(&writer, i));
Timestamp time_after_delete = clock()->Now();
// Move the clock.
// Expect no rows before the insert, all rows with val=0 after the insert, and
// no rows again after the delete.
NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, 0,
time_before_insert, boost::none));
NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, TotalNumRows(),
time_after_insert, kRowsEqual0));
NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, 0,
time_after_delete, boost::none));
// Now flush the MRS. No trace should remain after this.
for (const auto& rsmd : tablet()->metadata()->rowsets()) {
ASSERT_EQ(0, rsmd->undo_delta_blocks().size());
ASSERT_EQ(0, tablet()->EstimateOnDiskSize());
// Now check the same thing (flush not generating an UNDO), but without the
// delete following the insert. We do it with a single row.
ASSERT_OK(InsertTestRow(&writer, kStartRow, 0));
// There should be no undo blocks, despite flushing an insert.
for (const auto& rsmd : tablet()->metadata()->rowsets()) {
ASSERT_EQ(0, rsmd->undo_delta_blocks().size());
// The single row is expected to read val=0 even at Timestamp(0).
NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, 1, Timestamp(0), kRowsEqual0));
ASSERT_DEBUG_DUMP_ROWS_MATCH(R"(int32 val=0\) Undos: \[\] Redos: \[\]$)");
// Test that undos get GCed on a merge compaction.
// In this test, we GC the UNDO that undoes the insert.
TEST_F(TabletHistoryGcTest, TestUndoGCOnMergeCompaction) {
FLAGS_tablet_history_max_age_sec = 1; // Keep history for 1 second.
// Timestamp taken while the tablet has not yet received the test rows.
Timestamp time_before_insert = clock()->Now();
NO_FATALS(InsertOriginalRows(num_rowsets_, rows_per_rowset_));
NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), kRowsEqual0));
// The earliest thing we can see is an empty tablet.
NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, 0, time_before_insert, boost::none));
// Move the clock so the insert is prior to the AHM, then compact.
// Now the only thing we can see is the base data.
// The same pre-insert timestamp that showed zero rows above is now expected
// to show all TotalNumRows() rows.
NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, TotalNumRows(), time_before_insert,
ASSERT_DEBUG_DUMP_ROWS_MATCH(R"(int32 val=0\) Undos: \[\] Redos: \[\]$)");
// Test that we GC the history and existence of entire deleted rows on a merge compaction.
TEST_F(TabletHistoryGcTest, TestRowRemovalGCOnMergeCompaction) {
FLAGS_tablet_history_max_age_sec = 100; // Keep history for 100 seconds.
NO_FATALS(InsertOriginalRows(num_rowsets_, rows_per_rowset_));
NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), kRowsEqual0));
// Timestamp taken after the inserts and before the deletes below.
Timestamp prev_time = clock()->Now();
// Delete all of the rows in the tablet.
LocalTabletWriter writer(tablet().get(), &client_schema_);
for (int row_idx = 0; row_idx < TotalNumRows(); row_idx++) {
ASSERT_OK(DeleteTestRow(&writer, row_idx));
R"(int32 val=0\) Undos: \[@[[:digit:]]+\(DELETE\)\] Redos: \[@[[:digit:]]+\(DELETE\)\]$)");
// Compaction at this time will only remove the initial UNDO records. The
// DELETE REDOs are too recent.
ASSERT_DEBUG_DUMP_ROWS_MATCH(R"(int32 val=0\) Undos: \[\] Redos: \[@[[:digit:]]+\(DELETE\)\]$)");
// Move the AHM so that the delete is now prior to it.
// Now that even the deletion is prior to the AHM, all of the on-disk data
// will be GCed.
// Even a read at prev_time (taken while the rows still existed) is expected
// to return no rows, and the estimated on-disk size to be zero.
NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, 0, prev_time, boost::none));
ASSERT_EQ(0, tablet()->EstimateOnDiskSize());
// Test that we don't over-aggressively GC history prior to the AHM
// (ancient history mark).
TEST_F(TabletHistoryGcTest, TestNoUndoGCUntilAncientHistoryMark) {
FLAGS_tablet_history_max_age_sec = 1000; // 1000 seconds before we GC history.
NO_FATALS(InsertOriginalRows(num_rowsets_, rows_per_rowset_));
// Timestamp taken after the inserts and before the updates below.
Timestamp prev_time = clock()->Now();
// Mutate all of the rows.
LocalTabletWriter writer(tablet().get(), &client_schema_);
for (int row_idx = 0; row_idx < TotalNumRows(); row_idx++) {
// Tag any failure in this iteration with the row index being updated.
SCOPED_TRACE(Substitute("Row index: $0", row_idx));
ASSERT_OK(UpdateTestRow(&writer, row_idx, 1));
// Current-time reads should give us 1, but reads from the past should give us 0.
NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), kRowsEqual1));
NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, TotalNumRows(), prev_time,
for (int i = 0; i < num_rowsets_; i++) {
// Still read 0 from the past.
NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, TotalNumRows(), prev_time,
// Expected dump: val=1, with both the "SET val=0" UNDO and the UNDO DELETE
// still present and an empty REDO list.
ASSERT_DEBUG_DUMP_ROWS_MATCH(R"(int32 val=1\) Undos: \[@[[:digit:]]+\(SET val=0\), )"
R"(@[[:digit:]]+\(DELETE\)\] Redos: \[\]$)");
// Test that "ghost" rows (deleted on one rowset, reinserted on another) don't
// get revived after history GC.
TEST_F(TabletHistoryGcTest, TestGhostRowsNotRevived) {
FLAGS_tablet_history_max_age_sec = 100;
LocalTabletWriter writer(tablet().get(), &client_schema_);
// Three rounds of insert-then-delete on row key 0, with values 0, 1 and 2.
for (int i = 0; i <= 2; i++) {
ASSERT_OK(InsertTestRow(&writer, 0, i));
ASSERT_OK(DeleteTestRow(&writer, 0));
// Create one more rowset on disk which has just an INSERT (ie a non-ghost row).
ASSERT_OK(InsertTestRow(&writer, 0, 3));
// Move the clock, then compact. This should result in a rowset with just one
// row in it.
// We should end up with a single row as base data.
NO_FATALS(VerifyTestRows(0, 1));
// Only the last inserted value (3) is expected, with no UNDOs or REDOs.
ASSERT_DEBUG_DUMP_ROWS_MATCH(R"(int32 val=3\) Undos: \[\] Redos: \[\])");
// Test to ensure that nothing bad happens when we partially GC different rows
// in a rowset. We delete alternating keys to end up with a mix of GCed and
// non-GCed rows in each rowset.
TEST_F(TabletHistoryGcTest, TestGcOnAlternatingRows) {
FLAGS_tablet_history_max_age_sec = 100;
// 3 rowsets of 5 rows each: 15 rows with keys 0..14.
num_rowsets_ = 3;
rows_per_rowset_ = 5;
LocalTabletWriter writer(tablet().get(), &client_schema_);
for (int rowset_id = 0; rowset_id < num_rowsets_; rowset_id++) {
for (int i = 0; i < rows_per_rowset_; i++) {
// Keys are assigned contiguously across rowsets.
int32_t row_key = rowset_id * rows_per_rowset_ + i;
ASSERT_OK(InsertTestRow(&writer, row_key, 0));
// Delete all the odd rows.
for (int32_t row_key = 1; row_key < TotalNumRows(); row_key += 2) {
ASSERT_OK(DeleteTestRow(&writer, row_key));
// Move the clock and compact. We should end up with even rows.
vector<string> rows;
// With 15 rows, the even keys 0, 2, ..., 14 remain: 15 / 2 + 1 = 8 rows.
ASSERT_EQ(TotalNumRows() / 2 + 1, rows.size());
// Even row keys are assigned negative values in this test framework and so
// end up sorted negatively.
std::reverse(rows.begin(), rows.end());
int i = 0;
for (int32_t row_key = 0; row_key < TotalNumRows(); row_key += 2) {
// Each surviving row is expected to show key = -row_key, key_idx = row_key
// and val = 0.
ASSERT_STR_CONTAINS(rows[i], Substitute("int64 key=$0, int32 key_idx=$1, int32 val=0",
-1 * row_key, row_key));
// Ensure that ReupdateMissedDeltas() doesn't reupdate the wrong row.
// 1. Insert rows and flush.
// 2. Delete some rows.
// 3. Move time forward.
// 4. Begin merge compaction.
// 5. Insert some of the deleted rows after phase 1 snapshot is written but before phase 2.
// 6. Update some of the rows in-between the deleted rows.
// 7. Ensure that the rows all look right according to what we expect.
// This test uses the following pattern. Rows with even keys are deleted, rows
// with odd keys are used in the test. The following takes place:
// - Rows 1 and 5 are inserted with values equaling their keys and are not mutated.
// - Rows 3 and 9 are inserted and then updated with values equaling their keys * 10 + 1.
// - Row 7 is deleted and then reinserted, as well as updated using successive values.
TEST_F(TabletHistoryGcTest, TestGcWithConcurrentCompaction) {
FLAGS_tablet_history_max_age_sec = 100;
// Hooks object whose PostWriteSnapshot() callback issues additional updates
// through a fresh writer on the test's tablet.
class MyCommonHooks : public Tablet::FlushCompactCommonHooks {
explicit MyCommonHooks(TabletHistoryGcTest* test)
: test_(test),
offset_(0) {
Status PostWriteSnapshot() OVERRIDE {
LocalTabletWriter writer(test_->tablet().get(), &test_->client_schema());
// Read the current offset; it is added to every value written below.
int offset = offset_.load(std::memory_order_acquire);
// Update our reinserted row.
CHECK_OK(test_->UpdateTestRow(&writer, 7, 73 + offset));
// Also insert and update other rows after the flush.
CHECK_OK(test_->UpdateTestRow(&writer, 3, 30 + offset));
CHECK_OK(test_->UpdateTestRow(&writer, 9, 90 + offset));
return Status::OK();
// NOTE(review): the next line looks garbled. Presumably it should store
// 'offset' into offset_ with release ordering, pairing with the acquire load
// in PostWriteSnapshot() - TODO confirm against the upstream file.
void set_offset(int offset) {, std::memory_order_release);
// Test fixture the hook writes through.
TabletHistoryGcTest* const test_;
// Value added to the updates issued by PostWriteSnapshot().
std::atomic<int> offset_;
std::shared_ptr<MyCommonHooks> hooks = std::make_shared<MyCommonHooks>(this);
LocalTabletWriter writer(tablet().get(), &client_schema_);
// Insert rows 0..9 with val equal to the key.
for (int i = 0; i < 10; i++) {
ASSERT_OK(InsertTestRow(&writer, i, i));
// Also generate a reinsert.
ASSERT_OK(DeleteTestRow(&writer, 7));
ASSERT_OK(InsertTestRow(&writer, 7, 71));
CHECK_OK(UpdateTestRow(&writer, 7, 72));
// Flush the rowset.
// Delete every even row.
for (int i = 0; i < 10; i += 2) {
ASSERT_OK(DeleteTestRow(&writer, i));
vector<string> rows;
// Log every dumped row when verbose logging at level 2 is enabled.
if (VLOG_IS_ON(2)) {
for (const string& r : rows) {
VLOG(2) << r;
// Only the odd keys are expected to remain, in this order.
vector<int32_t> expected_rows = { 1, 3, 5, 7, 9 };
// NOTE(review): 'i' is a signed int compared against the unsigned size().
for (int i = 0; i < expected_rows.size(); i++) {
int32_t key = expected_rows[i];
// NOTE(review): no break statements are visible between the case groups
// below; confirm that each group ends with a break so a row is only checked
// against its own expectation.
switch (key) {
// Rows 1 and 5 are never mutated: val == key.
case 1:
case 5:
ASSERT_STR_CONTAINS(rows[i], Substitute("int32 key_idx=$0, int32 val=$1",
key, key));
// Rows 3 and 9: val == key * 10 + 1, i.e. the hook's 30 + offset and
// 90 + offset updates with offset == 1.
case 3:
case 9:
ASSERT_STR_CONTAINS(rows[i], Substitute("int32 key_idx=$0, int32 val=$1",
key, key * 10 + 1));
// Row 7: val == 74, i.e. the hook's 73 + offset update with offset == 1.
case 7:
ASSERT_STR_CONTAINS(rows[i], Substitute("int32 key_idx=$0, int32 val=$1",
key, key * 10 + 4));
} // namespace tablet
} // namespace kudu