// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "kudu/tablet/deltamemstore.h"

#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <map>
#include <memory>
#include <ostream>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

#include <gflags/gflags.h>
#include <glog/logging.h>
#include <gtest/gtest.h>

#include "kudu/clock/logical_clock.h"
#include "kudu/common/columnblock.h"
#include "kudu/common/columnblock-test-util.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/row_changelist.h"
#include "kudu/common/rowblock.h"
#include "kudu/common/rowid.h"
#include "kudu/common/schema.h"
#include "kudu/common/timestamp.h"
#include "kudu/common/types.h"
#include "kudu/consensus/log_anchor_registry.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/fs/block_manager.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/delta_key.h"
#include "kudu/tablet/delta_stats.h"
#include "kudu/tablet/delta_store.h"
#include "kudu/tablet/deltafile.h"
#include "kudu/tablet/mutation.h"
#include "kudu/tablet/mvcc.h"
#include "kudu/tablet/rowset.h"
#include "kudu/tablet/tablet-test-util.h"
#include "kudu/util/faststring.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/random.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"

DEFINE_int32(benchmark_num_passes,
#ifdef NDEBUG
             100,
#else
             1,
#endif
             "Number of passes to apply deltas in the benchmark");

using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::unordered_set;
using std::vector;
using strings::Substitute;

namespace kudu {
namespace tablet {

using fs::WritableBlock;

class TestDeltaMemStore : public KuduTest {
 public:
  TestDeltaMemStore()
      : op_id_(consensus::MaximumOpId()),
        schema_(CreateSchema()),
        clock_(Timestamp::kInitialTimestamp) {
    CHECK_OK(DeltaMemStore::Create(0, 0,
                                   new log::LogAnchorRegistry(),
                                   MemTracker::GetRootTracker(), &dms_));
    CHECK_OK(dms_->Init(nullptr));
  }

  static Schema CreateSchema() {
    SchemaBuilder builder;
    CHECK_OK(builder.AddColumn("col1", STRING));
    CHECK_OK(builder.AddColumn("col2", STRING));
    CHECK_OK(builder.AddColumn("col3", UINT32));
    return builder.Build();
  }

  template<class Iterable>
  void UpdateIntsAtIndexes(const Iterable &indexes_to_update) {
    faststring buf;
    RowChangeListEncoder update(&buf);

    for (uint32_t idx_to_update : indexes_to_update) {
      ScopedOp op(&mvcc_, clock_.Now());
      op.StartApplying();
      update.Reset();
      uint32_t new_val = idx_to_update * 10;
      update.AddColumnUpdate(schema_.column(kIntColumn),
                             schema_.column_id(kIntColumn), &new_val);

      CHECK_OK(dms_->Update(op.timestamp(), idx_to_update, RowChangeList(buf), op_id_));
      op.FinishApplying();
    }
  }

  void ApplyUpdates(const MvccSnapshot &snapshot,
                    uint32_t row_idx,
                    size_t col_idx,
                    ColumnBlock *cb) {
    ColumnSchema col_schema(schema_.column(col_idx));
    Schema single_col_projection({ col_schema },
                                 { schema_.column_id(col_idx) },
                                 0);
    RowIteratorOptions opts;
    opts.projection = &single_col_projection;
    opts.snap_to_include = snapshot;
    unique_ptr<DeltaIterator> iter;
    Status s = dms_->NewDeltaIterator(opts, &iter);
    if (s.IsNotFound()) {
      return;
    }
    ASSERT_OK(s);
    ASSERT_OK(iter->Init(nullptr));
    ASSERT_OK(iter->SeekToOrdinal(row_idx));
    ASSERT_OK(iter->PrepareBatch(cb->nrows(), DeltaIterator::PREPARE_FOR_APPLY));
    SelectionVector filter(cb->nrows());
    filter.SetAllTrue();
    ASSERT_OK(iter->ApplyDeletes(&filter));
    ASSERT_OK(iter->ApplyUpdates(0, cb, filter));
  }

 protected:
  static const int kStringColumn = 1;
  static const int kIntColumn = 2;

  consensus::OpId op_id_;

