blob: a813f03953649794cfd0f1c3e6b1516637c598e9 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tablet/diskrowset.h"
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <ostream>
#include <string>
#include <tuple>
#include <type_traits>
#include <unordered_set>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <glog/stl_logging.h>
#include <gtest/gtest.h>
#include "kudu/clock/logical_clock.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/iterator.h"
#include "kudu/common/row.h"
#include "kudu/common/row_changelist.h"
#include "kudu/common/rowid.h"
#include "kudu/common/schema.h"
#include "kudu/common/timestamp.h"
#include "kudu/consensus/log_anchor_registry.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/fs/block_id.h"
#include "kudu/fs/io_context.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/gutil/strings/stringpiece.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/compaction.h"
#include "kudu/tablet/delta_key.h"
#include "kudu/tablet/delta_store.h"
#include "kudu/tablet/delta_tracker.h"
#include "kudu/tablet/diskrowset-test-base.h"
#include "kudu/tablet/mvcc.h"
#include "kudu/tablet/rowset.h"
#include "kudu/tablet/tablet-test-util.h"
#include "kudu/tablet/tablet.pb.h"
#include "kudu/tablet/tablet_mem_trackers.h"
#include "kudu/util/bloom_filter.h"
#include "kudu/util/faststring.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/random.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
namespace kudu {
namespace tablet {
class RowSetMetadata;
} // namespace tablet
} // namespace kudu
DEFINE_double(update_fraction, 0.1f, "fraction of rows to update");
DECLARE_bool(cfile_lazy_open);
DECLARE_bool(crash_on_eio);
DECLARE_int32(cfile_default_block_size);
DECLARE_double(env_inject_eio);
DECLARE_double(tablet_delta_store_major_compact_min_ratio);
DECLARE_int32(tablet_delta_store_minor_compact_max);
using std::is_sorted;
using std::make_tuple;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::unordered_set;
using std::tuple;
using std::vector;
using strings::Substitute;
namespace kudu {
namespace tablet {
// TODO: add a test which calls CopyNextRows on an iterator with no more
// rows - I think it segfaults!
// Round-trip test: write a multi-column rowset, reopen it, and read it back
// through several projections. No mutations are applied. Afterwards, probe
// CheckRowPresent() with keys that are absent and with a key that is present.
TEST_F(TestRowSet, TestRowSetRoundTrip) {
  WriteTestRowSet();

  // Reopen the freshly written DiskRowSet for reading.
  shared_ptr<DiskRowSet> rowset;
  ASSERT_OK(OpenTestRowSet(&rowset));

  // Scan with the full schema.
  LOG_TIMING(INFO, "Iterating over all columns") {
    IterateProjection(*rowset, schema_, n_rows_);
  }

  // Scan just the key column.
  Schema key_only;
  ASSERT_OK(schema_.CreateProjectionByNames({ "key" }, &key_only));
  LOG_TIMING(INFO, "Iterating over only key column") {
    IterateProjection(*rowset, key_only, n_rows_);
  }

  // Scan just the non-key column.
  Schema val_only;
  ASSERT_OK(schema_.CreateProjectionByNames({ "val" }, &val_only));
  LOG_TIMING(INFO, "Iterating over only val column") {
    IterateProjection(*rowset, val_only, n_rows_);
  }

  // The arena and probe stats are shared by every CheckRowPresent() call below.
  Arena arena(64);
  ProbeStats stats;

  // Keys which must be reported as absent.
  const char* const kAbsentKeys[] = {
    "h",                      // comes before all keys in the rowset
    "z",                      // comes after all keys in the rowset
    "hello 00000000000049x",  // not present, but falls between present keys
  };
  for (const char* absent_key : kAbsentKeys) {
    // Only affects failure output: identifies which key was being probed.
    SCOPED_TRACE(absent_key);
    Schema pk = schema_.CreateKeyProjection();
    RowBuilder rb(&pk);
    rb.AddString(Slice(absent_key));
    RowSetKeyProbe probe(rb.row(), &arena);
    bool present;
    ASSERT_OK(rowset->CheckRowPresent(probe, nullptr, &present, &stats));
    ASSERT_FALSE(present);
  }

  // A key which must be reported as present: the formatted key for index 49.
  {
    char key_buf[256];
    FormatKey(49, key_buf, sizeof(key_buf));
    Schema pk = schema_.CreateKeyProjection();
    RowBuilder rb(&pk);
    rb.AddString(Slice(key_buf));
    RowSetKeyProbe probe(rb.row(), &arena);
    bool present;
    ASSERT_OK(rowset->CheckRowPresent(probe, nullptr, &present, &stats));
    ASSERT_TRUE(present);
  }
}
// Writes a rowset, updates a fraction of its rows, checks that a mutation
// against a key missing from the rowset is rejected with NotFound, and
// finally verifies that the updates are visible when reading back.
TEST_F(TestRowSet, TestRowSetUpdate) {
  Arena arena(64);
  WriteTestRowSet();

  // Reopen the DiskRowSet for reading.
  shared_ptr<DiskRowSet> rowset;
  ASSERT_OK(OpenTestRowSet(&rowset));

  // Push updates for a number of existing keys into the delta tracker. These
  // updates change the value to idx*5 (the original data has value = idx).
  unordered_set<uint32_t> updated_idxs;
  UpdateExistingRows(rowset.get(), FLAGS_update_fraction, &updated_idxs);
  const int expected_dms_count = static_cast<int>(n_rows_ * FLAGS_update_fraction);
  ASSERT_EQ(expected_dms_count, rowset->delta_tracker_->dms_->Count());

  // Build a DELETE for a key which is not in the file, but which sorts
  // between two valid keys.
  faststring delete_buf;
  RowChangeListEncoder delete_enc(&delete_buf);
  delete_enc.SetToDelete();

  Timestamp ts(0);
  Schema key_schema = schema_.CreateKeyProjection();
  RowBuilder key_builder(&key_schema);
  key_builder.AddString(Slice("hello 00000000000049x"));
  RowSetKeyProbe missing_key_probe(key_builder.row(), &arena);

  // The mutation must fail and must not touch any store.
  OperationResultPB op_result;
  ProbeStats probe_stats;
  Status status = rowset->MutateRow(ts,
                                    missing_key_probe,
                                    delete_enc.as_changelist(),
                                    op_id_,
                                    nullptr,
                                    &probe_stats,
                                    &op_result);
  ASSERT_TRUE(status.IsNotFound()) << status.ToString();
  ASSERT_EQ(0, op_result.mutated_stores_size());

  // Read back the value column and verify that the updates are visible.
  VerifyUpdates(*rowset, updated_idxs);
}
// Checks that an injected I/O error during a row mutation is reported as an
// IOError status from MutateRow().
TEST_F(TestRowSet, TestErrorDuringUpdate) {
  Arena arena(64);
  WriteTestRowSet();

  shared_ptr<DiskRowSet> rowset;
  ASSERT_OK(OpenTestRowSet(&rowset));

  // Encode a DELETE mutation.
  faststring delete_buf;
  RowChangeListEncoder delete_enc(&delete_buf);
  delete_enc.SetToDelete();

  // Probe for a row that we expect to be in the rowset.
  Timestamp ts(0);
  Schema key_schema = schema_.CreateKeyProjection();
  RowBuilder key_builder(&key_schema);
  key_builder.AddString(Slice("hello 000000000000050"));
  RowSetKeyProbe key_probe(key_builder.row(), &arena);

  // From here on, inject EIO with probability 1.0, and disable crashing on
  // EIO so that the error is returned to the caller instead.
  FLAGS_crash_on_eio = false;
  FLAGS_env_inject_eio = 1.0;

  // The mutation should result in an IOError.
  OperationResultPB op_result;
  ProbeStats probe_stats;
  Status status = rowset->MutateRow(ts,
                                    key_probe,
                                    delete_enc.as_changelist(),
                                    op_id_,
                                    nullptr,
                                    &probe_stats,
                                    &op_result);
  LOG(INFO) << status.ToString();
  ASSERT_TRUE(status.IsIOError()) << status.ToString();
}
// Exercises single-key reads against a 100-row rowset: reads a row before
// and after updating it, and reads keys which match no row.
TEST_F(TestRowSet, TestRandomRead) {
  // Write 100 rows.
  WriteTestRowSet(100);
  shared_ptr<DiskRowSet> rowset;
  ASSERT_OK(OpenTestRowSet(&rowset));

  const char* const kTargetKey = "hello 000000000000050";

  // Read the row before it has been updated.
  NO_FATALS(VerifyRandomRead(*rowset, kTargetKey,
                             R"((string key="hello 000000000000050", uint32 val=50))"));

  // Update the row.
  OperationResultPB update_result;
  ASSERT_OK(UpdateRow(rowset.get(), 50, 12345, &update_result));

  // Read it again -- the updated value should be visible.
  NO_FATALS(VerifyRandomRead(*rowset, kTargetKey,
                             R"((string key="hello 000000000000050", uint32 val=12345))"));

  // Keys matching no row should return no rows.
  const char* const kAbsentKeys[] = {
    "aaaaa",                               // comes before the first key
    "hello 000000000000050_between_keys",  // falls between keys
    "hello 000000000000101",               // falls after the last key
  };
  for (const char* absent_key : kAbsentKeys) {
    NO_FATALS(VerifyRandomRead(*rowset, absent_key, ""));
  }
}
// Test Delete() support within a DiskRowSet.
//
// One of two rows is deleted, and then the same set of checks is run twice:
// the first pass runs while the delete is still in the DeltaMemStore, and the
// second pass runs after the DMS has been flushed. Both passes must observe
// the same externally visible state.
TEST_F(TestRowSet, TestDelete) {
  // Write and open a DiskRowSet with 2 rows.
  WriteTestRowSet(2);
  shared_ptr<DiskRowSet> rs;
  ASSERT_OK(OpenTestRowSet(&rs));
  MvccSnapshot snap_before_delete(mvcc_);

  // Delete one of the two rows.
  OperationResultPB result;
  ASSERT_OK(DeleteRow(rs.get(), 0, &result));
  ASSERT_EQ(1, result.mutated_stores_size());
  ASSERT_EQ(0L, result.mutated_stores(0).rs_id());
  ASSERT_EQ(0L, result.mutated_stores(0).dms_id());
  MvccSnapshot snap_after_delete(mvcc_);

  vector<string> rows;
  Status s;
  RowIteratorOptions opts;
  opts.projection = &schema_;
  for (int i = 0; i < 2; i++) {
    // Reading the MVCC snapshot prior to deletion should show the row.
    opts.snap_to_include = snap_before_delete;
    ASSERT_OK(DumpRowSet(*rs, opts, &rows));
    ASSERT_EQ(2, rows.size());
    EXPECT_EQ(R"((string key="hello 000000000000000", uint32 val=0))", rows[0]);
    EXPECT_EQ(R"((string key="hello 000000000000001", uint32 val=1))", rows[1]);

    // Reading the MVCC snapshot after the deletion should hide the row.
    opts.snap_to_include = snap_after_delete;
    ASSERT_OK(DumpRowSet(*rs, opts, &rows));
    ASSERT_EQ(1, rows.size());
    EXPECT_EQ(R"((string key="hello 000000000000001", uint32 val=1))", rows[0]);

    // Trying to delete or update the same row again should fail. A separate
    // result object is used so the outer 'result' is not shadowed.
    OperationResultPB retry_result;
    s = DeleteRow(rs.get(), 0, &retry_result);
    ASSERT_TRUE(s.IsNotFound()) << "bad status: " << s.ToString();
    ASSERT_EQ(0, retry_result.mutated_stores_size());
    retry_result.Clear();
    s = UpdateRow(rs.get(), 0, 12345, &retry_result);
    ASSERT_TRUE(s.IsNotFound()) << "bad status: " << s.ToString();
    ASSERT_EQ(0, retry_result.mutated_stores_size());

    // CheckRowPresent should return false.
    bool present;
    ASSERT_OK(CheckRowPresent(*rs, 0, &present));
    EXPECT_FALSE(present);

    if (i == 0) {
      // Flush the DMS at the end of the first pass. The second pass through
      // the loop will re-verify that the externally visible state of the
      // layer has not changed now that the deletions are in a DeltaFile.
      // (Flushing on the last pass instead would leave the flushed state
      // unverified.)
      ASSERT_OK(rs->FlushDeltas(nullptr));
    }
  }
}
// Updates rows, flushes the DeltaMemStore, and verifies that the updates are
// readable both right after the flush and after the rowset is reopened.
TEST_F(TestRowSet, TestDMSFlush) {
  WriteTestRowSet();
  unordered_set<uint32_t> updated_idxs;

  // First session: open the rowset, update it, and flush.
  {
    shared_ptr<DiskRowSet> rowset;
    ASSERT_OK(OpenTestRowSet(&rowset));

    // Push updates for a number of existing keys into the delta tracker.
    // These updates change the value to idx*5 (the original data has
    // value = idx).
    UpdateExistingRows(rowset.get(), FLAGS_update_fraction, &updated_idxs);
    const int expected_dms_count = static_cast<int>(n_rows_ * FLAGS_update_fraction);
    ASSERT_EQ(expected_dms_count, rowset->delta_tracker_->dms_->Count());

    ASSERT_OK(rowset->FlushDeltas(nullptr));

    // After the flush, the DiskRowSet's DMS must not be initialized.
    ASSERT_FALSE(rowset->delta_tracker_->dms_);

    // Read back the value column and verify that the updates are visible.
    SCOPED_TRACE("before reopen");
    VerifyUpdates(*rowset, updated_idxs);
  }

  LOG(INFO) << "Reopening rowset ===============";

  // Second session: the first handle is gone; reopen the rowset and make sure
  // the updates were persistent.
  {
    shared_ptr<DiskRowSet> reopened;
    ASSERT_OK(OpenTestRowSet(&reopened));

    // Read back the value column and verify that the updates are visible.
    SCOPED_TRACE("after reopen");
    VerifyUpdates(*reopened, updated_idxs);
  }
}
// Updates a single row several times, snapshotting MVCC after each update,
// and checks that every historical value can be read through its snapshot --
// both while the updates are unflushed and after they are flushed.
TEST_F(TestRowSet, TestFlushedUpdatesRespectMVCC) {
  Arena arena(64);
  const Slice key_slice("row");

  // Write a single row (key "row", val 1) into a new DiskRowSet.
  LOG_TIMING(INFO, "Writing rowset") {
    DiskRowSetWriter writer(rowset_meta_.get(), &schema_,
                            BloomFilterSizing::BySizeAndFPRate(32*1024, 0.01f));
    ASSERT_OK(writer.Open());
    RowBuilder row(&schema_);
    row.AddString(key_slice);
    row.AddUint32(1);
    ASSERT_OK_FAST(WriteRow(row.data(), &writer));
    ASSERT_OK(writer.Finish());
  }

  // Reopen the rowset.
  shared_ptr<DiskRowSet> rowset;
  ASSERT_OK(OpenTestRowSet(&rowset));

  // snaps[0] captures the pre-update state; snaps[k] is taken after the k-th
  // update below.
  vector<MvccSnapshot> snaps;
  snaps.emplace_back(mvcc_);

  // Set the value to 2, 3, 4, and 5 in turn, snapshotting after each update.
  faststring update_buf;
  RowChangeListEncoder update(&update_buf);
  for (uint32_t new_val = 2; new_val <= 5; new_val++) {
    {
      arena.Reset();
      ScopedOp op(&mvcc_, clock_.Now());
      op.StartApplying();

      update.Reset();
      update.AddColumnUpdate(schema_.column(1), schema_.column_id(1), &new_val);

      Schema key_schema = schema_.CreateKeyProjection();
      RowBuilder key_builder(&key_schema);
      key_builder.AddString(key_slice);
      RowSetKeyProbe key_probe(key_builder.row(), &arena);

      OperationResultPB op_result;
      ProbeStats probe_stats;
      ASSERT_OK_FAST(rowset->MutateRow(op.timestamp(),
                                       key_probe,
                                       RowChangeList(update_buf),
                                       op_id_,
                                       nullptr,
                                       &probe_stats,
                                       &op_result));
      ASSERT_EQ(1, op_result.mutated_stores_size());
      ASSERT_EQ(0L, op_result.mutated_stores(0).rs_id());
      ASSERT_EQ(0L, op_result.mutated_stores(0).dms_id());
      op.FinishApplying();
    }
    snaps.emplace_back(mvcc_);
  }

  // Ensure that MVCC is respected by reading the value at each of the stored
  // snapshots. This is done twice: first with the deltas unflushed, and then
  // again after flushing them to disk, to ensure that the historical versions
  // are still accessible.
  ASSERT_EQ(5, snaps.size());
  for (const bool after_flush : { false, true }) {
    if (after_flush) {
      ASSERT_OK(rowset->FlushDeltas(nullptr));
    }
    for (int snap_idx = 0; snap_idx < 5; snap_idx++) {
      SCOPED_TRACE(snap_idx);
      RowIteratorOptions opts;
      opts.projection = &schema_;
      opts.snap_to_include = snaps[snap_idx];
      unique_ptr<RowwiseIterator> iter;
      ASSERT_OK(rowset->NewRowIterator(opts, &iter));
      string dumped = InitAndDumpIterator(iter.get());
      EXPECT_EQ(StringPrintf(R"((string key="row", uint32 val=%d))", snap_idx + 1), dumped);
    }
  }
}
// Benchmark counterpart of TestDMSFlush. Results are not verified (the
// verification step is expensive); instead the "read" side is looped a number
// of times so that the speed of applying deltas during reads can be
// micro-benchmarked.
//
// This is most usefully run with an invocation like:
// ./rowset-test --gtest_filter=\*Performance --roundtrip_num_rows=1000000
// --n_read_passes=1000 --update_fraction=0.01
TEST_F(TestRowSet, TestDeltaApplicationPerformance) {
  WriteTestRowSet();

  // Open the DiskRowSet for reading.
  shared_ptr<DiskRowSet> rowset;
  ASSERT_OK(OpenTestRowSet(&rowset));

  // Baseline: no updates at all.
  const string baseline_label =
      StringPrintf("Reading %zd rows prior to updates %d times",
                   n_rows_, FLAGS_n_read_passes);
  BenchmarkIterationPerformance(*rowset, baseline_label);

  // Updates applied, but not yet flushed.
  UpdateExistingRows(rowset.get(), FLAGS_update_fraction, nullptr);
  const string dms_label =
      StringPrintf("Reading %zd rows with %.2f%% updates %d times (updates in DMS)",
                   n_rows_, FLAGS_update_fraction * 100.0f,
                   FLAGS_n_read_passes);
  BenchmarkIterationPerformance(*rowset, dms_label);

  // Updates flushed.
  ASSERT_OK(rowset->FlushDeltas(nullptr));
  const string disk_label =
      StringPrintf("Reading %zd rows with %.2f%% updates %d times (updates on disk)",
                   n_rows_, FLAGS_update_fraction * 100.0f,
                   FLAGS_n_read_passes);
  BenchmarkIterationPerformance(*rowset, disk_label);
}
// Writes the test rows through a RollingDiskRowSetWriter configured to roll
// every 64KB, then checks how many rowsets were produced and that together
// they account for every row.
TEST_F(TestRowSet, TestRollingDiskRowSetWriter) {
  // Use a small block size so that we can roll frequently. Otherwise we
  // couldn't output such small files.
  google::FlagSaver saver;
  FLAGS_cfile_default_block_size = 4096;

  RollingDiskRowSetWriter rolling_writer(
      tablet()->metadata(), schema_,
      BloomFilterSizing::BySizeAndFPRate(32*1024, 0.01f),
      64 * 1024); // roll every 64KB
  DoWriteTestRowSet(FLAGS_roundtrip_num_rows, &rolling_writer);

  // Should have rolled 4 times.
  vector<shared_ptr<RowSetMetadata>> written_metas;
  rolling_writer.GetWrittenRowSetMetadata(&written_metas);
  EXPECT_EQ(4, written_metas.size());

  // Every output rowset must have data for the first column, and the live row
  // counts must add up to the number of rows written.
  int64_t total_live_rows = 0;
  for (const auto& rs_meta : written_metas) {
    ASSERT_TRUE(rs_meta->HasDataForColumnIdForTests(schema_.column_id(0)));
    total_live_rows += rs_meta->live_row_count();
  }
  ASSERT_EQ(FLAGS_roundtrip_num_rows, total_live_rows);
}
// Builds a merging delta iterator over all flushed REDO stores and checks
// that it covers every store and that its dumped output is sorted.
TEST_F(TestRowSet, TestMakeDeltaIteratorMergerUnlocked) {
  WriteTestRowSet();

  // Open the DiskRowSet, apply some updates, and flush them.
  shared_ptr<DiskRowSet> rowset;
  ASSERT_OK(OpenTestRowSet(&rowset));
  UpdateExistingRows(rowset.get(), FLAGS_update_fraction, nullptr);
  ASSERT_OK(rowset->FlushDeltas(nullptr));

  DeltaTracker* tracker = rowset->delta_tracker();
  int num_stores = tracker->redo_delta_stores_.size();

  // Merge over the index range [0, num_stores - 1].
  vector<shared_ptr<DeltaStore>> merged_stores;
  vector<BlockId> merged_blocks;
  unique_ptr<DeltaIterator> merged_iter;
  ASSERT_OK(tracker->MakeDeltaIteratorMergerUnlocked(nullptr, 0, num_stores - 1, &schema_,
                                                     &merged_stores,
                                                     &merged_blocks, &merged_iter));

  // Dump everything the merged iterator yields.
  vector<string> dumped;
  ASSERT_OK(DebugDumpDeltaIterator(REDO, merged_iter.get(), schema_,
                                   ITERATE_OVER_ALL_ROWS,
                                   &dumped));
  for (const auto& line : dumped) {
    VLOG(1) << line;
  }

  ASSERT_EQ(merged_stores.size(), num_stores);
  ASSERT_EQ(merged_blocks.size(), num_stores);
  ASSERT_TRUE(is_sorted(dumped.begin(), dumped.end()));
}
// Asserts that 'to_check' lies strictly between 0 and 1. Because it uses
// fatal assertions, which only return from this function, callers should
// wrap the call in NO_FATALS().
void BetweenZeroAndOne(double to_check) {
  ASSERT_GT(to_check, 0);
  ASSERT_LT(to_check, 1);
}
// Tracks the minor/major delta compaction scores as delta files accumulate,
// then compacts all delta files into one and checks that the result is valid.
TEST_F(TestRowSet, TestCompactStores) {
  // With this setting, we want major compactions to basically always have a score.
  FLAGS_tablet_delta_store_major_compact_min_ratio = 0.0001;
  // With this setting, the perf improvement will be 0 until we have two files, at which point
  // it will be the expected ratio, then with three files we get the maximum improvement.
  FLAGS_tablet_delta_store_minor_compact_max = 3;
  // Turning this off so that we can call DeltaStoresCompactionPerfImprovementScore without having
  // to open the files after creating them.
  FLAGS_cfile_lazy_open = false;

  WriteTestRowSet();
  shared_ptr<DiskRowSet> rowset;
  ASSERT_OK(OpenTestRowSet(&rowset));

  // Shorthands for fetching the current compaction scores. They contain no
  // assertions, so they can be called from anywhere in the test body.
  const auto minor_score = [&rowset]() {
    return rowset->DeltaStoresCompactionPerfImprovementScore(RowSet::MINOR_DELTA_COMPACTION);
  };
  const auto major_score = [&rowset]() {
    return rowset->DeltaStoresCompactionPerfImprovementScore(RowSet::MAJOR_DELTA_COMPACTION);
  };

  // No delta files yet: nothing to compact.
  ASSERT_EQ(0, minor_score());
  ASSERT_EQ(0, major_score());

  // Write a first delta file.
  UpdateExistingRows(rowset.get(), FLAGS_update_fraction, nullptr);
  ASSERT_OK(rowset->FlushDeltas(nullptr));
  // One file isn't enough for minor compactions, but a major compaction can run.
  ASSERT_EQ(0, minor_score());
  NO_FATALS(BetweenZeroAndOne(major_score()));

  // Write a second delta file.
  UpdateExistingRows(rowset.get(), FLAGS_update_fraction, nullptr);
  ASSERT_OK(rowset->FlushDeltas(nullptr));
  // Two files is enough for all delta compactions.
  NO_FATALS(BetweenZeroAndOne(minor_score()));
  NO_FATALS(BetweenZeroAndOne(major_score()));

  // Write a third delta file.
  UpdateExistingRows(rowset.get(), FLAGS_update_fraction, nullptr);
  ASSERT_OK(rowset->FlushDeltas(nullptr));
  // We're hitting the max for minor compactions but not for major compactions.
  ASSERT_EQ(1, minor_score());
  NO_FATALS(BetweenZeroAndOne(major_score()));

  // Compact the deltafiles.
  DeltaTracker* tracker = rowset->delta_tracker();
  int num_stores = tracker->redo_delta_stores_.size();
  VLOG(1) << "Number of stores before compaction: " << num_stores;
  ASSERT_EQ(num_stores, 3);
  ASSERT_OK(tracker->CompactStores(nullptr, 0, num_stores - 1));
  num_stores = tracker->redo_delta_stores_.size();
  VLOG(1) << "Number of stores after compaction: " << num_stores;
  ASSERT_EQ(1, num_stores);

  // Back to one store, can't minor compact.
  ASSERT_EQ(0, minor_score());
  NO_FATALS(BetweenZeroAndOne(major_score()));

  // Verify that the resulting deltafile is valid: dump it through a merging
  // iterator and check that the output is sorted.
  vector<shared_ptr<DeltaStore>> merged_stores;
  vector<BlockId> merged_blocks;
  unique_ptr<DeltaIterator> merged_iter;
  ASSERT_OK(tracker->MakeDeltaIteratorMergerUnlocked(nullptr, 0, num_stores - 1, &schema_,
                                                     &merged_stores,
                                                     &merged_blocks, &merged_iter));
  vector<string> dumped;
  ASSERT_OK(DebugDumpDeltaIterator(REDO, merged_iter.get(), schema_,
                                   ITERATE_OVER_ALL_ROWS,
                                   &dumped));
  for (const auto& line : dumped) {
    VLOG(1) << line;
  }
  ASSERT_TRUE(is_sorted(dumped.begin(), dumped.end()));
}
// Turns a flushed REDO delta into an UNDO delta via a major delta compaction,
// then deletes the ancient UNDO deltas and checks that no delta stores remain.
TEST_F(TestRowSet, TestGCAncientStores) {
  // Disable lazy open so that major delta compactions don't require manual REDO initialization.
  FLAGS_cfile_lazy_open = false;

  // Write some base data.
  // Note: Our test methods here don't write UNDO delete deltas.
  WriteTestRowSet();
  shared_ptr<DiskRowSet> rowset;
  ASSERT_OK(OpenTestRowSet(&rowset));
  DeltaTracker* tracker = rowset->delta_tracker();
  ASSERT_EQ(0, tracker->CountUndoDeltaStores());
  ASSERT_EQ(0, tracker->CountRedoDeltaStores());

  // Write and flush a new REDO delta file.
  UpdateExistingRows(rowset.get(), FLAGS_update_fraction, nullptr);
  ASSERT_OK(rowset->FlushDeltas(nullptr));
  ASSERT_EQ(0, tracker->CountUndoDeltaStores());
  ASSERT_EQ(1, tracker->CountRedoDeltaStores());

  // Convert the REDO delta to an UNDO delta.
  ASSERT_OK(rowset->MajorCompactDeltaStores(nullptr, HistoryGcOpts::Disabled()));
  ASSERT_EQ(1, tracker->CountUndoDeltaStores()); // From doing the major delta compaction.
  ASSERT_EQ(0, tracker->CountRedoDeltaStores());

  // Delete all the UNDO deltas. There shouldn't be any delta stores left.
  int64_t num_blocks_deleted;
  int64_t num_bytes_deleted;
  ASSERT_OK(tracker->DeleteAncientUndoDeltas(clock_.Now(), nullptr,
                                             &num_blocks_deleted, &num_bytes_deleted));
  ASSERT_GT(num_blocks_deleted, 0);
  ASSERT_GT(num_bytes_deleted, 0);
  ASSERT_EQ(0, tracker->CountUndoDeltaStores());
  ASSERT_EQ(0, tracker->CountRedoDeltaStores());
}
// Checks that the rowset's reported on-disk size agrees with the sizes of its
// components (cfile set, REDO deltas, UNDO deltas) as deltas are flushed and
// compacted.
TEST_F(TestRowSet, TestDiskSizeEstimation) {
  // Force the files to be opened so the stats are read.
  FLAGS_cfile_lazy_open = false;

  // Write a rowset.
  WriteTestRowSet();
  shared_ptr<DiskRowSet> rowset;
  ASSERT_OK(OpenTestRowSet(&rowset));

  // Write a first delta file.
  UpdateExistingRows(rowset.get(), FLAGS_update_fraction, nullptr);
  ASSERT_OK(rowset->FlushDeltas(nullptr));

  // The rowset consists of the cfile set and REDO deltas, so the rowset's
  // on-disk size should equal the sum of the cfile set and REDO sizes.
  DiskRowSetSpace space;
  rowset->GetDiskRowSetSpaceUsage(&space);
  ASSERT_GT(space.redo_deltas_size, 0);
  ASSERT_EQ(rowset->OnDiskSize(), space.CFileSetOnDiskSize() + space.redo_deltas_size);

  // Convert the REDO delta to an UNDO delta.
  ASSERT_OK(rowset->MajorCompactDeltaStores(nullptr, HistoryGcOpts::Disabled()));

  // REDO size should be zero, but there should be UNDOs, so the on-disk size
  // of the rowset should be the sum of the cfile set and UNDO sizes.
  rowset->GetDiskRowSetSpaceUsage(&space);
  ASSERT_GT(space.undo_deltas_size, 0);
  ASSERT_EQ(rowset->OnDiskSize(), space.CFileSetOnDiskSize() + space.undo_deltas_size);

  // Write a second delta file.
  UpdateExistingRows(rowset.get(), FLAGS_update_fraction, nullptr);
  ASSERT_OK(rowset->FlushDeltas(nullptr));

  // Now there's base data, REDOs, and UNDOs.
  rowset->GetDiskRowSetSpaceUsage(&space);
  ASSERT_GT(space.undo_deltas_size, 0);
  ASSERT_GT(space.redo_deltas_size, 0);
}
// Fixture for diff scan tests on a DiskRowSet. The schema has a UINT32 key
// column ("key") and two UINT32 value columns ("val1" and "val2"). The
// fixture is parameterized on a pair of booleans, which tests read through
// GetParam().
class DiffScanRowSetTest : public KuduRowSetTest,
                           public ::testing::WithParamInterface<tuple<bool, bool>> {
 public:
  DiffScanRowSetTest()
      : KuduRowSetTest(CreateTestSchema()),
        op_id_(consensus::MaximumOpId()),
        clock_(Timestamp::kInitialTimestamp) {
  }

 protected:
  // Builds the (key, val1, val2) schema handed to the base fixture.
  static Schema CreateTestSchema() {
    SchemaBuilder schema_builder;
    CHECK_OK(schema_builder.AddKeyColumn("key", UINT32));
    for (const char* val_col_name : { "val1", "val2" }) {
      CHECK_OK(schema_builder.AddColumn(val_col_name, UINT32));
    }
    return schema_builder.BuildWithoutIds();
  }

  // Op ID passed along with every mutation.
  consensus::OpId op_id_;
  // Logical clock, starting at the initial timestamp, which supplies the
  // timestamps for mutations.
  clock::LogicalClock clock_;
  // MVCC manager under which mutations are applied.
  MvccManager mvcc_;
};
// Instantiates DiffScanRowSetTest over the Cartesian product of two boolean
// parameters (four combinations in total):
// 1. Whether to include deleted rows in the scan.
// 2. Whether to include the "is deleted" virtual column in the scan's projection.
INSTANTIATE_TEST_SUITE_P(RowIteratorOptionsPermutations, DiffScanRowSetTest,
::testing::Combine(::testing::Bool(),
::testing::Bool()));
// Tests diff scans on a diskrowset. The rowset is generated with only a handful
// of rows, but we randomly flush/compact between each update operation so that
// the test operates on a variety of different on-disk and in-memory layouts.
//
// The test is built from three local lambdas (run_test, mutate_row, and
// maybe_flush_compact). Each of them uses fatal assertions, which only return
// from the lambda itself, so every call is wrapped in NO_FATALS().
TEST_P(DiffScanRowSetTest, TestFuzz) {
fs::IOContext test_context;
scoped_refptr<log::LogAnchorRegistry> log_anchor_registry(new log::LogAnchorRegistry());
// Create and open a DRS with four rows.
// Row i is written as (key=i, val1=i*2, val2=i*3).
shared_ptr<DiskRowSet> rs;
{
DiskRowSetWriter drsw(rowset_meta_.get(), &schema_,
BloomFilterSizing::BySizeAndFPRate(32 * 1024, 0.01f));
ASSERT_OK(drsw.Open());
RowBuilder rb(&schema_);
for (int i = 0; i < 4; i++) {
rb.Reset();
rb.AddUint32(i);
rb.AddUint32(i * 2);
rb.AddUint32(i * 3);
ASSERT_OK(WriteRow(rb.data(), &drsw));
}
ASSERT_OK(drsw.Finish());
ASSERT_OK(DiskRowSet::Open(rowset_meta_, log_anchor_registry.get(),
TabletMemTrackers(), &test_context, &rs));
}
// Run the diff scan test. Scan time boundaries are given by 'ts1_val' and
// 'ts2_val'. The expected results are given by 'expected_rows'.
//
// Each RowTuple is (key, val1, val2, is_deleted). 'expected_rows' is taken by
// value because it is replaced with a filtered copy when deleted rows are
// excluded from the scan.
//
// NOTE(review): judging by the expectations at the bottom of this test, a
// scan captures the mutations with timestamps in [ts1_val, ts2_val).
using RowTuple = tuple<uint32_t, uint32_t, uint32_t, bool>;
auto run_test = [&](uint64_t ts1_val,
uint64_t ts2_val,
vector<RowTuple> expected_rows) {
bool include_deleted_rows = std::get<0>(GetParam());
bool add_vc_is_deleted = std::get<1>(GetParam());
// Create a projection of the schema, adding the IS_DELETED virtual
// column if desired.
vector<ColumnSchema> col_schemas;
vector<ColumnId> col_ids;
for (int i = 0; i < schema_.num_columns(); i++) {
col_schemas.push_back(schema_.column(i));
col_ids.push_back(schema_.column_id(i));
}
if (add_vc_is_deleted) {
bool read_default = false;
col_schemas.emplace_back("is_deleted", IS_DELETED, /*is_nullable=*/ false,
/*is_immutable=*/ false, &read_default);
// Give the virtual column an ID just past the largest ID in the schema.
col_ids.emplace_back(schema_.max_col_id() + 1);
}
Schema projection(col_schemas, col_ids, 1);
// Set up the iterator.
Timestamp ts1(ts1_val);
Timestamp ts2(ts2_val);
SCOPED_TRACE(Substitute("Scanning at $0,$1 with schema $2",
ts1.ToString(), ts2.ToString(), projection.ToString()));
RowIteratorOptions opts;
opts.include_deleted_rows = include_deleted_rows;
opts.projection = &projection;
opts.snap_to_exclude = MvccSnapshot(ts1);
opts.snap_to_include = MvccSnapshot(ts2);
unique_ptr<RowwiseIterator> iter;
ASSERT_OK(rs->NewRowIterator(opts, &iter));
ASSERT_OK(iter->Init(nullptr));
// Scan the data out of the iterator and test it.
vector<string> lines;
ASSERT_OK(IterateToStringList(iter.get(), &lines));
// When the scan excludes deleted rows, drop the expected rows that are
// marked as deleted before comparing.
if (!include_deleted_rows) {
vector<RowTuple> without_deleted_rows;
for (const auto& e : expected_rows) {
if (!std::get<3>(e)) {
without_deleted_rows.emplace_back(e);
}
}
expected_rows = without_deleted_rows;
}
ASSERT_EQ(expected_rows.size(), lines.size()) << lines;
for (int i = 0; i < expected_rows.size(); i++) {
// The is_deleted suffix is only expected when the virtual column is part
// of the projection.
string expected_is_deleted = add_vc_is_deleted ? Substitute(
", is_deleted is_deleted=$0", std::get<3>(expected_rows[i])) : "";
string expected_line = Substitute("(uint32 key=$0, "
"uint32 val1=$1, "
"uint32 val2=$2$3)",
std::get<0>(expected_rows[i]),
std::get<1>(expected_rows[i]),
std::get<2>(expected_rows[i]),
expected_is_deleted);
ASSERT_EQ(expected_line, lines[i]);
}
};
// Mutate a particular column in the diskrowset given by 'row_idx' and
// 'col_idx'. If 'val' is unset, deletes the row.
// 'row_idx' is used as the value of the row's key column; the mutation is
// applied at the next timestamp handed out by 'clock_'.
auto mutate_row = [&](rowid_t row_idx,
size_t col_idx,
std::optional<uint32_t> val) {
// Build the mutation.
faststring buf;
RowChangeListEncoder enc(&buf);
if (val) {
enc.AddColumnUpdate(schema_.column(col_idx),
schema_.column_id(col_idx),
&(*val));
} else {
enc.SetToDelete();
}
// Build the row key.
Schema proj_key = schema_.CreateKeyProjection();
RowBuilder rb(&proj_key);
rb.AddUint32(row_idx);
Arena arena(64);
RowSetKeyProbe probe(rb.row(), &arena);
// Apply the mutation.
ScopedOp op(&mvcc_, clock_.Now());
op.StartApplying();
ProbeStats stats;
OperationResultPB result;
ASSERT_OK(rs->MutateRow(op.timestamp(), probe, enc.as_changelist(), op_id_,
&test_context, &stats, &result));
op.FinishApplying();
};
Random prng(SeedRandom());
// Possibly apply a randomly chosen delta flush or compaction operation.
auto maybe_flush_compact = [&]() {
// 25% for a minor delta compaction.
// 25% for a major delta compaction.
// 50% for a DMS flush.
int r = prng.Uniform(4);
if (r == 0) {
ASSERT_OK(rs->MinorCompactDeltaStores(&test_context));
} else if (r == 1) {
ASSERT_OK(rs->MajorCompactDeltaStores(&test_context, HistoryGcOpts::Disabled()));
} else {
ASSERT_OK(rs->FlushDeltas(&test_context));
}
};
// Update the rows in the diskrowset.
// The trailing "ts N" comments give the timestamp of each mutation; the
// expectations further down are written in terms of these timestamps.
NO_FATALS(maybe_flush_compact());
NO_FATALS(mutate_row(1, 2, 1000)); // ts 1
NO_FATALS(maybe_flush_compact());
NO_FATALS(mutate_row(1, 1, 200)); // ts 2
NO_FATALS(maybe_flush_compact());
NO_FATALS(mutate_row(2, 1, 300)); // ts 3
NO_FATALS(maybe_flush_compact());
NO_FATALS(mutate_row(3, 1, 400)); // ts 4
NO_FATALS(maybe_flush_compact());
NO_FATALS(mutate_row(3, 1, std::nullopt)); // ts 5
NO_FATALS(maybe_flush_compact());
// Run the diff scan test on every permutation of time bounds.
//
// Note: a regular DRS would include an UNDO DELETE for every INSERT by virtue
// of having been flushed from an MRS. That's not the case here, and as a
// result, the insertions themselves aren't associated with any timestamps and
// thus are not captured by these diff scans.
// The time bounds specify an empty range so nothing is captured.
for (int i = 0; i < 7; i++) {
NO_FATALS(run_test(i, i, {}));
}
// The remaining cases are grouped by the width of the time range
// (ts2_val - ts1_val), from 1 up to 6.
NO_FATALS(run_test(0, 1, {}));
NO_FATALS(run_test(1, 2, { make_tuple(1, 2, 1000, false) }));
NO_FATALS(run_test(2, 3, { make_tuple(1, 200, 1000, false) }));
NO_FATALS(run_test(3, 4, { make_tuple(2, 300, 6, false) }));
NO_FATALS(run_test(4, 5, { make_tuple(3, 400, 9, false) }));
NO_FATALS(run_test(5, 6, { make_tuple(3, 400, 9, true) }));
NO_FATALS(run_test(0, 2, { make_tuple(1, 2, 1000, false) }));
NO_FATALS(run_test(1, 3, { make_tuple(1, 200, 1000, false) }));
NO_FATALS(run_test(2, 4, { make_tuple(1, 200, 1000, false),
make_tuple(2, 300, 6, false) }));
NO_FATALS(run_test(3, 5, { make_tuple(2, 300, 6, false),
make_tuple(3, 400, 9, false) }));
NO_FATALS(run_test(4, 6, { make_tuple(3, 400, 9, true) }));
NO_FATALS(run_test(0, 3, { make_tuple(1, 200, 1000, false) }));
NO_FATALS(run_test(1, 4, { make_tuple(1, 200, 1000, false),
make_tuple(2, 300, 6, false) }));
NO_FATALS(run_test(2, 5, { make_tuple(1, 200, 1000, false),
make_tuple(2, 300, 6, false),
make_tuple(3, 400, 9, false) }));
NO_FATALS(run_test(3, 6, { make_tuple(2, 300, 6, false),
make_tuple(3, 400, 9, true) }));
NO_FATALS(run_test(0, 4, { make_tuple(1, 200, 1000, false),
make_tuple(2, 300, 6, false) }));
NO_FATALS(run_test(1, 5, { make_tuple(1, 200, 1000, false),
make_tuple(2, 300, 6, false),
make_tuple(3, 400, 9, false) }));
NO_FATALS(run_test(2, 6, { make_tuple(1, 200, 1000, false),
make_tuple(2, 300, 6, false),
make_tuple(3, 400, 9, true) }));
NO_FATALS(run_test(0, 5, { make_tuple(1, 200, 1000, false),
make_tuple(2, 300, 6, false),
make_tuple(3, 400, 9, false) }));
NO_FATALS(run_test(1, 6, { make_tuple(1, 200, 1000, false),
make_tuple(2, 300, 6, false),
make_tuple(3, 400, 9, true) }));
NO_FATALS(run_test(0, 6, { make_tuple(1, 200, 1000, false),
make_tuple(2, 300, 6, false),
make_tuple(3, 400, 9, true) }));
}
} // namespace tablet
} // namespace kudu