blob: 9cbeda9806053b1692488e25d48407cd2b29a35d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tablet/deltamemstore.h"
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <map>
#include <memory>
#include <ostream>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/clock/logical_clock.h"
#include "kudu/common/columnblock.h"
#include "kudu/common/columnblock-test-util.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/row_changelist.h"
#include "kudu/common/rowblock.h"
#include "kudu/common/rowid.h"
#include "kudu/common/schema.h"
#include "kudu/common/timestamp.h"
#include "kudu/common/types.h"
#include "kudu/consensus/log_anchor_registry.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/fs/block_manager.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/delta_key.h"
#include "kudu/tablet/delta_stats.h"
#include "kudu/tablet/delta_store.h"
#include "kudu/tablet/deltafile.h"
#include "kudu/tablet/mutation.h"
#include "kudu/tablet/mvcc.h"
#include "kudu/tablet/rowset.h"
#include "kudu/tablet/tablet-test-util.h"
#include "kudu/util/faststring.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/random.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
// Command-line flag controlling how many times the benchmark tests in this
// file repeat their scan loops. The default is 100 when NDEBUG is defined and
// 1 otherwise.
DEFINE_int32(benchmark_num_passes,
#ifdef NDEBUG
100,
#else
1,
#endif
"Number of passes to apply deltas in the benchmark");
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::unordered_set;
using std::vector;
using strings::Substitute;
namespace kudu {
namespace tablet {
using fs::WritableBlock;
class TestDeltaMemStore : public KuduTest {
public:
TestDeltaMemStore()
: op_id_(consensus::MaximumOpId()),
schema_(CreateSchema()),
clock_(Timestamp::kInitialTimestamp) {
CHECK_OK(DeltaMemStore::Create(0, 0,
new log::LogAnchorRegistry(),
MemTracker::GetRootTracker(), &dms_));
CHECK_OK(dms_->Init(nullptr));
}
static Schema CreateSchema() {
SchemaBuilder builder;
CHECK_OK(builder.AddColumn("col1", STRING));
CHECK_OK(builder.AddColumn("col2", STRING));
CHECK_OK(builder.AddColumn("col3", UINT32));
return builder.Build();
}
template<class Iterable>
void UpdateIntsAtIndexes(const Iterable &indexes_to_update) {
faststring buf;
RowChangeListEncoder update(&buf);
for (uint32_t idx_to_update : indexes_to_update) {
ScopedOp op(&mvcc_, clock_.Now());
op.StartApplying();
update.Reset();
uint32_t new_val = idx_to_update * 10;
update.AddColumnUpdate(schema_.column(kIntColumn),
schema_.column_id(kIntColumn), &new_val);
CHECK_OK(dms_->Update(op.timestamp(), idx_to_update, RowChangeList(buf), op_id_));
op.FinishApplying();
}
}
void ApplyUpdates(const MvccSnapshot &snapshot,
uint32_t row_idx,
size_t col_idx,
ColumnBlock *cb) {
ColumnSchema col_schema(schema_.column(col_idx));
Schema single_col_projection({ col_schema },
{ schema_.column_id(col_idx) },
0);
RowIteratorOptions opts;
opts.projection = &single_col_projection;
opts.snap_to_include = snapshot;
unique_ptr<DeltaIterator> iter;
Status s = dms_->NewDeltaIterator(opts, &iter);
if (s.IsNotFound()) {
return;
}
ASSERT_OK(s);
ASSERT_OK(iter->Init(nullptr));
ASSERT_OK(iter->SeekToOrdinal(row_idx));
ASSERT_OK(iter->PrepareBatch(cb->nrows(), DeltaIterator::PREPARE_FOR_APPLY));
SelectionVector filter(cb->nrows());
filter.SetAllTrue();
ASSERT_OK(iter->ApplyDeletes(&filter));
ASSERT_OK(iter->ApplyUpdates(0, cb, filter));
}
protected:
static const int kStringColumn = 1;
static const int kIntColumn = 2;
consensus::OpId op_id_;
const Schema schema_;
shared_ptr<DeltaMemStore> dms_;
clock::LogicalClock clock_;
MvccManager mvcc_;
};
// Fills 'out' with 'count' distinct values drawn via random() from the range
// [0, range). Any previous contents of 'out' are discarded.
//
// Duplicate draws are rejected and retried, so 'count' must be at most half
// of 'range' (enforced with a CHECK) to keep the number of retries small.
static void GenerateRandomIndexes(uint32_t range, uint32_t count,
                                  unordered_set<uint32_t> *out) {
  CHECK_LE(count, range / 2) <<
      "this will be too slow unless count is much smaller than range";
  out->clear();
  // Keep drawing until enough distinct values are collected. A duplicate draw
  // leaves the set unchanged and is simply retried. Driving the loop off the
  // set size also avoids comparing a signed counter against 'count'.
  while (out->size() < count) {
    out->insert(random() % range);
  }
}
// Verifies that the per-column update counts gathered while flushing the DMS
// to a delta file match the number of updates written for each column.
TEST_F(TestDeltaMemStore, TestUpdateCount) {
  const uint32_t n_rows = 1000;
  faststring update_buf;
  RowChangeListEncoder update(&update_buf);
  for (uint32_t idx = 0; idx < n_rows; idx++) {
    update.Reset();
    // Every fourth row gets a string column update in addition to the int
    // column update added below.
    if (idx % 4 == 0) {
      char buf[256] = "update buf";
      Slice s(buf);
      update.AddColumnUpdate(schema_.column(kStringColumn),
                             schema_.column_id(kStringColumn), &s);
    }
    // Only even rows are written to the DMS; each carries an int update.
    if (idx % 2 == 0) {
      ScopedOp op(&mvcc_, clock_.Now());
      op.StartApplying();
      uint32_t new_val = idx * 10;
      update.AddColumnUpdate(schema_.column(kIntColumn),
                             schema_.column_id(kIntColumn), &new_val);
      ASSERT_OK_FAST(dms_->Update(op.timestamp(), idx, RowChangeList(update_buf), op_id_));
      op.FinishApplying();
    }
  }

  // Flush the delta file so that the stats get updated.
  FsManager fs(env_, FsManagerOpts(GetTestPath("fs_root")));
  ASSERT_OK(fs.CreateInitialFileSystemLayout());
  ASSERT_OK(fs.Open());
  unique_ptr<WritableBlock> block;
  ASSERT_OK(fs.CreateNewBlock({}, &block));
  DeltaFileWriter dfw(std::move(block));
  ASSERT_OK(dfw.Start());
  // A failed flush must fail the test instead of letting the assertions below
  // run against incomplete stats.
  ASSERT_OK(dms_->FlushToFile(&dfw));

  unique_ptr<DeltaStats> stats = dfw.release_delta_stats();
  ASSERT_EQ(n_rows / 2, stats->update_count_for_col_id(schema_.column_id(kIntColumn)));
  ASSERT_EQ(n_rows / 4, stats->update_count_for_col_id(schema_.column_id(kStringColumn)));
}
// Updates a random subset of rows and verifies that reading the DMS back
// modifies exactly those rows and leaves every other row untouched.
TEST_F(TestDeltaMemStore, TestDMSSparseUpdates) {
  const uint32_t kNumRows = 1000;
  const int kNumUpdatedRows = 100;

  // Update 100 random rows out of the 1000. GenerateRandomIndexes() draws
  // from random(), which is seeded by srandom(); srand() only affects rand().
  srandom(12345);
  unordered_set<uint32_t> indexes_to_update;
  GenerateRandomIndexes(kNumRows, kNumUpdatedRows, &indexes_to_update);
  UpdateIntsAtIndexes(indexes_to_update);
  ASSERT_EQ(kNumUpdatedRows, dms_->Count());

  // Now apply the updates from the DMS back to an array that has been
  // pre-filled with a recognizable marker value.
  ScopedColumnBlock<UINT32> read_back(kNumRows);
  for (uint32_t i = 0; i < kNumRows; i++) {
    read_back[i] = 0xDEADBEEF;
  }
  MvccSnapshot snap(mvcc_);
  // ApplyUpdates() asserts internally; NO_FATALS stops the test if it fails.
  NO_FATALS(ApplyUpdates(snap, 0, kIntColumn, &read_back));

  // And verify that only the rows that we updated are modified within
  // the array.
  for (uint32_t i = 0; i < kNumRows; i++) {
    if (indexes_to_update.find(i) == indexes_to_update.end()) {
      // If this wasn't one of the ones we updated, expect our marker
      ASSERT_EQ(0xDEADBEEF, read_back[i]);
    } else {
      // Otherwise expect the updated value
      ASSERT_EQ(i * 10, read_back[i]);
    }
  }
}
// Performance test for KUDU-749: zipfian workloads can cause a lot
// of updates to a single row. This benchmark updates a single row many
// times and times how long it takes to apply those updates during
// the read path.
TEST_F(TestDeltaMemStore, BenchmarkManyUpdatesToOneRow) {
  const int kNumRows = 1000;
  const int kNumUpdates = 10000;
  const int kIdxToUpdate = 10;
  const int kStringDataSize = 1000;

  // The payload is the same for every update, so build it only once.
  const string str(kStringDataSize, 'x');
  for (int i = 0; i < kNumUpdates; i++) {
    faststring buf;
    RowChangeListEncoder update(&buf);
    ScopedOp op(&mvcc_, clock_.Now());
    op.StartApplying();
    Slice s(str);
    update.AddColumnUpdate(schema_.column(kStringColumn),
                           schema_.column_id(kStringColumn), &s);
    CHECK_OK(dms_->Update(op.timestamp(), kIdxToUpdate, RowChangeList(buf), op_id_));
    op.FinishApplying();
  }
  mvcc_.AdjustNewOpLowerBound(clock_.Now());
  MvccSnapshot snap(mvcc_);

  LOG_TIMING(INFO, "Applying updates") {
    for (int pass = 0; pass < FLAGS_benchmark_num_passes; pass++) {
      // Start every pass from a block of empty slices.
      ScopedColumnBlock<STRING> strings(kNumRows);
      for (int row = 0; row < kNumRows; row++) {
        strings[row] = Slice();
      }
      // ApplyUpdates() asserts internally; NO_FATALS stops the test on failure.
      NO_FATALS(ApplyUpdates(snap, 0, kStringColumn, &strings));
    }
  }
}
// Fixture parameterized on an int, which BenchmarkSnapshotScans uses as the
// number of updates applied to each row.
class TestDeltaMemStoreNumUpdates : public TestDeltaMemStore,
public ::testing::WithParamInterface<int> {
};
// Instantiate the parameterized tests with 2, 20 and 200 as the parameter.
INSTANTIATE_TEST_SUITE_P(DifferentNumUpdates,
TestDeltaMemStoreNumUpdates, ::testing::Values(2, 20, 200));
// Benchmarks snapshot scans of the DMS: every row is updated GetParam() times
// at distinct timestamps, then the DMS is scanned repeatedly at each of those
// timestamps.
TEST_P(TestDeltaMemStoreNumUpdates, BenchmarkSnapshotScans) {
  const int kNumRows = 100;
  const int updates_per_row = GetParam();

  // Fill the DMS with kNumRows * updates_per_row updates. Within a row, each
  // update is written at its own timestamp.
  faststring encoded;
  RowChangeListEncoder encoder(&encoded);
  LOG_TIMING(INFO, Substitute("updating $0 rows $1 times each", kNumRows, updates_per_row)) {
    for (rowid_t row = 0; row < kNumRows; row++) {
      for (int t = 0; t < updates_per_row; t++) {
        encoder.Reset();
        Timestamp write_ts(t);
        // The written value is simply the timestamp of the update.
        uint32_t value = t;
        encoder.AddColumnUpdate(schema_.column(kIntColumn),
                                schema_.column_id(kIntColumn), &value);
        CHECK_OK(dms_->Update(write_ts, row, RowChangeList(encoded), op_id_));
      }
    }
  }

  // Scan the DMS at every timestamp that was written. Each scan is repeated
  // over several passes to stabilize the timings.
  ScopedColumnBlock<UINT32> ints(kNumRows);
  LOG_TIMING(INFO, Substitute("running $0 scans for each timestamp",
                              FLAGS_benchmark_num_passes)) {
    for (int t = 0; t < updates_per_row; t++) {
      LOG_TIMING(INFO, Substitute("running $0 scans at timestamp $1",
                                  FLAGS_benchmark_num_passes, t)) {
        for (int pass = 0; pass < FLAGS_benchmark_num_passes; pass++) {
          Timestamp read_ts(t);
          MvccSnapshot snap(read_ts);
          NO_FATALS(ApplyUpdates(snap, 0, kIntColumn, &ints));
        }
      }
    }
  }
}
// Fixture parameterized on an int, which
// BenchmarkScansWithVaryingNumberOfDeletes uses to decide how many of the
// updated rows also receive a DELETE.
class TestDeltaMemStoreNumDeletes : public TestDeltaMemStore,
public ::testing::WithParamInterface<int> {
};
// Instantiate the parameterized tests with 0, 10, 100 and 1000 as the parameter.
INSTANTIATE_TEST_SUITE_P(DifferentNumDeletes,
TestDeltaMemStoreNumDeletes, ::testing::Values(0, 10, 100, 1000));
// Benchmarks full scans of a DMS holding one update per row, where a
// parameter-controlled subset of the rows has additionally been deleted.
TEST_P(TestDeltaMemStoreNumDeletes, BenchmarkScansWithVaryingNumberOfDeletes) {
  const int kNumUpdates = 10000;
  const int num_deletes = GetParam();

  faststring encoded;
  RowChangeListEncoder encoder(&encoded);

  // Every 'delete_interval'-th row is deleted; zero disables deletes.
  rowid_t delete_interval = 0;
  if (num_deletes != 0) {
    delete_interval = kNumUpdates / num_deletes;
  }

  // Populate the DMS with kNumUpdates updates, one to each row.
  for (rowid_t row = 0; row < kNumUpdates; row++) {
    encoder.Reset();
    uint32_t value = row;
    encoder.AddColumnUpdate(schema_.column(kIntColumn),
                            schema_.column_id(kIntColumn), &value);
    ASSERT_OK(dms_->Update(Timestamp(row), row, RowChangeList(encoded), op_id_));

    // When appropriate, follow the update with a DELETE at the next timestamp.
    const bool also_delete = delete_interval != 0 && row % delete_interval == 0;
    if (also_delete) {
      encoder.Reset();
      encoder.SetToDelete();
      ASSERT_OK(dms_->Update(Timestamp(row + 1), row, RowChangeList(encoded), op_id_));
    }
  }

  // Scan the whole DMS, repeating the scan over several passes to stabilize
  // the timings.
  ScopedColumnBlock<UINT32> ints(kNumUpdates);
  MvccSnapshot snap(MvccSnapshot::CreateSnapshotIncludingAllOps());
  LOG_TIMING(INFO, Substitute("running $0 scans with $1 deletes",
                              FLAGS_benchmark_num_passes, num_deletes)) {
    for (int pass = 0; pass < FLAGS_benchmark_num_passes; pass++) {
      NO_FATALS(ApplyUpdates(snap, 0, kIntColumn, &ints));
    }
  }
}
// Test that when a slice column has been updated multiple times in the
// delta memstore, each update's value is read back correctly even after the
// caller's source buffer has been overwritten.
TEST_F(TestDeltaMemStore, TestReUpdateSlice) {
  faststring update_buf;
  RowChangeListEncoder update(&update_buf);

  // Update a cell, taking care that the buffer we use to perform
  // the update gets cleared after usage. This ensures that the
  // underlying data is properly copied into the DMS arena.
  {
    ScopedOp op(&mvcc_, clock_.Now());
    op.StartApplying();
    char buf[256] = "update 1";
    Slice s(buf);
    update.AddColumnUpdate(schema_.column(0),
                           schema_.column_id(0), &s);
    ASSERT_OK_FAST(dms_->Update(op.timestamp(), 123, RowChangeList(update_buf), op_id_));
    memset(buf, 0xff, sizeof(buf));
    op.FinishApplying();
  }
  MvccSnapshot snapshot_after_first_update(mvcc_);

  // Update the same cell again with a different value
  {
    ScopedOp op(&mvcc_, clock_.Now());
    op.StartApplying();
    char buf[256] = "update 2";
    Slice s(buf);
    update.Reset();
    update.AddColumnUpdate(schema_.column(0),
                           schema_.column_id(0), &s);
    ASSERT_OK_FAST(dms_->Update(op.timestamp(), 123, RowChangeList(update_buf), op_id_));
    memset(buf, 0xff, sizeof(buf));
    op.FinishApplying();
  }
  MvccSnapshot snapshot_after_second_update(mvcc_);

  // Ensure we end up with a second entry for the cell, at the
  // new timestamp
  ASSERT_EQ(2, dms_->Count());

  // Ensure that we ended up with the right data, and that the old MVCC snapshot
  // yields the correct old value. ApplyUpdates() asserts internally, so wrap it
  // in NO_FATALS to stop the test if the helper fails.
  ScopedColumnBlock<STRING> read_back(1);
  NO_FATALS(ApplyUpdates(snapshot_after_first_update, 123, 0, &read_back));
  ASSERT_EQ("update 1", read_back[0].ToString());
  NO_FATALS(ApplyUpdates(snapshot_after_second_update, 123, 0, &read_back));
  ASSERT_EQ("update 2", read_back[0].ToString());
}
// Test that if two updates come in with out-of-order op timestamps,
// the one with the higher op timestamp ends up winning.
//
// This is important during flushing when updates against the old rowset
// are carried forward, but may fall behind newer ops.
TEST_F(TestDeltaMemStore, TestOutOfOrderOps) {
  faststring update_buf;
  RowChangeListEncoder update(&update_buf);
  {
    // op1 is created first and op2 second, but op2 is applied first.
    ScopedOp op1(&mvcc_, clock_.Now());
    ScopedOp op2(&mvcc_, clock_.Now());

    op2.StartApplying();
    Slice s("update 2");
    update.AddColumnUpdate(schema_.column(kStringColumn),
                           schema_.column_id(kStringColumn), &s);
    ASSERT_OK(dms_->Update(op2.timestamp(), 123, RowChangeList(update_buf), op_id_));
    op2.FinishApplying();

    op1.StartApplying();
    update.Reset();
    s = Slice("update 1");
    update.AddColumnUpdate(schema_.column(kStringColumn),
                           schema_.column_id(kStringColumn), &s);
    ASSERT_OK(dms_->Update(op1.timestamp(), 123, RowChangeList(update_buf), op_id_));
    op1.FinishApplying();
  }

  // Ensure we end up with two entries for the cell.
  ASSERT_EQ(2, dms_->Count());

  // Ensure that we ended up with the right data. ApplyUpdates() asserts
  // internally, so wrap it in NO_FATALS to stop the test if the helper fails.
  ScopedColumnBlock<STRING> read_back(1);
  NO_FATALS(ApplyUpdates(MvccSnapshot(mvcc_), 123, kStringColumn, &read_back));
  ASSERT_EQ("update 2", read_back[0].ToString());
}
// Writes an int and a string update to each of 1000 rows, reads both columns
// back, and then checks that re-updating the same rows under new ops adds
// new entries to the DMS.
TEST_F(TestDeltaMemStore, TestDMSBasic) {
  faststring update_buf;
  RowChangeListEncoder update(&update_buf);
  char buf[256];
  for (uint32_t i = 0; i < 1000; i++) {
    ScopedOp op(&mvcc_, clock_.Now());
    op.StartApplying();
    update.Reset();
    uint32_t val = i * 10;
    update.AddColumnUpdate(schema_.column(kIntColumn),
                           schema_.column_id(kIntColumn), &val);
    // 'i' is unsigned, so format it with %u.
    snprintf(buf, sizeof(buf), "hello %u", i);
    Slice s(buf);
    update.AddColumnUpdate(schema_.column(kStringColumn),
                           schema_.column_id(kStringColumn), &s);
    ASSERT_OK_FAST(dms_->Update(op.timestamp(), i, RowChangeList(update_buf), op_id_));
    op.FinishApplying();
  }
  ASSERT_EQ(1000, dms_->Count());

  // Read back the values and check correctness. ApplyUpdates() asserts
  // internally, so wrap it in NO_FATALS to stop the test if the helper fails.
  MvccSnapshot snap(mvcc_);
  ScopedColumnBlock<UINT32> read_back(1000);
  ScopedColumnBlock<STRING> read_back_slices(1000);
  NO_FATALS(ApplyUpdates(snap, 0, kIntColumn, &read_back));
  NO_FATALS(ApplyUpdates(snap, 0, kStringColumn, &read_back_slices));

  // When reading back the slice, do so into a different buffer -
  // otherwise if the slice references weren't properly copied above,
  // we'd be writing our comparison value into the same buffer that
  // we're comparing against!
  char buf2[256];
  for (uint32_t i = 0; i < 1000; i++) {
    ASSERT_EQ(i * 10, read_back[i]) << "failed at iteration " << i;
    snprintf(buf2, sizeof(buf2), "hello %u", i);
    Slice s(buf2);
    ASSERT_EQ(s, read_back_slices[i]);
  }

  // Update the same rows again, with new ops. Even though
  // the same rows are updated, new entries should be added because
  // these are separate ops and we need to maintain the
  // old ones for snapshot consistency purposes.
  for (uint32_t i = 0; i < 1000; i++) {
    ScopedOp op(&mvcc_, clock_.Now());
    op.StartApplying();
    update.Reset();
    uint32_t val = i * 20;
    update.AddColumnUpdate(schema_.column(kIntColumn),
                           schema_.column_id(kIntColumn), &val);
    ASSERT_OK_FAST(dms_->Update(op.timestamp(), i, RowChangeList(update_buf), op_id_));
    op.FinishApplying();
  }
  ASSERT_EQ(2000, dms_->Count());
}
// Updates rows 0-999, then drives a delta iterator directly: seeks to row 50
// and applies two consecutive 100-row batches, checking every value read.
TEST_F(TestDeltaMemStore, TestIteratorDoesUpdates) {
  unordered_set<uint32_t> all_rows;
  for (uint32_t row = 0; row < 1000; row++) {
    all_rows.insert(row);
  }
  UpdateIntsAtIndexes(all_rows);
  ASSERT_EQ(1000, dms_->Count());

  ScopedColumnBlock<UINT32> block(100);

  RowIteratorOptions iter_opts;
  iter_opts.projection = &schema_;
  // TODO(todd): test snapshot reads from different points
  iter_opts.snap_to_include = MvccSnapshot(mvcc_);

  unique_ptr<DeltaIterator> delta_iter;
  Status create_status = dms_->NewDeltaIterator(iter_opts, &delta_iter);
  if (create_status.IsNotFound()) {
    FAIL() << "Iterator fell outside of the range of the snapshot";
  }
  ASSERT_OK(create_status);
  ASSERT_OK(delta_iter->Init(nullptr));

  int batch_first_row = 50;
  ASSERT_OK(delta_iter->SeekToOrdinal(batch_first_row));

  SelectionVector all_selected(block.nrows());
  all_selected.SetAllTrue();

  // Apply the batch starting at the seek position, then the one right after.
  for (int batch = 0; batch < 2; batch++) {
    ASSERT_OK(delta_iter->PrepareBatch(block.nrows(), DeltaIterator::PREPARE_FOR_APPLY));
    ASSERT_OK(delta_iter->ApplyUpdates(kIntColumn, &block, all_selected));
    for (int i = 0; i < 100; i++) {
      int actual_row = batch_first_row + i;
      ASSERT_EQ(actual_row * 10, block[i]) << "at row " << actual_row;
    }
    batch_first_row += block.nrows();
  }
}
// Updates two rows and checks that CollectMutations() returns each mutation
// in the correct slot of the correct batch, with empty lists everywhere else.
TEST_F(TestDeltaMemStore, TestCollectMutations) {
  Arena arena(1024);

  // Update rows 5 and 12
  vector<uint32_t> to_update;
  to_update.push_back(5);
  to_update.push_back(12);
  UpdateIntsAtIndexes(to_update);
  ASSERT_EQ(2, dms_->Count());

  const int kBatchSize = 10;
  vector<Mutation *> mutations;
  mutations.resize(kBatchSize);

  RowIteratorOptions opts;
  opts.projection = &schema_;
  opts.snap_to_include = MvccSnapshot(mvcc_);
  unique_ptr<DeltaIterator> iter;
  Status s = dms_->NewDeltaIterator(opts, &iter);
  if (s.IsNotFound()) {
    FAIL() << "Iterator fell outside of the range of the snapshot";
  }
  ASSERT_OK(s);
  ASSERT_OK(iter->Init(nullptr));
  ASSERT_OK(iter->SeekToOrdinal(0));
  ASSERT_OK(iter->PrepareBatch(kBatchSize, DeltaIterator::PREPARE_FOR_COLLECT));
  ASSERT_OK(iter->CollectMutations(&mutations, &arena));

  // First batch (rows 0-9): only row 5 is updated, everything else should be
  // NULL.
  for (int i = 0; i < kBatchSize; i++) {
    string str = Mutation::StringifyMutationList(schema_, mutations[i]);
    VLOG(1) << "row " << i << ": " << str;
    if (i != 5) {
      EXPECT_EQ("[]", str);
    } else {
      EXPECT_EQ("[@1(SET col3=50)]", str);
    }
  }

  // Collect the next batch of 10. Clear the arena and the collected pointers
  // from the first batch before reusing them.
  arena.Reset();
  std::fill(mutations.begin(), mutations.end(), static_cast<Mutation *>(nullptr));
  ASSERT_OK(iter->PrepareBatch(kBatchSize, DeltaIterator::PREPARE_FOR_COLLECT));
  ASSERT_OK(iter->CollectMutations(&mutations, &arena));

  // Second batch (rows 10-19): row 12 sits at index 2 of the batch and is the
  // only one updated; everything else should be NULL.
  for (int i = 0; i < kBatchSize; i++) {
    string str = Mutation::StringifyMutationList(schema_, mutations[i]);
    VLOG(1) << "row " << i << ": " << str;
    if (i != 2) {
      EXPECT_EQ("[]", str);
    } else {
      EXPECT_EQ("[@2(SET col3=120)]", str);
    }
  }
}
// Fuzz test: writes a series of randomly generated deltas to a DMS, reads
// them back through the shared delta fuzz-test driver, and verifies the
// results against a mirror of what was written.
TEST_F(TestDeltaMemStore, TestFuzz) {
  // Arbitrary constants to control the running time and coverage of the test.
  const int kNumColumns = 100;
  const int kNumRows = 1000;
  const int kNumDeltas = 10000;
  const std::pair<uint64_t, uint64_t> kTimestampRange(0, 100);

  // Build a schema of kNumColumns UINT32 columns; a column is made nullable
  // whenever the PRNG draws 0 out of 10.
  Random prng(SeedRandom());
  SchemaBuilder schema_builder;
  for (int col = 0; col < kNumColumns; col++) {
    const bool nullable = prng.Uniform(10) == 0;
    const string col_name = Substitute("col$0", col);
    ASSERT_OK(nullable ? schema_builder.AddNullableColumn(col_name, UINT32)
                       : schema_builder.AddColumn(col_name, UINT32));
  }
  Schema schema(schema_builder.Build());

  // 'deltas' mirrors the deltas that get written to the randomly populated DMS.
  MirroredDeltas<DeltaTypeSelector<REDO>> deltas(&schema);
  shared_ptr<DeltaMemStore> dms;
  ASSERT_OK(CreateRandomDMS(
      schema, &prng, kNumDeltas, { 0, kNumRows }, kTimestampRange, &deltas, &dms));

  NO_FATALS(RunDeltaFuzzTest<DeltaTypeSelector<REDO>>(
      *dms, &prng, &deltas, kTimestampRange,
      /*test_filter_column_ids_and_collect_deltas=*/false));
}
// Updates every row, deletes every even-numbered row, and checks that the
// DMS reports exactly half of the rows as deleted.
TEST_F(TestDeltaMemStore, TestDeletedRowCount) {
  const int kNumUpdates = 10000;
  faststring encoded;
  RowChangeListEncoder encoder(&encoded);

  for (rowid_t row = 0; row < kNumUpdates; row++) {
    // Every row first receives an UPDATE of its int column.
    uint32_t value = row;
    encoder.Reset();
    encoder.AddColumnUpdate(schema_.column(kIntColumn), schema_.column_id(kIntColumn), &value);
    ASSERT_OK(dms_->Update(Timestamp(row), row, RowChangeList(encoded), op_id_));

    // Odd rows are left as they are.
    if (row % 2 != 0) {
      continue;
    }

    // Even rows are then DELETEd at the following timestamp.
    encoder.Reset();
    encoder.SetToDelete();
    ASSERT_OK(dms_->Update(Timestamp(row + 1), row, RowChangeList(encoded), op_id_));
  }

  ASSERT_EQ(kNumUpdates / 2, dms_->deleted_row_count());
}
} // namespace tablet
} // namespace kudu