  const Schema schema_;
  shared_ptr<DeltaMemStore> dms_;
  clock::LogicalClock clock_;
  MvccManager mvcc_;
};

static void GenerateRandomIndexes(uint32_t range, uint32_t count,
                                  unordered_set<uint32_t> *out) {
  CHECK_LE(count, range / 2) <<
    "this will be too slow unless count is much smaller than range";
  out->clear();

  for (int i = 0; i < count; i++) {
    bool inserted = false;
    do {
      inserted = out->insert(random() % range).second;
    } while (!inserted);
  }
}

TEST_F(TestDeltaMemStore, TestUpdateCount) {
  uint32_t n_rows = 1000;
  faststring update_buf;

  RowChangeListEncoder update(&update_buf);
  for (uint32_t idx = 0; idx < n_rows; idx++) {
    update.Reset();
    if (idx % 4 == 0) {
      char buf[256] = "update buf";
      Slice s(buf);
      update.AddColumnUpdate(schema_.column(kStringColumn),
                             schema_.column_id(kStringColumn), &s);
    }
    if (idx % 2 == 0) {
      ScopedOp op(&mvcc_, clock_.Now());
      op.StartApplying();
      uint32_t new_val = idx * 10;
      update.AddColumnUpdate(schema_.column(kIntColumn),
                             schema_.column_id(kIntColumn), &new_val);
      ASSERT_OK_FAST(dms_->Update(op.timestamp(), idx, RowChangeList(update_buf), op_id_));
      op.FinishApplying();
    }
  }


  // Flush the delta file so that the stats get updated.

  FsManager fs(env_, FsManagerOpts(GetTestPath("fs_root")));
  ASSERT_OK(fs.CreateInitialFileSystemLayout());
  ASSERT_OK(fs.Open());
  unique_ptr<WritableBlock> block;
  ASSERT_OK(fs.CreateNewBlock({}, &block));
  DeltaFileWriter dfw(std::move(block));
  ASSERT_OK(dfw.Start());
  dms_->FlushToFile(&dfw);
  unique_ptr<DeltaStats> stats = dfw.release_delta_stats();

  ASSERT_EQ(n_rows / 2, stats->update_count_for_col_id(schema_.column_id(kIntColumn)));
  ASSERT_EQ(n_rows / 4, stats->update_count_for_col_id(schema_.column_id(kStringColumn)));
}

TEST_F(TestDeltaMemStore, TestDMSSparseUpdates) {

  int n_rows = 1000;

  // Update 100 random rows out of the 1000.
  srand(12345);
  unordered_set<uint32_t> indexes_to_update;
  GenerateRandomIndexes(n_rows, 100, &indexes_to_update);
  UpdateIntsAtIndexes(indexes_to_update);
  ASSERT_EQ(100, dms_->Count());

  // Now apply the updates from the DMS back to an array
  ScopedColumnBlock<UINT32> read_back(1000);
  for (int i = 0; i < 1000; i++) {
    read_back[i] = 0xDEADBEEF;
  }
  MvccSnapshot snap(mvcc_);
  ApplyUpdates(snap, 0, kIntColumn, &read_back);

  // And verify that only the rows that we updated are modified within
  // the array.
  for (int i = 0; i < 1000; i++) {
    // If this wasn't one of the ones we updated, expect our marker
    if (indexes_to_update.find(i) == indexes_to_update.end()) {
      // If this wasn't one of the ones we updated, expect our marker
      ASSERT_EQ(0xDEADBEEF, read_back[i]);
    } else {
      // Otherwise expect the updated value
      ASSERT_EQ(i * 10, read_back[i]);
    }
  }
}

// Performance test for KUDU-749: zipfian workloads can cause a lot
// of updates to a single row. This benchmark updates a single row many
// times and times how long it takes to apply those updates during
// the read path.
TEST_F(TestDeltaMemStore, BenchmarkManyUpdatesToOneRow) {
  const int kNumRows = 1000;
  const int kNumUpdates = 10000;
  const int kIdxToUpdate = 10;
  const int kStringDataSize = 1000;

  for (int i = 0; i < kNumUpdates; i++) {
    faststring buf;
    RowChangeListEncoder update(&buf);

    ScopedOp op(&mvcc_, clock_.Now());
    op.StartApplying();
    string str(kStringDataSize, 'x');
    Slice s(str);
    update.AddColumnUpdate(schema_.column(kStringColumn),
                           schema_.column_id(kStringColumn), &s);
    CHECK_OK(dms_->Update(op.timestamp(), kIdxToUpdate, RowChangeList(buf), op_id_));
    op.FinishApplying();
  }
  mvcc_.AdjustNewOpLowerBound(clock_.Now());

  MvccSnapshot snap(mvcc_);
  LOG_TIMING(INFO, "Applying updates") {
    for (int i = 0; i < FLAGS_benchmark_num_passes; i++) {
      ScopedColumnBlock<STRING> strings(kNumRows);
      for (int i = 0; i < kNumRows; i++) {
        strings[i] = Slice();
      }

      ApplyUpdates(snap, 0, kStringColumn, &strings);
    }
  }
}

class TestDeltaMemStoreNumUpdates : public TestDeltaMemStore,
                                    public ::testing::WithParamInterface<int> {
};

INSTANTIATE_TEST_SUITE_P(DifferentNumUpdates,
                         TestDeltaMemStoreNumUpdates, ::testing::Values(2, 20, 200));

TEST_P(TestDeltaMemStoreNumUpdates, BenchmarkSnapshotScans) {
  const int kNumRows = 100;

  // Populate the DMS with kNumRows * GetParam() updates. For each row, every
  // update is at a different timestamp.
  faststring buf;
  RowChangeListEncoder update(&buf);
  LOG_TIMING(INFO, Substitute("updating $0 rows $1 times each", kNumRows, GetParam())) {
    for (rowid_t row_idx = 0; row_idx < kNumRows; row_idx++) {
      for (int ts_val = 0; ts_val < GetParam(); ts_val++) {
        update.Reset();

        Timestamp ts(ts_val);
        uint32_t new_val = ts_val;
        update.AddColumnUpdate(schema_.column(kIntColumn),
                               schema_.column_id(kIntColumn), &new_val);
        CHECK_OK(dms_->Update(ts, row_idx, RowChangeList(buf), op_id_));
      }
    }
  }

  // Now scan the DMS at each timestamp. The scans are repeated in a number of
  // passes to stabilize the results.
  ScopedColumnBlock<UINT32> ints(kNumRows);
  LOG_TIMING(INFO, Substitute("running $0 scans for each timestamp",
                              FLAGS_benchmark_num_passes)) {
    for (int ts_val = 0; ts_val < GetParam(); ts_val++) {
      LOG_TIMING(INFO, Substitute("running $0 scans at timestamp $1",
                                  FLAGS_benchmark_num_passes, ts_val)) {
        for (int pass = 0; pass < FLAGS_benchmark_num_passes; pass++) {
          Timestamp ts(ts_val);
          MvccSnapshot snap(ts);
          NO_FATALS(ApplyUpdates(snap, 0, kIntColumn, &ints));
        }
      }
    }
  }
}

class TestDeltaMemStoreNumDeletes : public TestDeltaMemStore,
                                    public ::testing::WithParamInterface<int> {
};

INSTANTIATE_TEST_SUITE_P(DifferentNumDeletes,
                         TestDeltaMemStoreNumDeletes, ::testing::Values(0, 10, 100, 1000));

TEST_P(TestDeltaMemStoreNumDeletes, BenchmarkScansWithVaryingNumberOfDeletes) {
  const int kNumUpdates = 10000;

  // Populate the DMS with kNumRows updates, one to each row.
  faststring buf;
  RowChangeListEncoder update(&buf);
  rowid_t delete_stepping = GetParam() != 0 ? kNumUpdates / GetParam() : 0;
  for (rowid_t row_idx = 0; row_idx < kNumUpdates; row_idx++) {
    update.Reset();

    uint32_t new_val = row_idx;
    update.AddColumnUpdate(schema_.column(kIntColumn),
                           schema_.column_id(kIntColumn), &new_val);
    ASSERT_OK(dms_->Update(Timestamp(row_idx), row_idx, RowChangeList(buf), op_id_));

    // When appropriate, add a DELETE too.
    if (delete_stepping != 0 && row_idx % delete_stepping == 0) {
      update.Reset();
      update.SetToDelete();
      ASSERT_OK(dms_->Update(Timestamp(row_idx + 1), row_idx, RowChangeList(buf), op_id_));
    }
  }

  // Now scan the DMS. The scans are repeated in a number of passes to stabilize
  // the results.
  ScopedColumnBlock<UINT32> ints(kNumUpdates);
  MvccSnapshot snap(MvccSnapshot::CreateSnapshotIncludingAllOps());
  LOG_TIMING(INFO, Substitute("running $0 scans with $1 deletes",
                              FLAGS_benchmark_num_passes, GetParam())) {
    for (int pass = 0; pass < FLAGS_benchmark_num_passes; pass++) {
      NO_FATALS(ApplyUpdates(snap, 0, kIntColumn, &ints));
    }
  }
}

// Test when a slice column has been updated multiple times in the
// memrowset that the referred to values properly end up in the
// right arena.
TEST_F(TestDeltaMemStore, TestReUpdateSlice) {
  faststring update_buf;
  RowChangeListEncoder update(&update_buf);

  // Update a cell, taking care that the buffer we use to perform
  // the update gets cleared after usage. This ensures that the
  // underlying data is properly copied into the DMS arena.
  {
    ScopedOp op(&mvcc_, clock_.Now());
    op.StartApplying();
    char buf[256] = "update 1";
    Slice s(buf);
    update.AddColumnUpdate(schema_.column(0),
                           schema_.column_id(0), &s);
    ASSERT_OK_FAST(dms_->Update(op.timestamp(), 123, RowChangeList(update_buf), op_id_));
    memset(buf, 0xff, sizeof(buf));
    op.FinishApplying();
  }
  MvccSnapshot snapshot_after_first_update(mvcc_);

  // Update the same cell again with a different value
  {
    ScopedOp op(&mvcc_, clock_.Now());
    op.StartApplying();
    char buf[256] = "update 2";
    Slice s(buf);
    update.Reset();
    update.AddColumnUpdate(schema_.column(0),
                           schema_.column_id(0), &s);
    ASSERT_OK_FAST(dms_->Update(op.timestamp(), 123, RowChangeList(update_buf), op_id_));
    memset(buf, 0xff, sizeof(buf));
    op.FinishApplying();
  }
  MvccSnapshot snapshot_after_second_update(mvcc_);

  // Ensure we end up with a second entry for the cell, at the
  // new timestamp
  ASSERT_EQ(2, dms_->Count());

  // Ensure that we ended up with the right data, and that the old MVCC snapshot
  // yields the correct old value.
  ScopedColumnBlock<STRING> read_back(1);
  ApplyUpdates(snapshot_after_first_update, 123, 0, &read_back);
  ASSERT_EQ("update 1", read_back[0].ToString());

  ApplyUpdates(snapshot_after_second_update, 123, 0, &read_back);
  ASSERT_EQ("update 2", read_back[0].ToString());
}

// Test that if two updates come in with out-of-order op timestamps,
// the one with the higher op timestamp ends up winning.
//
// This is important during flushing when updates against the old rowset
// are carried forward, but may fall behind newer ops.
TEST_F(TestDeltaMemStore, TestOutOfOrderOps) {
  faststring update_buf;
  RowChangeListEncoder update(&update_buf);

  {
    ScopedOp op1(&mvcc_, clock_.Now());
    ScopedOp op2(&mvcc_, clock_.Now());

    op2.StartApplying();
    Slice s("update 2");
    update.AddColumnUpdate(schema_.column(kStringColumn),
                           schema_.column_id(kStringColumn), &s);
    ASSERT_OK(dms_->Update(op2.timestamp(), 123, RowChangeList(update_buf), op_id_));
    op2.FinishApplying();


    op1.StartApplying();
    update.Reset();
    s = Slice("update 1");
    update.AddColumnUpdate(schema_.column(kStringColumn),
                           schema_.column_id(kStringColumn), &s);
    ASSERT_OK(dms_->Update(op1.timestamp(), 123, RowChangeList(update_buf), op_id_));
    op1.FinishApplying();
  }

  // Ensure we end up two entries for the cell.
  ASSERT_EQ(2, dms_->Count());

  // Ensure that we ended up with the right data.
  ScopedColumnBlock<STRING> read_back(1);
  ApplyUpdates(MvccSnapshot(mvcc_), 123, kStringColumn, &read_back);
  ASSERT_EQ("update 2", read_back[0].ToString());
}

TEST_F(TestDeltaMemStore, TestDMSBasic) {
  faststring update_buf;
  RowChangeListEncoder update(&update_buf);

  char buf[256];
  for (uint32_t i = 0; i < 1000; i++) {
    ScopedOp op(&mvcc_, clock_.Now());
    op.StartApplying();
    update.Reset();

    uint32_t val = i * 10;
    update.AddColumnUpdate(schema_.column(kIntColumn),
                           schema_.column_id(kIntColumn), &val);

    snprintf(buf, sizeof(buf), "hello %d", i);
    Slice s(buf);
    update.AddColumnUpdate(schema_.column(kStringColumn),
                           schema_.column_id(kStringColumn), &s);

    ASSERT_OK_FAST(dms_->Update(op.timestamp(), i, RowChangeList(update_buf), op_id_));
    op.FinishApplying();
  }

  ASSERT_EQ(1000, dms_->Count());

  // Read back the values and check correctness.
  MvccSnapshot snap(mvcc_);
  ScopedColumnBlock<UINT32> read_back(1000);
  ScopedColumnBlock<STRING> read_back_slices(1000);
  ApplyUpdates(snap, 0, kIntColumn, &read_back);
  ApplyUpdates(snap, 0, kStringColumn, &read_back_slices);

  // When reading back the slice, do so into a different buffer -
  // otherwise if the slice references weren't properly copied above,
  // we'd be writing our comparison value into the same buffer that
  // we're comparing against!
  char buf2[256];
  for (uint32_t i = 0; i < 1000; i++) {
    ASSERT_EQ(i * 10, read_back[i]) << "failed at iteration " << i;
    snprintf(buf2, sizeof(buf2), "hello %d", i);
    Slice s(buf2);
    ASSERT_EQ(s, read_back_slices[i]);
  }


  // Update the same rows again, with new ops. Even though
  // the same rows are updated, new entries should be added because
  // these are separate ops and we need to maintain the
  // old ones for snapshot consistency purposes.
  for (uint32_t i = 0; i < 1000; i++) {
    ScopedOp op(&mvcc_, clock_.Now());
    op.StartApplying();
    update.Reset();

    uint32_t val = i * 20;
    update.AddColumnUpdate(schema_.column(kIntColumn),
                           schema_.column_id(kIntColumn), &val);
    ASSERT_OK_FAST(dms_->Update(op.timestamp(), i, RowChangeList(update_buf), op_id_));
    op.FinishApplying();
  }

  ASSERT_EQ(2000, dms_->Count());
}

TEST_F(TestDeltaMemStore, TestIteratorDoesUpdates) {
  unordered_set<uint32_t> to_update;
  for (uint32_t i = 0; i < 1000; i++) {
    to_update.insert(i);
  }
  UpdateIntsAtIndexes(to_update);
  ASSERT_EQ(1000, dms_->Count());

  ScopedColumnBlock<UINT32> block(100);

  RowIteratorOptions opts;
  opts.projection = &schema_;
  // TODO(todd): test snapshot reads from different points
  opts.snap_to_include = MvccSnapshot(mvcc_);

  unique_ptr<DeltaIterator> iter;
  Status s = dms_->NewDeltaIterator(opts, &iter);
  if (s.IsNotFound()) {
    FAIL() << "Iterator fell outside of the range of the snapshot";
  }
  ASSERT_OK(s);
  ASSERT_OK(iter->Init(nullptr));

  int block_start_row = 50;
  ASSERT_OK(iter->SeekToOrdinal(block_start_row));
  ASSERT_OK(iter->PrepareBatch(block.nrows(), DeltaIterator::PREPARE_FOR_APPLY));
  SelectionVector sv(block.nrows());
  sv.SetAllTrue();
  ASSERT_OK(iter->ApplyUpdates(kIntColumn, &block, sv));

  for (int i = 0; i < 100; i++) {
    int actual_row = block_start_row + i;
    ASSERT_EQ(actual_row * 10, block[i]) << "at row " << actual_row;
  }

  // Apply the next block
  block_start_row += block.nrows();
  ASSERT_OK(iter->PrepareBatch(block.nrows(), DeltaIterator::PREPARE_FOR_APPLY));
  ASSERT_OK(iter->ApplyUpdates(kIntColumn, &block, sv));
  for (int i = 0; i < 100; i++) {
    int actual_row = block_start_row + i;
    ASSERT_EQ(actual_row * 10, block[i]) << "at row " << actual_row;
  }
}

TEST_F(TestDeltaMemStore, TestCollectMutations) {
  Arena arena(1024);

  // Update rows 5 and 12
  vector<uint32_t> to_update;
  to_update.push_back(5);
  to_update.push_back(12);
  UpdateIntsAtIndexes(to_update);

  ASSERT_EQ(2, dms_->Count());

  const int kBatchSize = 10;
  vector<Mutation *> mutations;
  mutations.resize(kBatchSize);

  RowIteratorOptions opts;
  opts.projection = &schema_;
  opts.snap_to_include = MvccSnapshot(mvcc_);
  unique_ptr<DeltaIterator> iter;
  Status s = dms_->NewDeltaIterator(opts, &iter);
  if (s.IsNotFound()) {
    FAIL() << "Iterator fell outside of the range of the snapshot";
  }
  ASSERT_OK(s);
  ASSERT_OK(iter->Init(nullptr));
  ASSERT_OK(iter->SeekToOrdinal(0));
  ASSERT_OK(iter->PrepareBatch(kBatchSize, DeltaIterator::PREPARE_FOR_COLLECT));
  ASSERT_OK(iter->CollectMutations(&mutations, &arena));

  // Only row 5 is updated, everything else should be NULL.
  for (int i = 0; i < kBatchSize; i++) {
    string str = Mutation::StringifyMutationList(schema_, mutations[i]);
    VLOG(1) << "row " << i << ": " << str;
    if (i != 5) {
      EXPECT_EQ("[]", str);
    } else {
      EXPECT_EQ("[@1(SET col3=50)]", str);
    }
  }

  // Collect the next batch of 10.
  arena.Reset();
  std::fill(mutations.begin(), mutations.end(), reinterpret_cast<Mutation *>(NULL));
  ASSERT_OK(iter->PrepareBatch(kBatchSize, DeltaIterator::PREPARE_FOR_COLLECT));
  ASSERT_OK(iter->CollectMutations(&mutations, &arena));

  // Only row 2 is updated, everything else should be NULL.
  for (int i = 0; i < 10; i++) {
    string str = Mutation::StringifyMutationList(schema_, mutations[i]);
    VLOG(1) << "row " << i << ": " << str;
    if (i != 2) {
      EXPECT_EQ("[]", str);
    } else {
      EXPECT_EQ("[@2(SET col3=120)]", str);
    }
  }
}

// Generates a series of random deltas, writes them to a DMS, reads them back
// using a DMSIterator, and verifies the results.
TEST_F(TestDeltaMemStore, TestFuzz) {
  // Arbitrary constants to control the running time and coverage of the test.
  const int kNumColumns = 100;
  const int kNumRows = 1000;
  const int kNumDeltas = 10000;
  const std::pair<uint64_t, uint64_t> kTimestampRange(0, 100);

  // Build a schema with kNumColumns columns, some of which are nullable.
  Random prng(SeedRandom());
  SchemaBuilder sb;
  for (int i = 0; i < kNumColumns; i++) {
    if (prng.Uniform(10) == 0) {
      ASSERT_OK(sb.AddNullableColumn(Substitute("col$0", i), UINT32));
    } else {
      ASSERT_OK(sb.AddColumn(Substitute("col$0", i), UINT32));
    }
  }
  Schema schema(sb.Build());

  MirroredDeltas<DeltaTypeSelector<REDO>> deltas(&schema);

  shared_ptr<DeltaMemStore> dms;
  ASSERT_OK(CreateRandomDMS(
      schema, &prng, kNumDeltas, { 0, kNumRows }, kTimestampRange, &deltas, &dms));

  NO_FATALS(RunDeltaFuzzTest<DeltaTypeSelector<REDO>>(
      *dms, &prng, &deltas, kTimestampRange,
      /*test_filter_column_ids_and_collect_deltas=*/false));
}

TEST_F(TestDeltaMemStore, TestDeletedRowCount) {
  const int kNumUpdates = 10000;

  faststring buf;
  RowChangeListEncoder update(&buf);
  for (rowid_t row_idx = 0; row_idx < kNumUpdates; row_idx++) {
    // UPDATE.
    uint32_t new_val = row_idx;
    update.Reset();
    update.AddColumnUpdate(schema_.column(kIntColumn), schema_.column_id(kIntColumn), &new_val);
    ASSERT_OK(dms_->Update(Timestamp(row_idx), row_idx, RowChangeList(buf), op_id_));

    // DELETE.
    if (row_idx % 2 == 0) {
      update.Reset();
      update.SetToDelete();
      ASSERT_OK(dms_->Update(Timestamp(row_idx + 1), row_idx, RowChangeList(buf), op_id_));
    }
  }
  ASSERT_EQ(kNumUpdates / 2, dms_->deleted_row_count());
}

} // namespace tablet
} // namespace kudu
