| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <algorithm> |
| #include <atomic> |
| #include <cstdint> |
| #include <functional> |
| #include <memory> |
| #include <optional> |
| #include <ostream> |
| #include <string> |
| #include <vector> |
| |
| #include <gflags/gflags_declare.h> |
| #include <glog/logging.h> |
| #include <gtest/gtest.h> |
| |
| #include "kudu/clock/clock.h" |
| #include "kudu/clock/hybrid_clock.h" |
| #include "kudu/clock/mock_ntp.h" |
| #include "kudu/clock/time_service.h" |
| #include "kudu/common/common.pb.h" |
| #include "kudu/common/partial_row.h" |
| #include "kudu/common/schema.h" |
| #include "kudu/common/timestamp.h" |
| #include "kudu/gutil/casts.h" |
| #include "kudu/gutil/port.h" |
| #include "kudu/gutil/ref_counted.h" |
| #include "kudu/gutil/strings/join.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/gutil/walltime.h" |
| #include "kudu/tablet/compaction.h" |
| #include "kudu/tablet/diskrowset.h" |
| #include "kudu/tablet/local_tablet_writer.h" |
| #include "kudu/tablet/rowset.h" |
| #include "kudu/tablet/rowset_metadata.h" |
| #include "kudu/tablet/tablet-harness.h" |
| #include "kudu/tablet/tablet-test-base.h" |
| #include "kudu/tablet/tablet-test-util.h" |
| #include "kudu/tablet/tablet.h" |
| #include "kudu/tablet/tablet_metadata.h" |
| #include "kudu/tablet/tablet_metrics.h" |
| #include "kudu/util/metrics.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/test_macros.h" |
| |
| DECLARE_bool(enable_maintenance_manager); |
| DECLARE_int32(tablet_history_max_age_sec); |
| DECLARE_string(time_source); |
| |
| using kudu::clock::HybridClock; |
| using std::nullopt; |
| using std::string; |
| using std::vector; |
| using strings::Substitute; |
| |
| namespace kudu { |
| namespace tablet { |
| |
| class TabletHistoryGcTest : public TabletTestBase<IntKeyTestSetup<INT64>> { |
| public: |
| typedef TabletTestBase<IntKeyTestSetup<INT64>> Superclass; |
| |
| TabletHistoryGcTest() |
| : Superclass(TabletHarness::Options::HYBRID_CLOCK) { |
| FLAGS_time_source = "mock"; |
| } |
| |
| void SetMockTime(int64_t micros) { |
| auto* hybrid_clock = down_cast<HybridClock*>(clock()); |
| auto* ntp = down_cast<clock::MockNtp*>(hybrid_clock->time_service()); |
| ntp->SetMockClockWallTimeForTests(micros); |
| } |
| |
| virtual void SetUp() OVERRIDE { |
| NO_FATALS(TabletTestBase<IntKeyTestSetup<INT64>>::SetUp()); |
| // Mock clock defaults to 0 and this screws up the AHM calculation which ends up negative. |
| SetMockTime(GetCurrentTimeMicros()); |
| } |
| |
| protected: |
| enum ToFlush { |
| FLUSH, |
| NO_FLUSH |
| }; |
| |
| // Attempt to run the deleted rowset GC, triggering an assertion failure if |
| // it failed or if our metrics don't make sense. |
| void TryRunningDeletedRowsetGC() { |
| const auto& metrics = tablet()->metrics(); |
| int orig_bytes = metrics->deleted_rowset_gc_bytes_deleted->value(); |
| int orig_count = metrics->deleted_rowset_gc_duration->TotalCount(); |
| int64_t bytes = 0; |
| ASSERT_OK(tablet()->GetBytesInAncientDeletedRowsets(&bytes)); |
| ASSERT_LT(0, bytes); |
| ASSERT_OK(tablet()->DeleteAncientDeletedRowsets()); |
| ASSERT_EQ(bytes + orig_bytes, metrics->deleted_rowset_gc_bytes_deleted->value()); |
| ASSERT_GT(metrics->deleted_rowset_gc_duration->TotalCount(), orig_count); |
| } |
| |
| // Helper functions that mutate rows in batches of keys: |
| // [0, rows_per_rowset) |
| // [rows_per_rowset, 2*rows_per_rowset) |
| // ... |
| // [(num_rowsets - 1)*rows_per_rowset, num_rowsets*rows_per_rowset) |
| // |
| // ...flushing MRS or DMS (depending on the workload) in between batches. |
| void InsertOriginalRows(int64_t num_rowsets, int64_t rows_per_rowset); |
| void UpdateOriginalRows(int64_t num_rowsets, int64_t rows_per_rowset, int32_t val); |
| void DeleteOriginalRows(int64_t num_batches, int64_t rows_per_batch, bool flush_dms); |
| |
| void AddTimeToHybridClock(MonoDelta delta) { |
| uint64_t now = HybridClock::GetPhysicalValueMicros(clock()->Now()); |
| uint64_t new_time = now + delta.ToMicroseconds(); |
| SetMockTime(new_time); |
| } |
| // Specify row regex to match on. Empty string means don't match anything. |
| void VerifyDebugDumpRowsMatch(const string& pattern) const; |
| |
| // Returns the total number of rows there should be after inserting rows. |
| int64_t TotalNumRows() const { return kNumRowsets * rows_per_rowset_; } |
| |
| // Returns a functor that returns whether all rows have 'expected_val' for |
| // their values. |
| static TestRowVerifier GenRowsEqualVerifier(int32_t expected_val) { |
| return [=](int32_t /*key*/, int32_t val) { return val == expected_val; }; |
| } |
| const TestRowVerifier kRowsEqual0 = GenRowsEqualVerifier(0); |
| const TestRowVerifier kRowsEqual1 = GenRowsEqualVerifier(1); |
| const TestRowVerifier kRowsEqual2 = GenRowsEqualVerifier(2); |
| |
| const int kStartRow = 0; |
| const int kNumRowsets = 3; |
| int64_t rows_per_rowset_ = 300; |
| }; |
| |
| void TabletHistoryGcTest::InsertOriginalRows(int64_t num_rowsets, int64_t rows_per_rowset) { |
| for (int rowset_id = 0; rowset_id < num_rowsets; rowset_id++) { |
| InsertTestRows(rowset_id * rows_per_rowset, rows_per_rowset, /*val*/0); |
| ASSERT_OK(tablet()->Flush()); |
| } |
| ASSERT_EQ(num_rowsets, tablet()->num_rowsets()); |
| } |
| |
| void TabletHistoryGcTest::DeleteOriginalRows(int64_t num_batches, |
| int64_t rows_per_batch, bool flush_dms) { |
| for (int rowset_id = 0; rowset_id < num_batches; rowset_id++) { |
| NO_FATALS(DeleteTestRows(rowset_id * rows_per_batch, rows_per_batch)); |
| if (flush_dms) { |
| ASSERT_OK(tablet()->FlushAllDMSForTests()); |
| } |
| } |
| } |
| |
| void TabletHistoryGcTest::UpdateOriginalRows(int64_t num_rowsets, int64_t rows_per_rowset, |
| int32_t val) { |
| for (int rowset_id = 0; rowset_id < num_rowsets; rowset_id++) { |
| UpsertTestRows(rowset_id * rows_per_rowset, rows_per_rowset, val); |
| ASSERT_OK(tablet()->FlushAllDMSForTests()); |
| } |
| ASSERT_EQ(num_rowsets, tablet()->num_rowsets()); |
| } |
| |
| void TabletHistoryGcTest::VerifyDebugDumpRowsMatch(const string& pattern) const { |
| vector<string> rows; |
| ASSERT_OK(tablet()->DebugDump(&rows)); |
| // Ignore the non-data (formattting) lines in the output. |
| std::string base_pattern = R"(^Dumping|^-|^MRS|^RowSet)"; |
| if (!pattern.empty()) { |
| base_pattern += "|"; |
| } |
| ASSERT_STRINGS_ALL_MATCH(rows, base_pattern + pattern); |
| } |
| |
| // Test that we do not generate undos for redo operations that are older than |
| // the AHM during major delta compaction. |
| TEST_F(TabletHistoryGcTest, TestNoGenerateUndoOnMajorDeltaCompaction) { |
| FLAGS_tablet_history_max_age_sec = 1; // Keep history for 1 second. |
| |
| NO_FATALS(InsertOriginalRows(kNumRowsets, rows_per_rowset_)); |
| NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), kRowsEqual0)); |
| Timestamp time_after_insert = clock()->Now(); |
| |
| // Timestamps recorded after each round of updates. |
| Timestamp post_update_ts[2]; |
| |
| // Mutate all of the rows, setting val=1. Then again for val=2. |
| LocalTabletWriter writer(tablet().get(), &client_schema_); |
| for (int val = 1; val <= 2; val++) { |
| for (int row_idx = 0; row_idx < TotalNumRows(); row_idx++) { |
| ASSERT_OK(UpdateTestRow(&writer, row_idx, val)); |
| } |
| // We must flush the DMS before major compaction can operate on these REDOs. |
| for (int i = 0; i < kNumRowsets; i++) { |
| tablet()->FlushBiggestDMS(); |
| } |
| post_update_ts[val - 1] = clock()->Now(); |
| } |
| |
| // Move the AHM beyond our mutations, which are represented as REDOs. |
| NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(2))); |
| |
| // Current-time reads should give us 2, but reads from the past should give |
| // us 0 or 1. |
| NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, TotalNumRows(), |
| time_after_insert, kRowsEqual0)); |
| NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, TotalNumRows(), |
| post_update_ts[0], kRowsEqual1)); |
| NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, TotalNumRows(), |
| post_update_ts[1], kRowsEqual2)); |
| NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), kRowsEqual2)); |
| |
| // Run major delta compaction. |
| for (int i = 0; i < kNumRowsets; i++) { |
| ASSERT_OK(tablet()->CompactWorstDeltas(RowSet::MAJOR_DELTA_COMPACTION)); |
| } |
| |
| // Now, we should have base data = 2 with no other historical values. |
| // Major delta compaction will not remove UNDOs, so we expect a single UNDO DELETE as well. |
| NO_FATALS(VerifyDebugDumpRowsMatch( |
| R"(int32 val=2\); Undo Mutations: \[@[[:digit:]]+\(DELETE\)\]; )" |
| R"(Redo Mutations: \[\];$)")); |
| } |
| |
| // Test that major delta compaction works when run on a subset of columns: |
| // 1. Insert rows and flush to DiskRowSets. |
| // 2. Mutate two columns. |
| // 3. Move time forward. |
| // 4. Run major delta compaction on a single column. |
| // 5. Make sure we don't lose anything unexpected. |
| TEST_F(TabletHistoryGcTest, TestMajorDeltaCompactionOnSubsetOfColumns) { |
| FLAGS_tablet_history_max_age_sec = 100; |
| |
| rows_per_rowset_ = 20; |
| |
| NO_FATALS(InsertOriginalRows(kNumRowsets, rows_per_rowset_)); |
| NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), kRowsEqual0)); |
| |
| LocalTabletWriter writer(tablet().get(), &client_schema_); |
| for (int32_t row_key = 0; row_key < TotalNumRows(); row_key++) { |
| KuduPartialRow row(&client_schema_); |
| setup_.BuildRowKey(&row, row_key); |
| ASSERT_OK_FAST(row.SetInt32(1, 1)); |
| ASSERT_OK_FAST(row.SetInt32(2, 2)); |
| ASSERT_OK_FAST(writer.Update(row)); |
| } |
| for (int i = 0; i < kNumRowsets; i++) { |
| tablet()->FlushBiggestDMS(); |
| } |
| |
| NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(200))); |
| |
| vector<std::shared_ptr<RowSet>> rowsets; |
| tablet()->GetRowSetsForTests(&rowsets); |
| for (int i = 0; i < kNumRowsets; i++) { |
| DiskRowSet* drs = down_cast<DiskRowSet*>(rowsets[i].get()); |
| vector<ColumnId> col_ids_to_compact = { schema_.column_id(2) }; |
| ASSERT_OK(drs->MajorCompactDeltaStoresWithColumnIds(col_ids_to_compact, nullptr, |
| tablet()->GetHistoryGcOpts())); |
| } |
| |
| NO_FATALS(VerifyDebugDumpRowsMatch( |
| R"(int32 val=2\); Undo Mutations: \[@[[:digit:]]+\(DELETE\)\]; )" |
| R"(Redo Mutations: \[@[[:digit:]]+\(SET key_idx=1\)\];$)")); |
| |
| vector<string> rows; |
| ASSERT_OK(IterateToStringList(&rows)); |
| ASSERT_EQ(TotalNumRows(), rows.size()); |
| } |
| |
| // Tests the following two MRS flush scenarios: |
| // 1. Verify that no UNDO is generated after inserting a row into the MRS, |
| // waiting for the AHM to pass, then flushing the MRS. |
| // 2. Same as #1 but delete the inserted row from the MRS before waiting for |
| // the AHM to pass. |
TEST_F(TabletHistoryGcTest, TestNoGenerateUndoOnMRSFlush) {
  FLAGS_tablet_history_max_age_sec = 100;

  // Insert every row and then delete it again, recording timestamps before
  // and after each phase so we can attempt snapshot reads at those times.
  Timestamp time_before_insert = clock()->Now();
  LocalTabletWriter writer(tablet().get(), &client_schema_);
  for (int32_t i = kStartRow; i < TotalNumRows(); i++) {
    ASSERT_OK(InsertTestRow(&writer, i, 0));
  }
  Timestamp time_after_insert = clock()->Now();
  for (int32_t i = kStartRow; i < TotalNumRows(); i++) {
    ASSERT_OK(DeleteTestRow(&writer, i));
  }
  Timestamp time_after_delete = clock()->Now();

  // Move the clock.
  NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(200)));

  // While the data is still in the MRS, all of the recorded snapshots remain
  // readable: nothing, then all rows with val=0, then nothing again.
  NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, 0,
                                                   time_before_insert, nullopt));
  NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, TotalNumRows(),
                                                   time_after_insert, kRowsEqual0));
  NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, 0,
                                                   time_after_delete, nullopt));

  // Now flush the MRS. No trace should remain after this.
  ASSERT_OK(tablet()->Flush());
  NO_FATALS(VerifyDebugDumpRowsMatch(""));

  // The insert+delete history is entirely prior to the AHM, so the flush
  // should produce neither UNDO blocks nor any on-disk data.
  for (const auto& rsmd : tablet()->metadata()->rowsets()) {
    ASSERT_EQ(0, rsmd->undo_delta_blocks().size());
  }
  ASSERT_EQ(0, tablet()->OnDiskDataSize());

  // Now check the same thing (flush not generating an UNDO), but without the
  // delete following the insert. We do it with a single row.

  ASSERT_OK(InsertTestRow(&writer, kStartRow, 0));
  NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(200)));
  ASSERT_OK(tablet()->Flush());
  // There should be no undo blocks, despite flushing an insert.
  for (const auto& rsmd : tablet()->metadata()->rowsets()) {
    ASSERT_EQ(0, rsmd->undo_delta_blocks().size());
  }
  // With its insertion UNDO gone, the row is visible even at timestamp 0.
  NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, 1, Timestamp(0), kRowsEqual0));
  NO_FATALS(VerifyDebugDumpRowsMatch(
      R"(int32 val=0\); Undo Mutations: \[\]; Redo Mutations: \[\];$)"));
}
| |
| // Test that undos get GCed on a merge compaction. |
| // In this test, we GC the UNDO that undoes the insert. |
TEST_F(TabletHistoryGcTest, TestUndoGCOnMergeCompaction) {
  FLAGS_tablet_history_max_age_sec = 1; // Keep history for 1 second.

  Timestamp time_before_insert = clock()->Now();
  NO_FATALS(InsertOriginalRows(kNumRowsets, rows_per_rowset_));
  NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), kRowsEqual0));

  // The earliest thing we can see is an empty tablet.
  NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(
      kStartRow, 0, time_before_insert, nullopt));

  // Move the clock so the insert is prior to the AHM, then compact.
  NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(2)));
  ASSERT_OK(tablet()->Compact(Tablet::FORCE_COMPACT_ALL));

  // Now the only thing we can see is the base data.
  // Even a read at 'time_before_insert' returns the rows: the UNDO records
  // that would have rolled back the insert were GCed by the compaction.
  NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, TotalNumRows(), time_before_insert,
                                                   kRowsEqual0));
  NO_FATALS(VerifyDebugDumpRowsMatch(
      R"(int32 val=0\); Undo Mutations: \[\]; Redo Mutations: \[\];$)"));
}
| |
| // Test that we GC the history and existence of entire deleted rows on a merge compaction. |
TEST_F(TabletHistoryGcTest, TestRowRemovalGCOnMergeCompaction) {
  FLAGS_tablet_history_max_age_sec = 100; // Keep history for 100 seconds.

  NO_FATALS(InsertOriginalRows(kNumRowsets, rows_per_rowset_));
  NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), kRowsEqual0));

  Timestamp prev_time = clock()->Now();

  // Move time forward so the inserts fall behind the AHM before we delete.
  NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(200)));

  // Delete all of the rows in the tablet.
  LocalTabletWriter writer(tablet().get(), &client_schema_);
  for (int row_idx = 0; row_idx < TotalNumRows(); row_idx++) {
    ASSERT_OK(DeleteTestRow(&writer, row_idx));
  }
  ASSERT_OK(tablet()->Flush());
  // Each row now carries both an insertion UNDO and a deletion REDO.
  NO_FATALS(VerifyDebugDumpRowsMatch(
      R"(int32 val=0\); Undo Mutations: \[@[[:digit:]]+\(DELETE\)\]; )"
      R"(Redo Mutations: \[@[[:digit:]]+\(DELETE\)\];$)"));

  // Compaction at this time will only remove the initial UNDO records. The
  // DELETE REDOs are too recent.
  ASSERT_OK(tablet()->Compact(Tablet::FORCE_COMPACT_ALL));
  NO_FATALS(VerifyDebugDumpRowsMatch(
      R"(int32 val=0\); Undo Mutations: \[\]; Redo Mutations: )"
      R"(\[@[[:digit:]]+\(DELETE\)\];$)"));

  // Move the AHM so that the delete is now prior to it.
  NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(200)));

  // Now that even the deletion is prior to the AHM, all of the on-disk data
  // will be GCed.
  ASSERT_OK(tablet()->Compact(Tablet::FORCE_COMPACT_ALL));
  NO_FATALS(VerifyDebugDumpRowsMatch(""));
  // Reads at an ancient snapshot see nothing, and no data remains on disk.
  NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(
      kStartRow, 0, prev_time, nullopt));
  ASSERT_EQ(0, tablet()->OnDiskDataSize());
}
| |
| // Test that we don't over-aggressively GC history prior to the AHM. |
TEST_F(TabletHistoryGcTest, TestNoUndoGCUntilAncientHistoryMark) {
  FLAGS_tablet_history_max_age_sec = 1000; // 1000 seconds before we GC history.

  NO_FATALS(InsertOriginalRows(kNumRowsets, rows_per_rowset_));

  Timestamp prev_time = clock()->Now();
  // Move time forward, but not far enough to put anything behind the AHM.
  NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(2)));

  // Mutate all of the rows.
  LocalTabletWriter writer(tablet().get(), &client_schema_);
  for (int row_idx = 0; row_idx < TotalNumRows(); row_idx++) {
    SCOPED_TRACE(Substitute("Row index: $0", row_idx));
    ASSERT_OK(UpdateTestRow(&writer, row_idx, 1));
  }

  // Current-time reads should give us 1, but reads from the past should give us 0.
  NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), kRowsEqual1));
  NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, TotalNumRows(), prev_time,
                                                   kRowsEqual0));

  // Run every flavor of compaction; none of them should GC any history,
  // since nothing is behind the AHM yet.
  for (int i = 0; i < kNumRowsets; i++) {
    ASSERT_OK(tablet()->CompactWorstDeltas(RowSet::MINOR_DELTA_COMPACTION));
    ASSERT_OK(tablet()->CompactWorstDeltas(RowSet::MAJOR_DELTA_COMPACTION));
  }
  ASSERT_OK(tablet()->Compact(Tablet::FORCE_COMPACT_ALL));

  // Still read 0 from the past.
  NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, TotalNumRows(), prev_time,
                                                   kRowsEqual0));
  // Both UNDO records (SET val=0 and the original DELETE) must still exist.
  NO_FATALS(VerifyDebugDumpRowsMatch(
      R"(int32 val=1\); Undo Mutations: \[@[[:digit:]]+\(SET val=0\), )"
      R"(@[[:digit:]]+\(DELETE\)\]; Redo Mutations: \[\];$)"));
}
| |
| // Test that "ghost" rows (deleted on one rowset, reinserted on another) don't |
| // get revived after history GC. |
| TEST_F(TabletHistoryGcTest, TestGhostRowsNotRevived) { |
| FLAGS_tablet_history_max_age_sec = 100; |
| |
| LocalTabletWriter writer(tablet().get(), &client_schema_); |
| for (int i = 0; i <= 2; i++) { |
| ASSERT_OK(InsertTestRow(&writer, 0, i)); |
| ASSERT_OK(DeleteTestRow(&writer, 0)); |
| ASSERT_OK(tablet()->Flush()); |
| } |
| |
| // Create one more rowset on disk which has just an INSERT (ie a non-ghost row). |
| ASSERT_OK(InsertTestRow(&writer, 0, 3)); |
| ASSERT_OK(tablet()->Flush()); |
| |
| // Move the clock, then compact. This should result in a rowset with just one |
| // row in it. |
| NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(200))); |
| ASSERT_OK(tablet()->Compact(Tablet::FORCE_COMPACT_ALL)); |
| |
| // We should end up with a single row as base data. |
| NO_FATALS(VerifyTestRows(0, 1)); |
| NO_FATALS(VerifyDebugDumpRowsMatch( |
| R"(int32 val=3\); Undo Mutations: \[\]; Redo Mutations: \[\];)")); |
| } |
| |
| // Test to ensure that nothing bad happens when we partially GC different rows |
| // in a rowset. We delete alternating keys to end up with a mix of GCed and |
| // non-GCed rows in each rowset. |
TEST_F(TabletHistoryGcTest, TestGcOnAlternatingRows) {
  FLAGS_tablet_history_max_age_sec = 100;
  rows_per_rowset_ = 5;

  // Insert the rows manually (rather than via InsertOriginalRows) so each
  // rowset gets a contiguous key range.
  LocalTabletWriter writer(tablet().get(), &client_schema_);
  for (int rowset_id = 0; rowset_id < kNumRowsets; rowset_id++) {
    for (int i = 0; i < rows_per_rowset_; i++) {
      int32_t row_key = rowset_id * rows_per_rowset_ + i;
      ASSERT_OK(InsertTestRow(&writer, row_key, 0));
    }
    ASSERT_OK(tablet()->Flush());
  }

  // Delete all the odd rows.
  for (int32_t row_key = 1; row_key < TotalNumRows(); row_key += 2) {
    ASSERT_OK(DeleteTestRow(&writer, row_key));
  }

  // Move the clock and compact. We should end up with even rows.
  NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(200)));
  ASSERT_OK(tablet()->Compact(Tablet::FORCE_COMPACT_ALL));

  vector<string> rows;
  ASSERT_OK(IterateToStringList(&rows));
  // With an odd total, the even keys make up half the rows rounded up.
  ASSERT_EQ(TotalNumRows() / 2 + 1, rows.size());

  // Even row keys are assigned negative values in this test framework and so
  // end up sorted negatively.
  std::reverse(rows.begin(), rows.end());

  // After reversing, rows[i] corresponds to the i-th surviving even key.
  int i = 0;
  for (int32_t row_key = 0; row_key < TotalNumRows(); row_key += 2) {
    ASSERT_STR_CONTAINS(rows[i], Substitute("int64 key=$0, int32 key_idx=$1, int32 val=0",
                                            -1 * row_key, row_key));
    i++;
  }
}
| |
| // Ensure that ReupdateMissedDeltas() doesn't reupdate the wrong row. |
| // 1. Insert rows and flush. |
| // 2. Delete some rows. |
| // 3. Move time forward. |
| // 4. Begin merge compaction. |
| // 5. Insert some of the deleted rows after phase 1 snapshot is written but before phase 2. |
| // 6. Update some of the rows in-between the deleted rows. |
| // 7. Ensure that the rows all look right according to what we expect. |
| // |
| // This test uses the following pattern. Rows with even keys are deleted, rows |
| // with odd keys are used in the test. The following takes place: |
| // - Rows 1 and 5 are inserted with values equaling their keys and are not mutated. |
| // - Rows 3 and 9 are inserted and then updated with values equaling their keys * 10 + 1. |
| // - Row 7 is deleted and then reinserted, as well as updated using successive values. |
| TEST_F(TabletHistoryGcTest, TestGcWithConcurrentCompaction) { |
| FLAGS_tablet_history_max_age_sec = 100; |
| |
| class MyCommonHooks : public Tablet::FlushCompactCommonHooks { |
| public: |
| explicit MyCommonHooks(TabletHistoryGcTest* test) |
| : test_(test), |
| offset_(0) { |
| } |
| |
| Status PostWriteSnapshot() OVERRIDE { |
| LocalTabletWriter writer(test_->tablet().get(), &test_->client_schema()); |
| int offset = offset_.load(std::memory_order_acquire); |
| // Update our reinserted row. |
| CHECK_OK(test_->UpdateTestRow(&writer, 7, 73 + offset)); |
| |
| // Also insert and update other rows after the flush. |
| CHECK_OK(test_->UpdateTestRow(&writer, 3, 30 + offset)); |
| CHECK_OK(test_->UpdateTestRow(&writer, 9, 90 + offset)); |
| return Status::OK(); |
| } |
| |
| void set_offset(int offset) { |
| offset_.store(offset, std::memory_order_release); |
| } |
| |
| private: |
| TabletHistoryGcTest* const test_; |
| std::atomic<int> offset_; |
| }; |
| |
| std::shared_ptr<MyCommonHooks> hooks = std::make_shared<MyCommonHooks>(this); |
| tablet()->SetFlushCompactCommonHooksForTests(hooks); |
| |
| LocalTabletWriter writer(tablet().get(), &client_schema_); |
| for (int i = 0; i < 10; i++) { |
| ASSERT_OK(InsertTestRow(&writer, i, i)); |
| } |
| // Also generate a reinsert. |
| ASSERT_OK(DeleteTestRow(&writer, 7)); |
| NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(200))); |
| ASSERT_OK(InsertTestRow(&writer, 7, 71)); |
| CHECK_OK(UpdateTestRow(&writer, 7, 72)); |
| |
| // Flush the rowset. |
| ASSERT_OK(tablet()->Flush()); |
| |
| // Delete every even row. |
| for (int i = 0; i < 10; i += 2) { |
| ASSERT_OK(DeleteTestRow(&writer, i)); |
| } |
| |
| NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(200))); |
| |
| hooks->set_offset(1); |
| ASSERT_OK(tablet()->Compact(Tablet::FORCE_COMPACT_ALL)); |
| tablet()->SetFlushCompactCommonHooksForTests(nullptr); |
| |
| vector<string> rows; |
| ASSERT_OK(IterateToStringList(&rows)); |
| |
| VLOG(2) << JoinStrings(rows, "\n"); |
| |
| vector<int32_t> expected_rows = { 1, 3, 5, 7, 9 }; |
| for (int i = 0; i < expected_rows.size(); i++) { |
| int32_t key = expected_rows[i]; |
| switch (key) { |
| case 1: |
| case 5: |
| ASSERT_STR_CONTAINS(rows[i], Substitute("int32 key_idx=$0, int32 val=$1", |
| key, key)); |
| break; |
| case 3: |
| case 9: |
| ASSERT_STR_CONTAINS(rows[i], Substitute("int32 key_idx=$0, int32 val=$1", |
| key, key * 10 + 1)); |
| break; |
| case 7: |
| ASSERT_STR_CONTAINS(rows[i], Substitute("int32 key_idx=$0, int32 val=$1", |
| key, key * 10 + 4)); |
| break; |
| default: |
| break; |
| } |
| } |
| } |
| |
| // A version of the tablet history gc test with the maintenance manager disabled. |
class TabletHistoryGcNoMaintMgrTest : public TabletHistoryGcTest {
 public:
  void SetUp() override {
    // Disable the maintenance manager so the test has exclusive control over
    // when flushes, compactions, and GC operations run.
    FLAGS_enable_maintenance_manager = false;
    TabletHistoryGcTest::SetUp();
  }
};
| |
| // Test that basic deletion of undo delta blocks prior to the AHM works. |
TEST_F(TabletHistoryGcNoMaintMgrTest, TestUndoDeltaBlockGc) {
  FLAGS_tablet_history_max_age_sec = 1000;

  NO_FATALS(InsertOriginalRows(kNumRowsets, rows_per_rowset_));
  ASSERT_EQ(kNumRowsets, tablet()->CountUndoDeltasForTests());

  // Generate a bunch of redo deltas and then compact them into undo deltas.
  constexpr int kNumMutationsPerRow = 5;
  for (int i = 0; i < kNumMutationsPerRow; i++) {
    SCOPED_TRACE(i);
    // Each iteration's major compaction adds one more undo block per rowset.
    ASSERT_EQ((i + 1) * kNumRowsets, tablet()->CountUndoDeltasForTests());
    NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(1)));
    NO_FATALS(UpdateOriginalRows(kNumRowsets, rows_per_rowset_, i));
    ASSERT_OK(tablet()->MajorCompactAllDeltaStoresForTests());
    ASSERT_EQ((i + 2) * kNumRowsets, tablet()->CountUndoDeltasForTests());
  }

  // All REDOs have been major-compacted into UNDO blocks by now.
  ASSERT_EQ(0, tablet()->CountRedoDeltasForTests());
  const int expected_undo_blocks = (kNumMutationsPerRow + 1) * kNumRowsets;
  ASSERT_EQ(expected_undo_blocks, tablet()->CountUndoDeltasForTests());

  // There will be uninitialized undos so we will estimate that there may be
  // undos to GC.
  int64_t bytes;
  ASSERT_OK(tablet()->EstimateBytesInPotentiallyAncientUndoDeltas(&bytes));
  ASSERT_GT(bytes, 0);

  // Now initialize the undos. Our estimates should be back to 0.
  int64_t bytes_in_ancient_undos = 0;
  const MonoDelta kNoTimeLimit = MonoDelta();
  ASSERT_OK(tablet()->InitAncientUndoDeltas(kNoTimeLimit, &bytes_in_ancient_undos));
  ASSERT_EQ(0, bytes_in_ancient_undos);
  ASSERT_OK(tablet()->EstimateBytesInPotentiallyAncientUndoDeltas(&bytes));
  ASSERT_EQ(0, bytes);

  // Move the clock so all deltas should be ancient.
  NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(FLAGS_tablet_history_max_age_sec + 1)));

  // Initialize and delete undos.
  ASSERT_OK(tablet()->InitAncientUndoDeltas(kNoTimeLimit, &bytes_in_ancient_undos));
  ASSERT_GT(bytes_in_ancient_undos, 0);

  // Check that the estimate and the metrics match.
  ASSERT_OK(tablet()->EstimateBytesInPotentiallyAncientUndoDeltas(&bytes));
  ASSERT_EQ(bytes_in_ancient_undos, bytes);
  ASSERT_EQ(bytes, tablet()->metrics()->undo_delta_block_estimated_retained_bytes->value());

  int64_t blocks_deleted;
  int64_t bytes_deleted;
  ASSERT_OK(tablet()->DeleteAncientUndoDeltas(&blocks_deleted, &bytes_deleted));
  ASSERT_EQ(expected_undo_blocks, blocks_deleted);
  ASSERT_GT(bytes_deleted, 0);
  ASSERT_EQ(0, tablet()->CountUndoDeltasForTests());
  VLOG(1) << "Bytes deleted: " << bytes_deleted;

  // Basic sanity check for our per-tablet metrics. Duration is difficult to
  // verify with a Histogram so simply ensure each Histogram was incremented.
  ASSERT_EQ(bytes_deleted, tablet()->metrics()->undo_delta_block_gc_bytes_deleted->value());
  // Init ran twice (before and after moving the clock); delete ran once.
  ASSERT_EQ(2, tablet()->metrics()->undo_delta_block_gc_init_duration->TotalCount());
  ASSERT_EQ(1, tablet()->metrics()->undo_delta_block_gc_delete_duration->TotalCount());
}
| |
TEST_F(TabletHistoryGcNoMaintMgrTest, TestGCDeletedRowsetsWithRedoFiles) {
  FLAGS_tablet_history_max_age_sec = 1000;
  NO_FATALS(InsertOriginalRows(kNumRowsets, rows_per_rowset_));
  ASSERT_EQ(kNumRowsets, tablet()->CountUndoDeltasForTests());

  // Delete every row, flushing the DMS so the deletes land in REDO files.
  NO_FATALS(DeleteOriginalRows(kNumRowsets, rows_per_rowset_, /*flush_dms*/true));
  ASSERT_EQ(kNumRowsets, tablet()->CountRedoDeltasForTests());

  // We shouldn't have any ancient rowsets since we haven't passed the AHM.
  int64_t bytes = 0;
  ASSERT_OK(tablet()->GetBytesInAncientDeletedRowsets(&bytes));
  ASSERT_EQ(0, bytes);

  // Try to delete ancient deleted rowsets. This should effectively no-op.
  ASSERT_OK(tablet()->DeleteAncientDeletedRowsets());
  ASSERT_EQ(kNumRowsets, tablet()->CountUndoDeltasForTests());
  ASSERT_EQ(kNumRowsets, tablet()->CountRedoDeltasForTests());
  ASSERT_OK(tablet()->GetBytesInAncientDeletedRowsets(&bytes));
  ASSERT_EQ(0, bytes);
  // The no-op GC must not have bumped any GC metrics either.
  const auto* metrics = tablet()->metrics();
  ASSERT_EQ(0, metrics->deleted_rowset_gc_bytes_deleted->value());
  ASSERT_EQ(0, metrics->deleted_rowset_gc_duration->TotalCount());

  // Move the clock so all rowsets are ancient. Our GC should succeed and we
  // should be left with no rowsets.
  NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(FLAGS_tablet_history_max_age_sec + 1)));
  NO_FATALS(TryRunningDeletedRowsetGC());
  ASSERT_EQ(0, tablet()->CountUndoDeltasForTests());
  ASSERT_EQ(0, tablet()->CountRedoDeltasForTests());
}
| |
TEST_F(TabletHistoryGcNoMaintMgrTest, TestGCDeletedRowsetsWithDMS) {
  FLAGS_tablet_history_max_age_sec = 1000;
  NO_FATALS(InsertOriginalRows(kNumRowsets, rows_per_rowset_));
  // Leave the deletes in the delta memstores (no flush) to exercise GC of
  // rowsets whose deletion state is only in memory.
  NO_FATALS(DeleteOriginalRows(kNumRowsets, rows_per_rowset_, /*flush_dms*/false));
  NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(FLAGS_tablet_history_max_age_sec + 1)));
  NO_FATALS(TryRunningDeletedRowsetGC());
}
| |
| TEST_F(TabletHistoryGcNoMaintMgrTest, TestGCDeletedRowsetsAfterMinorCompaction) { |
| FLAGS_tablet_history_max_age_sec = 1000; |
| NO_FATALS(InsertOriginalRows(kNumRowsets, rows_per_rowset_)); |
| // Flush twice as frequently when deleting so we end up with multiple delta |
| // files per DRS. |
| NO_FATALS(DeleteOriginalRows(kNumRowsets * 2, rows_per_rowset_ / 2, /*flush_dms*/true)); |
| NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(FLAGS_tablet_history_max_age_sec + 1))); |
| int num_redos_before_compaction = tablet()->CountRedoDeltasForTests(); |
| for (int i = 0; i < kNumRowsets; i++) { |
| ASSERT_OK(tablet()->CompactWorstDeltas(RowSet::MINOR_DELTA_COMPACTION)); |
| } |
| ASSERT_GT(num_redos_before_compaction, tablet()->CountRedoDeltasForTests()); |
| NO_FATALS(TryRunningDeletedRowsetGC()); |
| } |
| |
| TEST_F(TabletHistoryGcNoMaintMgrTest, TestGCDeletedRowsetsAfterMajorCompaction) { |
| FLAGS_tablet_history_max_age_sec = 1000; |
| NO_FATALS(InsertOriginalRows(kNumRowsets, rows_per_rowset_)); |
| // Major delta compaction is a no-op if we only have deletes, so trick the |
| // rowsets into major compacting by throwing in some updates. |
| NO_FATALS(UpdateOriginalRows(kNumRowsets, rows_per_rowset_, 5)); |
| // Flush twice as frequently when deleting so we end up with multiple delta |
| // files per DRS. |
| NO_FATALS(DeleteOriginalRows(kNumRowsets * 2, rows_per_rowset_ / 2, /*flush_dms*/true)); |
| NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(FLAGS_tablet_history_max_age_sec + 1))); |
| int num_redos_before_compaction = tablet()->CountRedoDeltasForTests(); |
| for (int i = 0; i < kNumRowsets; i++) { |
| ASSERT_OK(tablet()->CompactWorstDeltas(RowSet::MAJOR_DELTA_COMPACTION)); |
| } |
| ASSERT_GT(num_redos_before_compaction, tablet()->CountRedoDeltasForTests()); |
| NO_FATALS(TryRunningDeletedRowsetGC()); |
| } |
| |
| } // namespace tablet |
| } // namespace kudu |