// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tablet/tablet.h"
#include <algorithm>
#include <cstdint>
#include <ctime>
#include <map>
#include <memory>
#include <ostream>
#include <string>
#include <vector>
#include <boost/optional/optional.hpp>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/cfile/cfile_util.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/encoded_key.h"
#include "kudu/common/iterator.h"
#include "kudu/common/key_range.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/rowblock.h"
#include "kudu/common/schema.h"
#include "kudu/common/timestamp.h"
#include "kudu/fs/block_id.h"
#include "kudu/fs/block_manager.h"
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/tablet/delta_key.h"
#include "kudu/tablet/delta_stats.h"
#include "kudu/tablet/deltafile.h"
#include "kudu/tablet/local_tablet_writer.h"
#include "kudu/tablet/mock-rowsets.h"
#include "kudu/tablet/mvcc.h"
#include "kudu/tablet/rowset.h"
#include "kudu/tablet/rowset_metadata.h"
#include "kudu/tablet/rowset_tree.h"
#include "kudu/tablet/tablet-test-base.h"
#include "kudu/tablet/tablet-test-util.h"
#include "kudu/tablet/tablet.pb.h"
#include "kudu/tablet/tablet_metadata.h"
#include "kudu/tablet/tablet_metrics.h" // IWYU pragma: keep
#include "kudu/util/faststring.h"
#include "kudu/util/jsonwriter.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/metrics.h"
#include "kudu/util/status.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/test_macros.h"
// Row-count knobs for the larger tests below, exposed as gflags so they can
// be changed from the command line. The tests pass these values through
// ClampRowCount() before using them.
DEFINE_int32(testflush_num_inserts, 1000,
"Number of rows inserted in TestFlush");
DEFINE_int32(testiterator_num_inserts, 1000,
"Number of rows inserted in TestRowIterator/TestInsert");
DEFINE_int32(testcompaction_num_rows, 1000,
"Number of rows per rowset in TestCompaction");
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
namespace kudu {
namespace tablet {
using cfile::ReaderOptions;
using fs::ReadableBlock;
// Typed test fixture: each TYPED_TEST(TestTablet, ...) below is instantiated
// once for every setup type listed in TabletTestHelperTypes.
template<class SETUP>
class TestTablet : public TabletTestBase<SETUP> {
typedef SETUP Type;
// Verify that iteration doesn't fail
void CheckCanIterate() {
vector<string> out_rows;
// NOTE(review): the remainder of CheckCanIterate() and the closing braces of
// the fixture are not visible in this excerpt.
TYPED_TEST_CASE(TestTablet, TabletTestHelperTypes);
// Inserts rows into the MemRowSet and checks the tablet's on-disk size
// accounting before and after the flush point. It then verifies that the
// first rowset has exactly one UNDO delta block, which records one DELETE
// per inserted row.
TYPED_TEST(TestTablet, TestFlush) {
// Insert FLAGS_testflush_num_inserts rows (1000 by default, passed through
// ClampRowCount()) into the memrowset.
uint64_t max_rows = this->ClampRowCount(FLAGS_testflush_num_inserts);
this->InsertTestRows(0, max_rows, 0);
// Pre-flush, there should be no data on disk, and no diskrowsets, so the
// on-disk size of the tablet should be size of the tablet metadata.
ASSERT_EQ(0, this->tablet()->OnDiskDataSize());
ASSERT_GT(this->tablet()->OnDiskSize(), 0);
ASSERT_EQ(this->tablet()->metadata()->on_disk_size(), this->tablet()->OnDiskSize());
// Flush it.
// NOTE(review): no flush call is visible after this comment, yet every
// assertion below assumes the MemRowSet has been flushed — confirm.
TabletMetadata* tablet_meta = this->tablet()->metadata();
// Post-flush, there should be data on-disk. On-disk size should exceed
// on-disk data size due to per-diskrowset metadata and tablet metadata.
ASSERT_GT(this->tablet()->OnDiskDataSize(), 0);
ASSERT_GT(this->tablet()->OnDiskSize(), this->tablet()->OnDiskDataSize());
// Make sure the files were created as expected.
RowSetMetadata* rowset_meta = tablet_meta->GetRowSetForTests(0);
CHECK(rowset_meta) << "No row set found";
// Check that exactly one UNDO delta block is present.
vector<BlockId> undo_blocks = rowset_meta->undo_delta_blocks();
ASSERT_EQ(1, undo_blocks.size());
// Read the undo delta, we should get one undo mutation (delete) for each row.
unique_ptr<ReadableBlock> block;
ASSERT_OK(this->fs_manager()->OpenBlock(undo_blocks[0], &block));
shared_ptr<DeltaFileReader> dfr;
// Ownership of the opened block is moved into the delta file reader.
ASSERT_OK(DeltaFileReader::Open(std::move(block), UNDO, ReaderOptions(), &dfr));
// Assert there were 'max_rows' deletions in the undo delta (one for each inserted row)
ASSERT_EQ(dfr->delta_stats().delete_count(), max_rows);
// Test that historical data for a row is maintained even after the row
// is flushed from the memrowset.
// Rows are collected per snapshot while everything is still in the MRS (no
// UNDOs yet). Those results are then compared against the same snapshots
// read again later: once after a flush, and again after more mutations
// followed by a flush/compaction.
// NOTE(review): the flush/compaction calls named in the comments below are
// not visible in this excerpt.
TYPED_TEST(TestTablet, TestInsertsAndMutationsAreUndoneWithMVCCAfterFlush) {
// Insert 5 rows into the memrowset.
// After the first one, each time we insert a new row we mutate
// the previous one.
// Take snapshots after each operation
// NOTE(review): no snaps.push_back(...) is visible in this excerpt. If none
// exists, 'snaps' stays empty and the snapshot comparisons below are vacuous.
vector<MvccSnapshot> snaps;
LocalTabletWriter writer(this->tablet().get(), &this->client_schema_);
for (int i = 0; i < 5; i++) {
this->InsertTestRows(i, 1, 0);
DVLOG(1) << "Inserted row=" << i << ", row_idx=" << i << ", val=0";
MvccSnapshot ins_snaphsot(*this->tablet()->mvcc_manager());
LOG(INFO) << "After Insert Snapshot: " << ins_snaphsot.ToString();
if (i > 0) {
// Update the previously inserted row so that its value becomes 'i'.
ASSERT_OK(this->UpdateTestRow(&writer, i - 1, i));
DVLOG(1) << "Mutated row=" << i - 1 << ", row_idx=" << i - 1 << ", val=" << i;
MvccSnapshot mut_snaphsot(*this->tablet()->mvcc_manager());
DVLOG(1) << "After Mutate Snapshot: " << mut_snaphsot.ToString();
// Collect the expected rows from the MRS, where there are no
// undos
// NOTE(review): CollectRowsForSnapshots() fills a vector of raw pointers;
// no cleanup of those pointers is visible here — confirm who owns them.
vector<vector<string>* > expected_rows;
CollectRowsForSnapshots(this->tablet().get(), this->client_schema_,
snaps, &expected_rows);
// Flush the tablet
// Now verify that with undos we get the same thing.
VerifySnapshotsHaveSameResult(this->tablet().get(), this->client_schema_,
snaps, expected_rows);
// Do some more work and flush/compact
// take a snapshot and mutate the rows so that we have undos and
// redos
for (int i = 0; i < 4; i++) {
// Set each of rows 0-3 to i + 10.
ASSERT_OK(this->UpdateTestRow(&writer, i, i + 10));
DVLOG(1) << "Mutated row=" << i << ", row_idx=" << i << ", val=" << i + 10;
MvccSnapshot mut_snaphsot(*this->tablet()->mvcc_manager());
DVLOG(1) << "After Mutate Snapshot: " << mut_snaphsot.ToString();
// also throw a delete in there.
ASSERT_OK(this->DeleteTestRow(&writer, 4));
MvccSnapshot delete_snaphsot(*this->tablet()->mvcc_manager());
DVLOG(1) << "After Delete Snapshot: " << delete_snaphsot.ToString();
// Collect the expected rows now that we have undos and redos
CollectRowsForSnapshots(this->tablet().get(), this->client_schema_,
snaps, &expected_rows);
// Now flush and then compact everything.
// Now verify that with undos and redos we get the same thing.
VerifySnapshotsHaveSameResult(this->tablet().get(), this->client_schema_,
snaps, expected_rows);
// This tests KUDU-165, a regression where multiple old ghost rows were appearing in
// compaction outputs and sometimes would be selected as the most recent version
// of the row.
// In particular this makes sure that when there is a ghost row in one row set
// and a live one on another the live one is the only one that survives compaction.
TYPED_TEST(TestTablet, TestGhostRowsOnDiskRowSets) {
// Create a few INSERT/DELETE pairs on-disk by writing and flushing.
// Each of the resulting rowsets has a single row which is a "ghost" since its
// redo data has the DELETE.
LocalTabletWriter writer(this->tablet().get(), &this->client_schema_);
for (int i = 0; i < 3; i++) {
CHECK_OK(this->InsertTestRow(&writer, 0, 0));
// The DELETE is what turns this row into a ghost, so check its Status.
// If it were discarded, a failed delete would only surface later as a
// confusing duplicate-key failure on the next INSERT of key 0.
ASSERT_OK(this->DeleteTestRow(&writer, 0));
// Create one more rowset on disk which has just an INSERT (ie a non-ghost row).
CHECK_OK(this->InsertTestRow(&writer, 0, 0));
// Compact. This should result in a rowset with just one row in it.
// Should still be able to update, since the row is live.
ASSERT_OK(this->UpdateTestRow(&writer, 0, 1));
// Test that inserting a row which already exists causes an AlreadyPresent
// error, both while the row is in the MemRowSet and after it has been
// flushed. The tablet must still contain exactly one row afterwards.
TYPED_TEST(TestTablet, TestInsertDuplicateKey) {
LocalTabletWriter writer(this->tablet().get(), &this->client_schema_);
CHECK_OK(this->InsertTestRow(&writer, 12345, 0));
// Insert again, should fail!
Status s = this->InsertTestRow(&writer, 12345, 0);
// Check the status code itself (what this test promises), not only the
// message text.
ASSERT_TRUE(s.IsAlreadyPresent()) << "unexpected status: " << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(), "key already present");
ASSERT_EQ(1, this->TabletCount());
// Flush, and make sure that inserting duplicate still fails
ASSERT_EQ(1, this->TabletCount());
s = this->InsertTestRow(&writer, 12345, 0);
ASSERT_TRUE(s.IsAlreadyPresent()) << "unexpected status: " << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(), "key already present");
ASSERT_EQ(1, this->TabletCount());
// Tests that we are able to handle reinserts properly.
// Namely tests that:
// - We're able to perform multiple reinserts in a MRS, flush them
// and that all versions of the row are still visible.
// - After we've flushed the reinserts above, we can perform a
// new reinsert in a new MRS, flush that MRS and compact the row
// DRS together, all while preserving the full row history.
// NOTE(review): only the first scenario (five snapshots around an
// insert/delete/reinsert/delete sequence in one MRS) is visible in this
// excerpt. Neither the snaps.push_back(...) calls nor the flush are shown —
// confirm that 'snaps' ends up holding the five snapshots indexed below.
TYPED_TEST(TestTablet, TestReinserts) {
LocalTabletWriter writer(this->tablet().get(), &this->client_schema_);
vector<MvccSnapshot> snaps;
// In the first snap there's no row.
// Insert one row.
ASSERT_OK(this->InsertTestRow(&writer, 1, 0));
// In the second snap the row exists and has value 0.
// Now delete the test row.
ASSERT_OK(this->DeleteTestRow(&writer, 1));
// In the third snap the row doesn't exist.
// Reinsert the row.
ASSERT_OK(this->InsertTestRow(&writer, 1, 1));
// In the fourth snap the row exists again and has value 1.
// .. and delete the row again.
ASSERT_OK(this->DeleteTestRow(&writer, 1));
// In the fifth snap the row has been deleted.
// Now flush the MRS; all versions of the row should still be visible,
// depending on the chosen snapshot.
vector<vector<string>* > expected_rows;
CollectRowsForSnapshots(this->tablet().get(), this->client_schema_,
snaps, &expected_rows);
// One result set per snapshot. The row must be absent, then value 0,
// absent, value 1, and absent again.
ASSERT_EQ(expected_rows.size(), 5);
ASSERT_EQ(expected_rows[0]->size(), 0) << "Got the wrong result from snap: "
<< snaps[0].ToString();
ASSERT_EQ(expected_rows[1]->size(), 1) << "Got the wrong result from snap: "
<< snaps[1].ToString();
ASSERT_STR_CONTAINS((*expected_rows[1])[0], "int32 key_idx=1, int32 val=0)");
ASSERT_EQ(expected_rows[2]->size(), 0) << "Got the wrong result from snap: "
<< snaps[2].ToString();
ASSERT_EQ(expected_rows[3]->size(), 1) << "Got the wrong result from snap: "
<< snaps[3].ToString();
ASSERT_STR_CONTAINS((*expected_rows[3])[0], "int32 key_idx=1, int32 val=1)");
ASSERT_EQ(expected_rows[4]->size(), 0) << "Got the wrong result from snap: "
<< snaps[4].ToString();
// Test flushes and compactions dealing with deleted rows.
// NOTE(review): the calls that fill 'rows' (iterating the tablet) and the
// flush/compaction calls named in the comments are not visible in this
// excerpt. As shown, 'rows' is never populated between the checks.
TYPED_TEST(TestTablet, TestDeleteWithFlushAndCompact) {
LocalTabletWriter writer(this->tablet().get(), &this->client_schema_);
CHECK_OK(this->InsertTestRow(&writer, 0, 0));
ASSERT_OK(this->DeleteTestRow(&writer, 0));
// The DELETE was applied to MemRowSet 0.
ASSERT_EQ(0L, writer.last_op_result().mutated_stores(0).mrs_id());
// The row is deleted, so we shouldn't see it in the iterator.
vector<string> rows;
ASSERT_EQ(0, rows.size());
// Flush the tablet and make sure the data doesn't re-appear.
ASSERT_EQ(0, rows.size());
// Re-inserting should succeed. This will reinsert into the MemRowSet.
// Set the int column to '1' this time, so we can differentiate the two
// versions of the row.
CHECK_OK(this->InsertTestRow(&writer, 0, 1));
ASSERT_EQ(1, rows.size());
EXPECT_EQ(this->setup_.FormatDebugRow(0, 1, false), rows[0]);
// Flush again, so the DiskRowSet has the row.
ASSERT_EQ(1, rows.size());
EXPECT_EQ(this->setup_.FormatDebugRow(0, 1, false), rows[0]);
// Delete it again, now that it's in DRS.
ASSERT_OK(this->DeleteTestRow(&writer, 0));
// Exactly one store was mutated: DeltaMemStore 0 of rowset 1.
ASSERT_EQ(1, writer.last_op_result().mutated_stores_size());
ASSERT_EQ(1L, writer.last_op_result().mutated_stores(0).rs_id());
ASSERT_EQ(0L, writer.last_op_result().mutated_stores(0).dms_id());
ASSERT_EQ(0, rows.size());
// We now have an INSERT in the MemRowSet and the
// deleted row in the DiskRowSet. The new version
// of the row has '2' in the int column.
CHECK_OK(this->InsertTestRow(&writer, 0, 2));
ASSERT_EQ(1, rows.size());
EXPECT_EQ(this->setup_.FormatDebugRow(0, 2, false), rows[0]);
// Flush - now we have the row in two different DRSs.
ASSERT_EQ(1, rows.size());
EXPECT_EQ(this->setup_.FormatDebugRow(0, 2, false), rows[0]);
// Compaction should succeed even with the duplicate rows.
ASSERT_EQ(1, rows.size());
EXPECT_EQ(this->setup_.FormatDebugRow(0, 2, false), rows[0]);
// Test flushes dealing with REINSERT mutations in the MemRowSet.
// After insert/delete/re-insert of key 0, only the re-inserted version
// (value 1) should be visible.
// NOTE(review): the flush and the iteration that fills 'rows' are not
// visible in this excerpt.
TYPED_TEST(TestTablet, TestFlushWithReinsert) {
LocalTabletWriter writer(this->tablet().get(), &this->client_schema_);
// Insert, delete, and re-insert a row in the MRS.
CHECK_OK(this->InsertTestRow(&writer, 0, 0));
ASSERT_OK(this->DeleteTestRow(&writer, 0));
// The DELETE touched exactly one store: MemRowSet 0.
ASSERT_EQ(1, writer.last_op_result().mutated_stores_size());
ASSERT_EQ(0L, writer.last_op_result().mutated_stores(0).mrs_id());
CHECK_OK(this->InsertTestRow(&writer, 0, 1));
// Flush the tablet and make sure the data persists.
vector<string> rows;
ASSERT_EQ(1, rows.size());
EXPECT_EQ(this->setup_.FormatDebugRow(0, 1, false), rows[0]);
// Test flushes dealing with REINSERT mutations if they arrive in the middle
// of a flush. A PostWriteSnapshot hook performs more insert/delete cycles
// against MemRowSet 1 while the flush of MemRowSet 0 is in progress. The
// final re-insert (value 3) is the version that must survive.
TYPED_TEST(TestTablet, TestReinsertDuringFlush) {
LocalTabletWriter writer(this->tablet().get(), &this->client_schema_);
// Insert/delete/insert/delete in MemRowStore.
CHECK_OK(this->InsertTestRow(&writer, 0, 0));
ASSERT_OK(this->DeleteTestRow(&writer, 0));
ASSERT_EQ(1, writer.last_op_result().mutated_stores_size());
ASSERT_EQ(0L, writer.last_op_result().mutated_stores(0).mrs_id());
CHECK_OK(this->InsertTestRow(&writer, 0, 1));
ASSERT_OK(this->DeleteTestRow(&writer, 0));
ASSERT_EQ(1, writer.last_op_result().mutated_stores_size());
ASSERT_EQ(0L, writer.last_op_result().mutated_stores(0).mrs_id());
// During the snapshot flush, insert/delete/insert some more during the flush.
class MyCommonHooks : public Tablet::FlushCompactCommonHooks {
explicit MyCommonHooks(TestFixture *test) : test_(test) {}
Status PostWriteSnapshot() OVERRIDE {
LocalTabletWriter writer(test_->tablet().get(), &test_->client_schema());
// Every write here is checked. A silently failed re-insert would make the
// final "value 3" expectation pass or fail for the wrong reason.
CHECK_OK(test_->InsertTestRow(&writer, 0, 1));
CHECK_OK(test_->DeleteTestRow(&writer, 0));
CHECK_EQ(1, writer.last_op_result().mutated_stores_size());
CHECK_EQ(1L, writer.last_op_result().mutated_stores(0).mrs_id());
CHECK_OK(test_->InsertTestRow(&writer, 0, 2));
CHECK_OK(test_->DeleteTestRow(&writer, 0));
CHECK_EQ(1, writer.last_op_result().mutated_stores_size());
CHECK_EQ(1L, writer.last_op_result().mutated_stores(0).mrs_id());
CHECK_OK(test_->InsertTestRow(&writer, 0, 3));
return Status::OK();
TestFixture *test_;
// MyCommonHooks publicly derives from FlushCompactCommonHooks, so the
// derived-to-base pointer conversion is implicit and type-checked. A
// reinterpret_cast would bypass that conversion and is not needed.
shared_ptr<Tablet::FlushCompactCommonHooks> common_hooks(new MyCommonHooks(this));
// Flush the tablet and make sure the data persists.
vector<string> rows;
ASSERT_EQ(1, rows.size());
EXPECT_EQ(this->setup_.FormatDebugRow(0, 3, false), rows[0]);
// Test iterating over a tablet which contains data
// in the memrowset as well as two rowsets. This simple test
// only puts one row in each with no updates.
// NOTE(review): the flushes after the first two inserts and the
// CopyNextRows() calls before each block check are not visible in this
// excerpt.
TYPED_TEST(TestTablet, TestRowIteratorSimple) {
const int kInRowSet1 = 1;
const int kInRowSet2 = 2;
const int kInMemRowSet = 3;
// Put a row in disk rowset 1 (insert and flush)
LocalTabletWriter writer(this->tablet().get(), &this->client_schema_);
CHECK_OK(this->InsertTestRow(&writer, kInRowSet1, 0));
// Put a row in disk rowset 2 (insert and flush)
CHECK_OK(this->InsertTestRow(&writer, kInRowSet2, 0));
// Put a row in memrowset
CHECK_OK(this->InsertTestRow(&writer, kInMemRowSet, 0));
// Now iterate the tablet and make sure the rows show up
gscoped_ptr<RowwiseIterator> iter;
ASSERT_OK(this->tablet()->NewRowIterator(this->client_schema_, &iter));
// A block of up to 100 rows, reused for each batch below.
RowBlock block(this->schema_, 100, &this->arena_);
// First call to CopyNextRows should fetch the whole memrowset.
ASSERT_EQ(1, block.nrows()) << "should get only the one row from memrowset";
this->VerifyRow(block.row(0), kInMemRowSet, 0);
// Next, should fetch the older rowset
ASSERT_EQ(1, block.nrows()) << "should get only the one row from rowset 1";
this->VerifyRow(block.row(0), kInRowSet1, 0);
// Next, should fetch the newer rowset
ASSERT_EQ(1, block.nrows()) << "should get only the one row from rowset 2";
this->VerifyRow(block.row(0), kInRowSet2, 0);
// Hook implementation which runs a lambda function during the 'duplicating'
// phase of compaction.
//
// HookFunc is any callable taking no arguments; its return value (if any) is
// ignored.
template<class HookFunc>
class RunDuringDuplicatingRowSetPhase : public Tablet::FlushCompactCommonHooks {
explicit RunDuringDuplicatingRowSetPhase(HookFunc hook)
: hook_(std::move(hook)) {}
Status PostSwapInDuplicatingRowSet() override {
// Actually run the stored callback. Without this call the hook is a
// no-op, and callbacks passed in by callers (e.g. RunScans in
// TestRowIteratorOrdered) would never execute.
hook_();
return Status::OK();
const HookFunc hook_;
// Checks that ORDERED iterators return rows in key order even though keys
// were inserted in interleaved batches (batch i gets every key j with
// j % kNumBatches == i). The same scans run three times: with no
// compaction, while a compaction is in its duplicating-rowset phase, and
// after compaction.
// NOTE(review): the per-batch flushes, the CopyNextRows() call, the
// rows.push_back(encoded) and the compaction calls are not visible in this
// excerpt.
TYPED_TEST(TestTablet, TestRowIteratorOrdered) {
// Create interleaved keys in each rowset, so they are clearly not in order
const int kNumRows = 128;
const int kNumBatches = 4;
LOG(INFO) << "Schema: " << this->schema_.ToString();
LocalTabletWriter writer(this->tablet().get(), &this->client_schema_);
for (int i = 0; i < kNumBatches; i++) {
for (int j = 0; j < kNumRows; j++) {
if (j % kNumBatches == i) {
LOG(INFO) << "Inserting row " << j;
CHECK_OK(this->InsertTestRow(&writer, 654321+j, j));
// We'll test ordered scans a few times, covering before, during, and
// after a compaction.
auto RunScans = [this, kNumRows]() {
MvccSnapshot snap(*this->tablet()->mvcc_manager());
// Iterate through with a few different block sizes (numBlocks = 1, 2, 4).
for (int numBlocks = 1; numBlocks < 5; numBlocks*=2) {
const int rowsPerBlock = kNumRows / numBlocks;
// Make a new ordered iterator for the current snapshot.
gscoped_ptr<RowwiseIterator> iter;
ASSERT_OK(this->tablet()->NewRowIterator(this->client_schema_, snap, ORDERED, &iter));
// Iterate the tablet collecting rows.
vector<shared_ptr<faststring> > rows;
for (int i = 0; i < numBlocks; i++) {
RowBlock block(this->schema_, rowsPerBlock, &this->arena_);
ASSERT_EQ(rowsPerBlock, block.nrows()) << "unexpected number of rows returned";
for (int j = 0; j < rowsPerBlock; j++) {
RowBlockRow row = block.row(j);
shared_ptr<faststring> encoded(new faststring());
this->client_schema_.EncodeComparableKey(row, encoded.get());
// Verify the collected rows, checking that they are sorted.
for (int j = 1; j < rows.size(); j++) {
// Compare the comparable keys encoded via the schema above (rather than
// raw column values), since this test is run with different schemas.
ASSERT_LT((*rows[j-1]).ToString(), (*rows[j]).ToString());
ASSERT_EQ(kNumRows, rows.size());
SCOPED_TRACE("With no compaction");
SCOPED_TRACE("With duplicating rowset");
shared_ptr<Tablet::FlushCompactCommonHooks> hooks_shared(
new RunDuringDuplicatingRowSetPhase<decltype(RunScans)>(RunScans));
SCOPED_TRACE("After compaction");
// Returns whether the value column of row 'key_idx' is expected to be NULL
// once TestRowIteratorComplex has applied its updates. The generic version
// never expects NULLs.
template<class SETUP>
bool TestSetupExpectsNulls(int32_t /*key_idx*/) {
return false;
// Version for NullableValueTestSetup. Odd rows are the ones the test
// updates; of those, rows with key_idx % 10 == 1 are updated to NULL (the
// caller uses this function's result to choose UpdateTestRowToNull()).
// NOTE(review): this reads as an explicit specialization, but no
// 'template<>' line is visible before it — confirm.
bool TestSetupExpectsNulls<NullableValueTestSetup>(int32_t key_idx) {
// If it's a row that the test updates, then we should expect null
// based on whether it updated to NULL or away from NULL.
bool should_update = (key_idx % 2 == 1);
if (should_update) {
return (key_idx % 10 == 1);
// Otherwise, expect whatever was inserted.
return NullableValueTestSetup::ShouldInsertAsNull(key_idx);
// Test iterating over a tablet which has a memrowset
// and several rowsets, each with many rows of data.
// Odd rows are then updated (to NULL or to their own key, per
// TestSetupExpectsNulls). A full scan must see every key exactly once, with
// the updated value.
// NOTE(review): the periodic flushes, the CopyNextRows() call, the
// seen_count increment and the body of the 'val == nullptr' branch are not
// visible in this excerpt.
TYPED_TEST(TestTablet, TestRowIteratorComplex) {
uint64_t max_rows = this->ClampRowCount(FLAGS_testiterator_num_inserts);
// Put a row in disk rowset 1 (insert and flush)
LocalTabletWriter writer(this->tablet().get(), &this->client_schema_);
for (int32_t i = 0; i < max_rows; i++) {
ASSERT_OK_FAST(this->InsertTestRow(&writer, i, 0));
if (i % 300 == 0) {
LOG(INFO) << "Flushing after " << i << " rows inserted";
LOG(INFO) << "Successfully inserted " << max_rows << " rows";
// At this point, we should have several rowsets as well
// as some data in memrowset.
// Update a subset of the rows
for (int32_t i = 0; i < max_rows; i++) {
bool should_update = (i % 2 == 1);
if (!should_update) continue;
bool set_to_null = TestSetupExpectsNulls<TypeParam>(i);
if (set_to_null) {
// NOTE(review): any return value of UpdateTestRowToNull() is discarded
// here, unlike UpdateTestRow() in the else-branch — confirm it cannot
// fail silently.
this->UpdateTestRowToNull(&writer, i);
} else {
ASSERT_OK_FAST(this->UpdateTestRow(&writer, i, i));
// Now iterate the tablet and make sure the rows show up.
gscoped_ptr<RowwiseIterator> iter;
const Schema& schema = this->client_schema_;
ASSERT_OK(this->tablet()->NewRowIterator(schema, &iter));
LOG(INFO) << "Created iter: " << iter->ToString();
vector<bool> seen(max_rows, false);
int seen_count = 0;
RowBlock block(schema, 100, &this->arena_);
while (iter->HasNext()) {
LOG(INFO) << "Fetched batch of " << block.nrows();
for (size_t i = 0; i < block.nrows(); i++) {
// Verify that we see each key exactly once.
int32_t key_idx = *schema.ExtractColumnFromRow<INT32>(block.row(i), 1);
if (seen[key_idx]) {
FAIL() << "Saw row " << key_idx << " multiple times";
seen[key_idx] = true;
// Verify that we see the correctly updated value
const int32_t* val = schema.ExtractColumnFromRow<INT32>(block.row(i), 2);
bool set_to_null = TestSetupExpectsNulls<TypeParam>(key_idx);
bool should_update = (key_idx % 2 == 1);
if (val == nullptr) {
} else if (should_update) {
ASSERT_EQ(key_idx, *val);
} else {
ASSERT_EQ(0, *val);
ASSERT_EQ(seen_count, max_rows)
<< "expected to see all inserted data through iterator.";
// Test that, when a tablet has flushed data and is
// reopened, that the data persists
// NOTE(review): the flush and the close/re-open calls are not visible in
// this excerpt.
TYPED_TEST(TestTablet, TestInsertsPersist) {
uint64_t max_rows = this->ClampRowCount(FLAGS_testiterator_num_inserts);
this->InsertTestRows(0, max_rows, 0);
ASSERT_EQ(max_rows, this->TabletCount());
// Get current timestamp.
// It is used below to read the rows back as of this point in time.
Timestamp t = this->tablet()->clock()->Now();
// Flush it.
ASSERT_EQ(max_rows, this->TabletCount());
// Close and re-open tablet.
// TODO: Should we be reopening the tablet in a different way to persist the
// clock / timestamps?
// Ensure that rows exist
ASSERT_EQ(max_rows, this->TabletCount());
this->VerifyTestRowsWithTimestampAndVerifier(0, max_rows, t, boost::none);
// TODO: add some more data, re-flush
// Upserts key 0 three times: when it is absent, when it lives in the MRS,
// and when it lives in a DRS. Each time the test checks the
// upserts_as_updates metric and the row's value.
// NOTE(review): the flush and the iterations that fill 'rows' are not
// visible in this excerpt.
TYPED_TEST(TestTablet, TestUpsert) {
vector<string> rows;
const auto& upserts_as_updates = this->tablet()->metrics()->upserts_as_updates;
// Upsert a new row.
this->UpsertTestRows(0, 1, 1000);
// The row did not exist, so this upsert is not counted as an update.
EXPECT_EQ(0, upserts_as_updates->value());
// Upsert a row that is in the MRS.
this->UpsertTestRows(0, 1, 1001);
EXPECT_EQ(1, upserts_as_updates->value());
EXPECT_EQ(vector<string>{ this->setup_.FormatDebugRow(0, 1001, false) }, rows);
// Flush it.
EXPECT_EQ(vector<string>{ this->setup_.FormatDebugRow(0, 1001, false) }, rows);
// Upsert a row that is in the DRS.
this->UpsertTestRows(0, 1, 1002);
EXPECT_EQ(2, upserts_as_updates->value());
EXPECT_EQ(vector<string>{ this->setup_.FormatDebugRow(0, 1002, false) }, rows);
// Test that when a row has been updated many times, it always yields
// the most recent value.
// The updates are applied in the MemRowSet, then in a DeltaMemStore after a
// flush, and the row is checked again after a compaction with a second rowset.
// NOTE(review): the flush/compaction calls and the iterations that fill
// 'out_rows' are not visible in this excerpt.
TYPED_TEST(TestTablet, TestMultipleUpdates) {
// Insert and update several times in MemRowSet
LocalTabletWriter writer(this->tablet().get(), &this->client_schema_);
CHECK_OK(this->InsertTestRow(&writer, 0, 0));
ASSERT_OK(this->UpdateTestRow(&writer, 0, 1));
// The update went to MemRowSet 0 only.
ASSERT_EQ(1, writer.last_op_result().mutated_stores_size());
ASSERT_EQ(0L, writer.last_op_result().mutated_stores(0).mrs_id());
ASSERT_OK(this->UpdateTestRow(&writer, 0, 2));
ASSERT_OK(this->UpdateTestRow(&writer, 0, 3));
// Should see most recent value.
vector<string> out_rows;
ASSERT_EQ(1, out_rows.size());
ASSERT_EQ(this->setup_.FormatDebugRow(0, 3, false), out_rows[0]);
// Flush it.
// Should still see most recent value.
ASSERT_EQ(1, out_rows.size());
ASSERT_EQ(this->setup_.FormatDebugRow(0, 3, false), out_rows[0]);
// Update the row a few times in DeltaMemStore
ASSERT_OK(this->UpdateTestRow(&writer, 0, 4));
// The update went to DeltaMemStore 0 of rowset 0.
ASSERT_EQ(1, writer.last_op_result().mutated_stores_size());
ASSERT_EQ(0L, writer.last_op_result().mutated_stores(0).rs_id());
ASSERT_EQ(0L, writer.last_op_result().mutated_stores(0).dms_id());
ASSERT_OK(this->UpdateTestRow(&writer, 0, 5));
ASSERT_OK(this->UpdateTestRow(&writer, 0, 6));
// Should still see most recent value.
ASSERT_EQ(1, out_rows.size());
ASSERT_EQ(this->setup_.FormatDebugRow(0, 6, false), out_rows[0]);
// Force a compaction after adding a new rowset with one row.
CHECK_OK(this->InsertTestRow(&writer, 1, 0));
ASSERT_EQ(2, this->tablet()->num_rowsets());
ASSERT_EQ(1, this->tablet()->num_rowsets());
// Should still see most recent value.
ASSERT_EQ(2, out_rows.size());
ASSERT_EQ(this->setup_.FormatDebugRow(0, 6, false), out_rows[0]);
ASSERT_EQ(this->setup_.FormatDebugRow(1, 0, false), out_rows[1]);
// Builds three rowsets of max_rows / 3 rows each (insert + flush), compacts
// them, and checks three things:
// - the MemRowSet id is unchanged by compaction;
// - all rows survive;
// - only the new rowset (id 3) remains in the tablet metadata.
// NOTE(review): the flush and compaction calls inside the LOG_TIMING scopes
// are not visible in this excerpt.
TYPED_TEST(TestTablet, TestCompaction) {
uint64_t max_rows = this->ClampRowCount(FLAGS_testcompaction_num_rows);
uint64_t n_rows = max_rows / 3;
// Create three rowsets by inserting and flushing
LOG_TIMING(INFO, "Inserting rows") {
this->InsertTestRows(0, n_rows, 0);
LOG_TIMING(INFO, "Flushing rows") {
// first MemRowSet had id 0, current one should be 1
ASSERT_EQ(1, this->tablet()->CurrentMrsIdForTests());
LOG_TIMING(INFO, "Inserting rows") {
this->InsertTestRows(n_rows, n_rows, 0);
LOG_TIMING(INFO, "Flushing rows") {
// previous MemRowSet had id 1, current one should be 2
ASSERT_EQ(2, this->tablet()->CurrentMrsIdForTests());
LOG_TIMING(INFO, "Inserting rows") {
this->InsertTestRows(n_rows * 2, n_rows, 0);
LOG_TIMING(INFO, "Flushing rows") {
// previous MemRowSet had id 2, current one should be 3
ASSERT_EQ(3, this->tablet()->CurrentMrsIdForTests());
// Issue compaction
LOG_TIMING(INFO, "Compacting rows") {
// Compaction does not swap the memrowsets so we should still get 3
ASSERT_EQ(3, this->tablet()->CurrentMrsIdForTests());
ASSERT_EQ(n_rows * 3, this->TabletCount());
// The compaction output is expected to be rowset 3.
const RowSetMetadata *rowset_meta = this->tablet()->metadata()->GetRowSetForTests(3);
ASSERT_TRUE(rowset_meta != nullptr);
// Old rowsets should not exist anymore
// (the loop-local 'rowset_meta' shadows the one declared above).
for (int i = 0; i <= 2; i++) {
const RowSetMetadata *rowset_meta = this->tablet()->metadata()->GetRowSetForTests(i);
ASSERT_TRUE(rowset_meta == nullptr);
// Kind of mutation a concurrent-mutation hook expects. MyCommonHooks::DoHook()
// switches on it to check how many stores the last write mutated.
// NOTE(review): the enumerators are not visible in this excerpt;
// MRS_MUTATION and DELTA_MUTATION are referenced below.
enum MutationType {
// Hook used by the Test*WithConcurrentMutation tests.
// Every time one of these hooks triggers, it does three things:
// - deletes a row starting at row 0;
// - inserts a row starting at row 20 (and increasing);
// - updates a row starting at row 10 (and increasing).
// 'flushed' selects whether the early hooks expect their mutations to land
// in the MemRowSet (false) or in a DeltaMemStore (true).
// NOTE(review): the increment of i_ and the switch's case labels are not
// visible in this excerpt.
template<class TestFixture>
class MyCommonHooks : public Tablet::FlushCompactCommonHooks {
explicit MyCommonHooks(TestFixture *test, bool flushed)
: test_(test),
// flushed_ is read by PostTakeMvccSnapshot()/PostWriteSnapshot(). It must
// be initialized here, otherwise those hooks branch on an indeterminate
// value.
flushed_(flushed),
i_(0) {
Status DoHook(MutationType expected_mutation_type) {
LocalTabletWriter writer(test_->tablet().get(), &test_->client_schema());
RETURN_NOT_OK(test_->DeleteTestRow(&writer, i_));
switch (expected_mutation_type) {
CHECK_EQ(1, writer.last_op_result().mutated_stores_size());
CHECK_EQ(1, writer.last_op_result().mutated_stores_size());
CHECK_EQ(2, writer.last_op_result().mutated_stores_size());
RETURN_NOT_OK(test_->UpdateTestRow(&writer, 10 + i_, 1000 + i_));
test_->InsertTestRows(20 + i_, 1, 0);
return Status::OK();
virtual Status PostTakeMvccSnapshot() OVERRIDE {
// before we flush we update the MemRowSet afterwards we update the
// DeltaMemStore
if (!flushed_) {
return DoHook(MRS_MUTATION);
} else {
return DoHook(DELTA_MUTATION);
virtual Status PostWriteSnapshot() OVERRIDE {
if (!flushed_) {
return DoHook(MRS_MUTATION);
} else {
return DoHook(DELTA_MUTATION);
virtual Status PostSwapInDuplicatingRowSet() OVERRIDE {
virtual Status PostReupdateMissedDeltas() OVERRIDE {
virtual Status PostSwapNewRowSet() OVERRIDE {
return DoHook(DELTA_MUTATION);
TestFixture *test_;
bool flushed_;
int i_;
// Flush-time hooks for TestFlushWithConcurrentMutation. They reuse the
// common flush/compaction hooks and also mutate the tablet right after the
// new MemRowSet is swapped in.
template<class TestFixture>
class MyFlushHooks : public Tablet::FlushFaultHooks, public MyCommonHooks<TestFixture> {
explicit MyFlushHooks(TestFixture *test, bool flushed) :
MyCommonHooks<TestFixture>(test, flushed) {}
// OVERRIDE has the compiler verify that this really overrides the
// FlushFaultHooks hook, matching the other hooks in this file.
virtual Status PostSwapNewMemRowSet() OVERRIDE { return this->DoHook(MRS_MUTATION); }
// Compaction-time hooks for TestCompactionWithConcurrentMutation. They reuse
// the common flush/compaction hooks and also mutate the tablet right after
// the compaction input iterators are selected.
template<class TestFixture>
class MyCompactHooks : public Tablet::CompactionFaultHooks, public MyCommonHooks<TestFixture> {
explicit MyCompactHooks(TestFixture *test, bool flushed) :
MyCommonHooks<TestFixture>(test, flushed) {}
// Explicitly virtual + OVERRIDE so that a signature mismatch with
// CompactionFaultHooks becomes a compile error instead of a hook that
// never fires.
virtual Status PostSelectIterators() OVERRIDE { return this->DoHook(DELTA_MUTATION); }
// Test for Flush with concurrent update, delete and insert during the
// various phases.
// Expected end state:
// - rows 0-6 are deleted;
// - rows 10-16 carry 1000 + (key - 10);
// - rows 20-26 exist with value 0.
// NOTE(review): installing the hooks, the pre-flush hook call, the flush,
// the iteration that fills 'out_rows' and the iterator increments in the
// final loop are not visible in this excerpt.
TYPED_TEST(TestTablet, TestFlushWithConcurrentMutation) {
this->InsertTestRows(0, 7, 0); // 0-6 inclusive: these rows will be deleted
this->InsertTestRows(10, 7, 0); // 10-16 inclusive: these rows will be updated
// Rows 20-26 inclusive will be inserted during the flush
// Inject hooks which mutate those rows and add more rows at
// each key stage of flushing.
shared_ptr<MyFlushHooks<TestFixture> > hooks(new MyFlushHooks<TestFixture>(this, false));
// First hook before we do the Flush
// Then do the flush with the hooks enabled.
// Now verify that the results saw all the mutated_stores.
vector<string> out_rows;
std::sort(out_rows.begin(), out_rows.end());
vector<string> expected_rows;
expected_rows.push_back(this->setup_.FormatDebugRow(10, 1000, true));
expected_rows.push_back(this->setup_.FormatDebugRow(11, 1001, true));
expected_rows.push_back(this->setup_.FormatDebugRow(12, 1002, true));
expected_rows.push_back(this->setup_.FormatDebugRow(13, 1003, true));
expected_rows.push_back(this->setup_.FormatDebugRow(14, 1004, true));
expected_rows.push_back(this->setup_.FormatDebugRow(15, 1005, true));
expected_rows.push_back(this->setup_.FormatDebugRow(16, 1006, true));
expected_rows.push_back(this->setup_.FormatDebugRow(20, 0, false));
expected_rows.push_back(this->setup_.FormatDebugRow(21, 0, false));
expected_rows.push_back(this->setup_.FormatDebugRow(22, 0, false));
expected_rows.push_back(this->setup_.FormatDebugRow(23, 0, false));
expected_rows.push_back(this->setup_.FormatDebugRow(24, 0, false));
expected_rows.push_back(this->setup_.FormatDebugRow(25, 0, false));
expected_rows.push_back(this->setup_.FormatDebugRow(26, 0, false));
std::sort(expected_rows.begin(), expected_rows.end());
// Log the expected rows to ease debugging on failure.
LOG(INFO) << "Expected: " << JoinStrings(expected_rows, "\n");
// Log the actual rows to ease debugging on failure.
LOG(INFO) << "Results: " << JoinStrings(out_rows, "\n");
// Verify that all the inserts and updates arrived and persisted: both
// sorted lists must match element by element.
ASSERT_EQ(expected_rows.size(), out_rows.size());
vector<string>::const_iterator exp_it = expected_rows.begin();
for (vector<string>::const_iterator out_it = out_rows.begin(); out_it!= out_rows.end();) {
ASSERT_EQ(*out_it, *exp_it);
// Test for compaction with concurrent update and insert during the
// various phases.
// The expected end state is the same as TestFlushWithConcurrentMutation:
// - rows 0-6 are deleted;
// - rows 10-16 carry 1000 + (key - 10);
// - rows 20-26 exist with value 0.
// NOTE(review): the flushes between the insert pairs, installing the hooks,
// the pre-compaction hook call, the compaction, the iteration that fills
// 'out_rows' and the iterator increments in the final loop are not visible
// in this excerpt.
TYPED_TEST(TestTablet, TestCompactionWithConcurrentMutation) {
// Create three rowsets by inserting and flushing.
// The rows from these layers will get updated or deleted during the flush:
// - rows 0-6 inclusive will be deleted
// - rows 10-16 inclusive will be updated
this->InsertTestRows(0, 2, 0); // rows 0-1
this->InsertTestRows(10, 2, 0); // rows 10-11
this->InsertTestRows(2, 2, 0); // rows 2-3
this->InsertTestRows(12, 2, 0); // rows 12-13
this->InsertTestRows(4, 3, 0); // rows 4-6
this->InsertTestRows(14, 3, 0); // rows 14-16
// Rows 20-26 inclusive will be inserted during the flush.
// 'true': the rows are already on disk, so the hooks expect delta mutations.
shared_ptr<MyCompactHooks<TestFixture> > hooks(new MyCompactHooks<TestFixture>(this, true));
// First hook pre-compaction.
// Issue compaction
// Grab the resulting data into a vector.
vector<string> out_rows;
std::sort(out_rows.begin(), out_rows.end());
vector<string> expected_rows;
expected_rows.push_back(this->setup_.FormatDebugRow(10, 1000, true));
expected_rows.push_back(this->setup_.FormatDebugRow(11, 1001, true));
expected_rows.push_back(this->setup_.FormatDebugRow(12, 1002, true));
expected_rows.push_back(this->setup_.FormatDebugRow(13, 1003, true));
expected_rows.push_back(this->setup_.FormatDebugRow(14, 1004, true));
expected_rows.push_back(this->setup_.FormatDebugRow(15, 1005, true));
expected_rows.push_back(this->setup_.FormatDebugRow(16, 1006, true));
expected_rows.push_back(this->setup_.FormatDebugRow(20, 0, false));
expected_rows.push_back(this->setup_.FormatDebugRow(21, 0, false));
expected_rows.push_back(this->setup_.FormatDebugRow(22, 0, false));
expected_rows.push_back(this->setup_.FormatDebugRow(23, 0, false));
expected_rows.push_back(this->setup_.FormatDebugRow(24, 0, false));
expected_rows.push_back(this->setup_.FormatDebugRow(25, 0, false));
expected_rows.push_back(this->setup_.FormatDebugRow(26, 0, false));
std::sort(expected_rows.begin(), expected_rows.end());
ASSERT_EQ(expected_rows.size(), out_rows.size());
// Log the expected rows to ease debugging on failure.
LOG(INFO) << "Expected: " << JoinStrings(expected_rows, "\n");
// Log the actual rows to ease debugging on failure.
LOG(INFO) << "Results: " << JoinStrings(out_rows, "\n");
// Verify that all the inserts and updates arrived and persisted: both
// sorted lists must match element by element.
vector<string>::const_iterator exp_it = expected_rows.begin();
for (vector<string>::const_iterator out_it = out_rows.begin(); out_it!= out_rows.end();) {
ASSERT_EQ(*out_it, *exp_it);
// Test that metrics behave properly during tablet initialization. The metric
// registry must serialize to JSON successfully both before and after the
// tablet is opened.
// NOTE(review): neither the tablet creation nor the open step is visible in
// this excerpt; only the two WriteAsJson calls are. Confirm that the open
// call happens between them.
TYPED_TEST(TestTablet, TestMetricsInit) {
// Create a tablet, but do not open it
MetricRegistry* registry = this->harness()->metrics_registry();
// Dump metrics as pretty-printed JSON. The "*" filter presumably matches
// every metric. Only the returned status is checked; the JSON text
// accumulated in `out` is never inspected.
std::ostringstream out;
JsonWriter writer(&out, JsonWriter::PRETTY);
ASSERT_OK(registry->WriteAsJson(&writer, { "*" }, MetricJsonOptions()));
// Open tablet, should still work
// (the second dump reuses the same writer and output stream).
ASSERT_OK(registry->WriteAsJson(&writer, { "*" }, MetricJsonOptions()));
// Test that we find the correct log segment size for different indexes.
//
// The expectations below pin down the following behavior of
// Tablet::GetReplaySizeForIndex(min_log_index, map):
// - It returns the value of the first map entry whose key is >= min_log_index.
// - It returns 0 when min_log_index is -1.
// - It returns 0 when min_log_index is past the last key.
TEST(TestTablet, TestGetReplaySizeForIndex) {
std::map<int64_t, int64_t> replay_size_map;
// We build a map that represents 3 logs.
// See Log::GetReplaySizeMap(...) for details.
// NOTE(review): the keys look like the last log index of each segment, and
// the values look like the replay size from that segment onward
// (45 > 25 > 10). Confirm against Log::GetReplaySizeMap.
replay_size_map[100] = 45;
replay_size_map[200] = 25;
replay_size_map[300] = 10;
// -1 indicates that no logs are anchored, so it should report that
// no logs need to be replayed.
int64_t min_log_index = -1;
EXPECT_EQ(Tablet::GetReplaySizeForIndex(min_log_index, replay_size_map), 0);
// A value in or before the first segment retains all the logs.
min_log_index = 1;
EXPECT_EQ(Tablet::GetReplaySizeForIndex(min_log_index, replay_size_map), 45);
// A value at the end of the first segment also retains everything.
min_log_index = 100;
EXPECT_EQ(Tablet::GetReplaySizeForIndex(min_log_index, replay_size_map), 45);
// Beginning of the second segment: retain only that one and the next.
min_log_index = 101;
EXPECT_EQ(Tablet::GetReplaySizeForIndex(min_log_index, replay_size_map), 25);
// Beginning of the third segment: retain only that one.
min_log_index = 201;
EXPECT_EQ(Tablet::GetReplaySizeForIndex(min_log_index, replay_size_map), 10);
// A value past the last segment doesn't retain anything.
min_log_index = 301;
EXPECT_EQ(Tablet::GetReplaySizeForIndex(min_log_index, replay_size_map), 0);
// Fixture for the SplitKeyRange tests. It uses the string-keyed test setup
// (StringKeyTestSetup).
class TestTabletStringKey : public TestTablet<StringKeyTestSetup> {
// NOTE(review): no access specifier is visible before this member. In a
// `class` it would be private, and therefore unreachable from TEST_F bodies,
// which derive from the fixture. Confirm that a `public:`/`protected:` label
// precedes it.
//
// Asserts that `actual` has as many chunks as `expected` and that each chunk's
// size_bytes() matches. The visible code does not compare chunk start/end
// keys.
void AssertChunks(vector<KeyRange> expected, vector<KeyRange> actual) {
ASSERT_EQ(actual.size(), expected.size());
for (size_t idx = 0; idx < actual.size(); ++idx) {
ASSERT_EQ(actual[idx].size_bytes(), expected[idx].size_bytes());
// Test for split key range. Three overlapping mock rowsets are swapped into
// the tablet, and SplitKeyRange is checked against several target chunk
// sizes, a column-based sizing, and lower/upper key bounds.
TEST_F(TestTabletStringKey, TestSplitKeyRange) {
Tablet* tablet = this->mutable_tablet();
// NOTE(review): `comps` is default-constructed and dereferenced on the next
// line without being populated in the visible code. Confirm that the tablet's
// components are fetched into it first.
scoped_refptr<TabletComponents> comps;
RowSetVector old_rowset = comps->rowsets->all_rowsets();
// Mock rowsets covering [0, 9], [2, 5] and [5, 6]. The last two
// MockDiskRowSet arguments appear to be the rowset's total size and a
// per-column size, judging by the byte- and column-based expectations below.
// Confirm against MockDiskRowSet.
RowSetVector new_rowset = {
std::make_shared<MockDiskRowSet>("0", "9", 9000, 90),
std::make_shared<MockDiskRowSet>("2", "5", 3000, 30),
std::make_shared<MockDiskRowSet>("5", "6", 1000, 10)
tablet->AtomicSwapRowSets(old_rowset, new_rowset);
// NOTE(review): `result`, `col_ids` and `range` are redeclared for every
// sub-case below. This only compiles if each sub-case sits in its own block
// scope, which is not visible here; confirm.
// Unbounded range, no columns, target chunk size 3000.
std::vector<KeyRange> result = {
KeyRange("", "2", 2000),
KeyRange("2", "5", 6000),
KeyRange("5", "6", 2000),
KeyRange("6", "", 3000)
std::vector<ColumnId> col_ids;
std::vector<KeyRange> range;
tablet->SplitKeyRange(nullptr, nullptr, col_ids, 3000, &range);
AssertChunks(result, range);
// target chunk size less than the min interval size
// (expected to produce the same chunks as the 3000 case above).
std::vector<KeyRange> result = {
KeyRange("", "2", 2000),
KeyRange("2", "5", 6000),
KeyRange("5", "6", 2000),
KeyRange("6", "", 3000)
std::vector<ColumnId> col_ids;
std::vector<KeyRange> range;
tablet->SplitKeyRange(nullptr, nullptr, col_ids, 900, &range);
AssertChunks(result, range);
// target chunk size greater than the max interval size
// (the whole 9000 + 3000 + 1000 = 13000 byte range fits in a single chunk).
std::vector<KeyRange> result = {
KeyRange("", "", 13000)
std::vector<ColumnId> col_ids;
std::vector<KeyRange> range;
tablet->SplitKeyRange(nullptr, nullptr, col_ids, 20000, &range);
AssertChunks(result, range);
// test split key range with column
// NOTE(review): col_ids is left empty in the visible code, although this case
// expects column-based sizes. Confirm that the column ids are populated
// before the call.
std::vector<KeyRange> result = {
KeyRange("", "2", 40),
KeyRange("2", "5", 120),
KeyRange("5", "6", 40),
KeyRange("6", "", 60)
std::vector<ColumnId> col_ids;
std::vector<KeyRange> range;
tablet->SplitKeyRange(nullptr, nullptr, col_ids, 60, &range);
AssertChunks(result, range);
// test split key range with bound
// Build encoded keys for the rows with key "1" (lower) and "4" (upper).
// NOTE(review): l_encoded and u_encoded are decoded while still empty. The
// step that encodes lower_bound/upper_bound into them is not visible; confirm.
gscoped_ptr<EncodedKey> l_enc_key;
gscoped_ptr<EncodedKey> u_enc_key;
Arena arena(256);
KuduPartialRow lower_bound(&this->schema_);
CHECK_OK(lower_bound.SetString("key", "1"));
CHECK_OK(lower_bound.SetInt32("key_idx", 0));
CHECK_OK(lower_bound.SetInt32("val", 0));
string l_encoded;
ASSERT_OK(EncodedKey::DecodeEncodedString(this->schema_, &arena, l_encoded, &l_enc_key));
KuduPartialRow upper_bound(&this->schema_);
CHECK_OK(upper_bound.SetString("key", "4"));
CHECK_OK(upper_bound.SetInt32("key_idx", 0));
CHECK_OK(upper_bound.SetInt32("val", 0));
string u_encoded;
ASSERT_OK(EncodedKey::DecodeEncodedString(this->schema_, &arena, u_encoded, &u_enc_key));
// split key range in [1, 4)
std::vector<KeyRange> result = {
KeyRange("1", "2", 1000),
KeyRange("2", "4", 4000)
std::vector<ColumnId> col_ids;
std::vector<KeyRange> range;
tablet->SplitKeyRange(l_enc_key.get(), u_enc_key.get(), col_ids, 2000, &range);
AssertChunks(result, range);
// split key range in [min, 4)
std::vector<KeyRange> result = {
KeyRange("", "2", 2000),
KeyRange("2", "4", 4000)
std::vector<ColumnId> col_ids;
std::vector<KeyRange> range;
tablet->SplitKeyRange(nullptr, u_enc_key.get(), col_ids, 2000, &range);
AssertChunks(result, range);
// split key range in [4, max)
// (the key-"4" encoded key is reused here as the lower bound).
std::vector<KeyRange> result = {
KeyRange("4", "5", 2000),
KeyRange("5", "6", 2000),
KeyRange("6", "", 3000)
std::vector<ColumnId> col_ids;
std::vector<KeyRange> range;
tablet->SplitKeyRange(u_enc_key.get(), nullptr, col_ids, 2000, &range);
AssertChunks(result, range);
// Test for split key range, tablet with 0 rowsets. The expected result is a
// single unbounded chunk of size 0.
TEST_F(TestTabletStringKey, TestSplitKeyRangeWithZeroRowSets) {
Tablet* tablet = this->mutable_tablet();
// NOTE(review): `comps` is dereferenced below without being populated in the
// visible code. Confirm that the tablet's components are fetched into it first.
scoped_refptr<TabletComponents> comps;
RowSetVector old_rowset = comps->rowsets->all_rowsets();
// Replace all existing rowsets with an empty set.
RowSetVector new_rowset = {};
tablet->AtomicSwapRowSets(old_rowset, new_rowset);
std::vector<KeyRange> result = {
KeyRange("", "", 0)
std::vector<ColumnId> col_ids;
std::vector<KeyRange> range;
tablet->SplitKeyRange(nullptr, nullptr, col_ids, 3000, &range);
AssertChunks(result, range);
// Test for split key range, tablet with 1 rowset. The single 9000-byte rowset
// is expected to come back as one unbounded chunk, even though it exceeds the
// 3000-byte target chunk size.
TEST_F(TestTabletStringKey, TestSplitKeyRangeWithOneRowSet) {
Tablet* tablet = this->mutable_tablet();
// NOTE(review): `comps` is dereferenced below without being populated in the
// visible code. Confirm that the tablet's components are fetched into it first.
scoped_refptr<TabletComponents> comps;
RowSetVector old_rowset = comps->rowsets->all_rowsets();
RowSetVector new_rowset = {
std::make_shared<MockDiskRowSet>("0", "9", 9000, 90)
tablet->AtomicSwapRowSets(old_rowset, new_rowset);
std::vector<KeyRange> result = {
KeyRange("", "", 9000)
std::vector<ColumnId> col_ids;
std::vector<KeyRange> range;
tablet->SplitKeyRange(nullptr, nullptr, col_ids, 3000, &range);
AssertChunks(result, range);
// Test for split key range, tablet with non-overlapping rowsets. Two layouts
// are checked: rowsets whose ranges abut each other, and rowsets with a gap
// between 2 and 5.
TEST_F(TestTabletStringKey, TestSplitKeyRangeWithNonOverlappingRowSets) {
Tablet *tablet = this->mutable_tablet();
// Rowsets without gaps
// NOTE(review): `comps` is dereferenced below without being populated in the
// visible code. Confirm that the tablet's components are fetched into it first.
scoped_refptr<TabletComponents> comps;
RowSetVector old_rowset = comps->rowsets->all_rowsets();
RowSetVector without_gaps_rowset = {
std::make_shared<MockDiskRowSet>("0", "2", 2000, 20),
std::make_shared<MockDiskRowSet>("2", "5", 3000, 30),
std::make_shared<MockDiskRowSet>("5", "6", 1000, 10),
std::make_shared<MockDiskRowSet>("6", "9", 3000, 30)
tablet->AtomicSwapRowSets(old_rowset, without_gaps_rowset);
// With a 3000-byte target, [5, 6] and [6, 9] are expected to merge into one
// 4000-byte chunk.
std::vector<KeyRange> result = {
KeyRange("", "2", 2000),
KeyRange("2", "5", 3000),
KeyRange("5", "", 4000)
std::vector<ColumnId> col_ids;
std::vector<KeyRange> range;
tablet->SplitKeyRange(nullptr, nullptr, col_ids, 3000, &range);
AssertChunks(result, range);
// Rowsets with gaps
// NOTE(review): `comps` is not reassigned after the first AtomicSwapRowSets in
// the visible code. If it is a snapshot, this line returns the pre-swap
// rowsets rather than without_gaps_rowset. Confirm that comps is refreshed
// before this point. `result`, `col_ids` and `range` are also redeclared
// below, which requires a separate block scope that is not visible here.
old_rowset = comps->rowsets->all_rowsets();
RowSetVector with_gaps_rowset = {
std::make_shared<MockDiskRowSet>("0", "2", 2000, 20),
std::make_shared<MockDiskRowSet>("5", "6", 1000, 10),
std::make_shared<MockDiskRowSet>("6", "9", 3000, 30)
tablet->AtomicSwapRowSets(old_rowset, with_gaps_rowset);
// [0, 2] and [5, 6] are expected to merge across the gap into one 3000-byte
// chunk ending at 6.
std::vector<KeyRange> result = {
KeyRange("", "6", 3000),
KeyRange("6", "", 3000)
std::vector<ColumnId> col_ids;
std::vector<KeyRange> range;
tablet->SplitKeyRange(nullptr, nullptr, col_ids, 3000, &range);
AssertChunks(result, range);
// Test for split key range, tablet with a rowset whose start is the minimum
// value (empty string key). The first chunk is expected to be ["", 2) with
// that rowset's full 2500 bytes.
TEST_F(TestTabletStringKey, TestSplitKeyRangeWithMinimumValueRowSet) {
Tablet *tablet = this->mutable_tablet();
// Rowsets without gaps
// NOTE(review): `comps` is dereferenced below without being populated in the
// visible code. Confirm that the tablet's components are fetched into it first.
scoped_refptr<TabletComponents> comps;
RowSetVector old_rowset = comps->rowsets->all_rowsets();
RowSetVector without_gaps_rowset = {
std::make_shared<MockDiskRowSet>("", "2", 2500, 20),
std::make_shared<MockDiskRowSet>("2", "5", 3000, 30),
std::make_shared<MockDiskRowSet>("5", "6", 1000, 10),
std::make_shared<MockDiskRowSet>("6", "9", 3000, 30)
tablet->AtomicSwapRowSets(old_rowset, without_gaps_rowset);
std::vector<KeyRange> result = {
KeyRange("", "2", 2500),
KeyRange("2", "5", 3000),
KeyRange("5", "", 4000)
std::vector<ColumnId> col_ids;
std::vector<KeyRange> range;
tablet->SplitKeyRange(nullptr, nullptr, col_ids, 3000, &range);
AssertChunks(result, range);
} // namespace tablet
} // namespace kudu