// blob: c2f13ca029860ce25c8f81201392c6bc392fc836
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <glog/logging.h>
#include <time.h>
#include "kudu/common/iterator.h"
#include "kudu/common/row.h"
#include "kudu/common/scan_spec.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/tablet/deltafile.h"
#include "kudu/tablet/local_tablet_writer.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet-test-base.h"
#include "kudu/util/slice.h"
#include "kudu/util/test_macros.h"
using std::shared_ptr;
using std::unordered_set;
namespace kudu {
namespace tablet {
using fs::ReadableBlock;
// Row-count knobs for the larger tests below. Every test that reads one of
// these passes the value through ClampRowCount() before using it.
DEFINE_int32(testflush_num_inserts, 1000,
             "Number of rows inserted in TestFlush");
// Read by TestRowIteratorComplex and TestInsertsPersist.
DEFINE_int32(testiterator_num_inserts, 1000,
             "Number of rows inserted in TestRowIteratorComplex/TestInsertsPersist");
// Read by TestCompaction, which divides the value evenly over three rowsets.
DEFINE_int32(testcompaction_num_rows, 1000,
             "Total number of rows (split across three rowsets) in TestCompaction");
// Typed fixture for the tablet tests in this file. SETUP is one of the
// TabletTestHelperTypes (for example NullableValueTestSetup) and is handed
// straight through to TabletTestBase.
template<class SETUP>
class TestTablet : public TabletTestBase<SETUP> {
  using Type = SETUP;

 public:
  // Scans the whole tablet into strings and asserts the scan succeeds; the
  // rows that come back are not inspected.
  void CheckCanIterate() {
    vector<string> scanned;
    ASSERT_OK(this->IterateToStringList(&scanned));
  }
};

// Instantiate every TYPED_TEST below once per helper type.
TYPED_TEST_CASE(TestTablet, TabletTestHelperTypes);
// Test that flushing the memrowset produces a rowset with per-column data, a
// bloom block, and a single UNDO delta file holding one delete per row.
TYPED_TEST(TestTablet, TestFlush) {
  // Insert FLAGS_testflush_num_inserts rows (after clamping) into the memrowset.
  uint64_t max_rows = this->ClampRowCount(FLAGS_testflush_num_inserts);
  this->InsertTestRows(0, max_rows, 0);

  // Flush it.
  ASSERT_OK(this->tablet()->Flush());
  TabletMetadata* tablet_meta = this->tablet()->metadata();

  // Make sure the files were created as expected. Use ASSERT rather than
  // CHECK so a missing rowset fails this test instead of aborting the binary.
  RowSetMetadata* rowset_meta = tablet_meta->GetRowSetForTests(0);
  ASSERT_TRUE(rowset_meta != nullptr) << "No row set found";
  for (int col_idx = 0; col_idx < 3; col_idx++) {
    ASSERT_TRUE(rowset_meta->HasDataForColumnIdForTests(this->schema_.column_id(col_idx)))
        << "no data for column index " << col_idx;
  }
  ASSERT_TRUE(rowset_meta->HasBloomDataBlockForTests());

  // Check that undo deltas are present.
  vector<BlockId> undo_blocks = rowset_meta->undo_delta_blocks();
  ASSERT_EQ(1, undo_blocks.size());

  // Read the undo delta; we should get one undo mutation (delete) for each row.
  gscoped_ptr<ReadableBlock> block;
  ASSERT_OK(this->fs_manager()->OpenBlock(undo_blocks[0], &block));
  shared_ptr<DeltaFileReader> dfr;
  ASSERT_OK(DeltaFileReader::Open(std::move(block), undo_blocks[0], &dfr, UNDO));

  // Assert there were 'max_rows' deletions in the undo delta (one for each inserted row).
  ASSERT_EQ(max_rows, dfr->delta_stats().delete_count());
}
// Test that historical data for a row is maintained even after the row
// is flushed from the memrowset.
TYPED_TEST(TestTablet, TestInsertsAndMutationsAreUndoneWithMVCCAfterFlush) {
  // Insert 5 rows into the memrowset.
  // After the first one, each time we insert a new row we mutate
  // the previous one.

  // Take snapshots after each operation
  vector<MvccSnapshot> snaps;
  snaps.push_back(MvccSnapshot(*this->tablet()->mvcc_manager()));

  LocalTabletWriter writer(this->tablet().get(), &this->client_schema_);
  for (int i = 0; i < 5; i++) {
    this->InsertTestRows(i, 1, 0);
    DVLOG(1) << "Inserted row=" << i << ", row_idx=" << i << ", val=0";
    MvccSnapshot ins_snapshot(*this->tablet()->mvcc_manager());
    snaps.push_back(ins_snapshot);
    LOG(INFO) << "After Insert Snapshot: " << ins_snapshot.ToString();
    if (i > 0) {
      ASSERT_OK(this->UpdateTestRow(&writer, i - 1, i));
      DVLOG(1) << "Mutated row=" << i - 1 << ", row_idx=" << i - 1 << ", val=" << i;
      MvccSnapshot mut_snapshot(*this->tablet()->mvcc_manager());
      snaps.push_back(mut_snapshot);
      DVLOG(1) << "After Mutate Snapshot: " << mut_snapshot.ToString();
    }
  }

  // Collect the expected rows from the MRS, where there are no undos.
  // The pointed-to vectors are owned by 'expected_rows'. 'deleter' frees
  // whatever is still in it when the test exits, including early returns
  // from a failed ASSERT, which previously leaked them.
  vector<vector<string>* > expected_rows;
  ElementDeleter deleter(&expected_rows);
  CollectRowsForSnapshots(this->tablet().get(), this->client_schema_,
                          snaps, &expected_rows);

  // Flush the tablet
  ASSERT_OK(this->tablet()->Flush());

  // Now verify that with undos we get the same thing.
  VerifySnapshotsHaveSameResult(this->tablet().get(), this->client_schema_,
                                snaps, expected_rows);

  // Do some more work and flush/compact:
  // take a snapshot and mutate the rows so that we have undos and
  // redos
  snaps.push_back(MvccSnapshot(*this->tablet()->mvcc_manager()));

  for (int i = 0; i < 4; i++) {
    ASSERT_OK(this->UpdateTestRow(&writer, i, i + 10));
    DVLOG(1) << "Mutated row=" << i << ", row_idx=" << i << ", val=" << i + 10;
    MvccSnapshot mut_snapshot(*this->tablet()->mvcc_manager());
    snaps.push_back(mut_snapshot);
    DVLOG(1) << "After Mutate Snapshot: " << mut_snapshot.ToString();
  }

  // also throw a delete in there.
  ASSERT_OK(this->DeleteTestRow(&writer, 4));
  MvccSnapshot delete_snapshot(*this->tablet()->mvcc_manager());
  snaps.push_back(delete_snapshot);
  DVLOG(1) << "After Delete Snapshot: " << delete_snapshot.ToString();

  // Collect the expected rows now that we have undos and redos. Free the
  // first set before re-collecting. STLDeleteElements also empties the
  // vector, which the re-collection relies on, so 'deleter' will only free
  // the new set.
  STLDeleteElements(&expected_rows);
  CollectRowsForSnapshots(this->tablet().get(), this->client_schema_,
                          snaps, &expected_rows);

  // now flush and then compact everything
  ASSERT_OK(this->tablet()->Flush());
  ASSERT_OK(this->tablet()->Compact(Tablet::FORCE_COMPACT_ALL));

  // Now verify that with undos and redos we get the same thing.
  VerifySnapshotsHaveSameResult(this->tablet().get(), this->client_schema_,
                                snaps, expected_rows);
  // 'deleter' frees the second set of expected rows on return.
}
// This tests KUDU-165, a regression where multiple old ghost rows were appearing in
// compaction outputs and sometimes would be selected as the most recent version
// of the row.
// In particular this makes sure that when there is a ghost row in one row set
// and a live one on another the live one is the only one that survives compaction.
TYPED_TEST(TestTablet, TestGhostRowsOnDiskRowSets) {
  // Create a few INSERT/DELETE pairs on-disk by writing and flushing.
  // Each of the resulting rowsets has a single row which is a "ghost" since its
  // redo data has the DELETE.
  LocalTabletWriter writer(this->tablet().get(), &this->client_schema_);

  for (int i = 0; i < 3; i++) {
    CHECK_OK(this->InsertTestRow(&writer, 0, 0));
    // Check the DELETE. If it silently failed, the flushed rowset would hold
    // a live row instead of a ghost, and the next insert would trip its
    // CHECK with an unrelated "already present" error.
    ASSERT_OK(this->DeleteTestRow(&writer, 0));
    ASSERT_OK(this->tablet()->Flush());
  }

  // Create one more rowset on disk which has just an INSERT (ie a non-ghost row).
  CHECK_OK(this->InsertTestRow(&writer, 0, 0));
  ASSERT_OK(this->tablet()->Flush());

  // Compact. This should result in a rowset with just one row in it.
  ASSERT_OK(this->tablet()->Compact(Tablet::FORCE_COMPACT_ALL));

  // Should still be able to update, since the row is live.
  ASSERT_OK(this->UpdateTestRow(&writer, 0, 1));
}
// Test that inserting a row which already exists causes an AlreadyPresent
// error, both while the original row is still in the memrowset and after it
// has been flushed. The tablet must still hold exactly one row throughout.
TYPED_TEST(TestTablet, TestInsertDuplicateKey) {
  const int kDupKey = 12345;
  LocalTabletWriter writer(this->tablet().get(), &this->client_schema_);

  CHECK_OK(this->InsertTestRow(&writer, kDupKey, 0));
  ASSERT_FALSE(writer.last_op_result().has_failed_status());

  // Second insert of the same key while the first copy is in the memrowset.
  Status dup_status = this->InsertTestRow(&writer, kDupKey, 0);
  ASSERT_STR_CONTAINS(dup_status.ToString(), "entry already present in memrowset");
  ASSERT_EQ(1, this->TabletCount());

  // Once the row has been flushed the duplicate must still be rejected;
  // the error text differs from the memrowset case.
  ASSERT_OK(this->tablet()->Flush());
  ASSERT_EQ(1, this->TabletCount());
  dup_status = this->InsertTestRow(&writer, kDupKey, 0);
  ASSERT_STR_CONTAINS(dup_status.ToString(), "key already present");
  ASSERT_EQ(1, this->TabletCount());
}
// Test flushes and compactions dealing with deleted rows.
//
// Drives row 0 through: insert(val=0) -> delete in MRS -> flush ->
// reinsert(val=1) -> flush -> delete in DRS -> reinsert(val=2) -> flush ->
// compact. After each step a full scan must return exactly the live version.
// NOTE(review): 'rows' is reused across scans. This assumes
// IterateToStringList() replaces the vector's contents rather than
// appending; confirm against TabletTestBase.
TYPED_TEST(TestTablet, TestDeleteWithFlushAndCompact) {
LocalTabletWriter writer(this->tablet().get(), &this->client_schema_);
CHECK_OK(this->InsertTestRow(&writer, 0, 0));
ASSERT_OK(this->DeleteTestRow(&writer, 0));
// The delete was applied to memrowset 0.
ASSERT_EQ(0L, writer.last_op_result().mutated_stores(0).mrs_id());
// The row is deleted, so we shouldn't see it in the iterator.
vector<string> rows;
ASSERT_OK(this->IterateToStringList(&rows));
ASSERT_EQ(0, rows.size());
// Flush the tablet and make sure the data doesn't re-appear.
ASSERT_OK(this->tablet()->Flush());
ASSERT_OK(this->IterateToStringList(&rows));
ASSERT_EQ(0, rows.size());
// Re-inserting should succeed. This will reinsert into the MemRowSet.
// Set the int column to '1' this time, so we can differentiate the two
// versions of the row.
CHECK_OK(this->InsertTestRow(&writer, 0, 1));
ASSERT_OK(this->IterateToStringList(&rows));
ASSERT_EQ(1, rows.size());
EXPECT_EQ(this->setup_.FormatDebugRow(0, 1, false), rows[0]);
// Flush again, so the DiskRowSet has the row.
ASSERT_OK(this->tablet()->Flush());
ASSERT_OK(this->IterateToStringList(&rows));
ASSERT_EQ(1, rows.size());
EXPECT_EQ(this->setup_.FormatDebugRow(0, 1, false), rows[0]);
// Delete it again, now that it's in DRS.
ASSERT_OK(this->DeleteTestRow(&writer, 0));
// The delete touched exactly one store: delta memstore 0 of disk rowset 1.
ASSERT_EQ(1, writer.last_op_result().mutated_stores_size());
ASSERT_EQ(1L, writer.last_op_result().mutated_stores(0).rs_id());
ASSERT_EQ(0L, writer.last_op_result().mutated_stores(0).dms_id());
ASSERT_OK(this->IterateToStringList(&rows));
ASSERT_EQ(0, rows.size());
// We now have an INSERT in the MemRowSet and the
// deleted row in the DiskRowSet. The new version
// of the row has '2' in the int column.
CHECK_OK(this->InsertTestRow(&writer, 0, 2));
ASSERT_OK(this->IterateToStringList(&rows));
ASSERT_EQ(1, rows.size());
EXPECT_EQ(this->setup_.FormatDebugRow(0, 2, false), rows[0]);
// Flush - now we have the row in two different DRSs.
ASSERT_OK(this->tablet()->Flush());
ASSERT_OK(this->IterateToStringList(&rows));
ASSERT_EQ(1, rows.size());
EXPECT_EQ(this->setup_.FormatDebugRow(0, 2, false), rows[0]);
// Compaction should succeed even with the duplicate rows.
// Only the newest (val=2) version may survive.
ASSERT_OK(this->tablet()->Compact(Tablet::FORCE_COMPACT_ALL));
ASSERT_OK(this->IterateToStringList(&rows));
ASSERT_EQ(1, rows.size());
EXPECT_EQ(this->setup_.FormatDebugRow(0, 2, false), rows[0]);
}
// Test flushes dealing with REINSERT mutations in the MemRowSet: a key that
// is inserted, deleted and inserted again before a flush must come out of the
// flush carrying the values of its latest insert.
TYPED_TEST(TestTablet, TestFlushWithReinsert) {
  vector<string> scanned_rows;
  LocalTabletWriter writer(this->tablet().get(), &this->client_schema_);

  // Insert (val=0), delete, and re-insert (val=1) the same key in the MRS.
  CHECK_OK(this->InsertTestRow(&writer, 0, 0));
  ASSERT_OK(this->DeleteTestRow(&writer, 0));
  // The delete should have touched exactly one store: memrowset 0.
  ASSERT_EQ(1, writer.last_op_result().mutated_stores_size());
  ASSERT_EQ(0L, writer.last_op_result().mutated_stores(0).mrs_id());
  CHECK_OK(this->InsertTestRow(&writer, 0, 1));

  // After the flush, a scan must return only the re-inserted version.
  ASSERT_OK(this->tablet()->Flush());
  ASSERT_OK(this->IterateToStringList(&scanned_rows));
  ASSERT_EQ(1, scanned_rows.size());
  EXPECT_EQ(this->setup_.FormatDebugRow(0, 1, false), scanned_rows[0]);
}
// Test flushes dealing with REINSERT mutations if they arrive in the middle
// of a flush.
TYPED_TEST(TestTablet, TestReinsertDuringFlush) {
  LocalTabletWriter writer(this->tablet().get(), &this->client_schema_);

  // Insert/delete/insert/delete in MemRowStore.
  CHECK_OK(this->InsertTestRow(&writer, 0, 0));
  ASSERT_OK(this->DeleteTestRow(&writer, 0));
  ASSERT_EQ(1, writer.last_op_result().mutated_stores_size());
  ASSERT_EQ(0L, writer.last_op_result().mutated_stores(0).mrs_id());
  CHECK_OK(this->InsertTestRow(&writer, 0, 1));
  ASSERT_OK(this->DeleteTestRow(&writer, 0));
  ASSERT_EQ(1, writer.last_op_result().mutated_stores_size());
  ASSERT_EQ(0L, writer.last_op_result().mutated_stores(0).mrs_id());

  // During the snapshot flush, insert/delete/insert some more. These writes
  // land in the new memrowset (id 1).
  class MyCommonHooks : public Tablet::FlushCompactCommonHooks {
   public:
    explicit MyCommonHooks(TestFixture *test) : test_(test) {}

    Status PostWriteSnapshot() OVERRIDE {
      LocalTabletWriter writer(test_->tablet().get(), &test_->client_schema());
      // Every write is checked. An ignored insert failure would otherwise
      // surface only later, as a confusing delete failure or a wrong final value.
      CHECK_OK(test_->InsertTestRow(&writer, 0, 1));
      CHECK_OK(test_->DeleteTestRow(&writer, 0));
      CHECK_EQ(1, writer.last_op_result().mutated_stores_size());
      CHECK_EQ(1L, writer.last_op_result().mutated_stores(0).mrs_id());
      CHECK_OK(test_->InsertTestRow(&writer, 0, 2));
      CHECK_OK(test_->DeleteTestRow(&writer, 0));
      CHECK_EQ(1, writer.last_op_result().mutated_stores_size());
      CHECK_EQ(1L, writer.last_op_result().mutated_stores(0).mrs_id());
      CHECK_OK(test_->InsertTestRow(&writer, 0, 3));
      return Status::OK();
    }

   private:
    TestFixture *test_;
  };

  // MyCommonHooks derives publicly from FlushCompactCommonHooks, so the
  // pointer converts implicitly. A reinterpret_cast is the wrong tool for a
  // derived-to-base conversion.
  shared_ptr<Tablet::FlushCompactCommonHooks> common_hooks(new MyCommonHooks(this));
  this->tablet()->SetFlushCompactCommonHooksForTests(common_hooks);

  // Flush the tablet and make sure the data persists.
  ASSERT_OK(this->tablet()->Flush());
  vector<string> rows;
  ASSERT_OK(this->IterateToStringList(&rows));
  ASSERT_EQ(1, rows.size());
  EXPECT_EQ(this->setup_.FormatDebugRow(0, 3, false), rows[0]);
}
// Test iterating over a tablet that holds one row in the memrowset and one
// row in each of two disk rowsets, with no updates.
TYPED_TEST(TestTablet, TestRowIteratorSimple) {
  const int kInRowSet1 = 1;
  const int kInRowSet2 = 2;
  const int kInMemRowSet = 3;

  LocalTabletWriter writer(this->tablet().get(), &this->client_schema_);
  // Two disk rowsets, each produced by inserting one row and flushing...
  CHECK_OK(this->InsertTestRow(&writer, kInRowSet1, 0));
  ASSERT_OK(this->tablet()->Flush());
  CHECK_OK(this->InsertTestRow(&writer, kInRowSet2, 0));
  ASSERT_OK(this->tablet()->Flush());
  // ...plus one row left behind in the memrowset.
  CHECK_OK(this->InsertTestRow(&writer, kInMemRowSet, 0));

  gscoped_ptr<RowwiseIterator> iter;
  ASSERT_OK(this->tablet()->NewRowIterator(this->client_schema_, &iter));
  ASSERT_OK(iter->Init(nullptr));

  // The default (non-ORDERED) iterator is expected to return one block per
  // store: the memrowset first, then the disk rowsets from older to newer.
  struct ExpectedBlock {
    int key;
    const char* source;
  };
  const ExpectedBlock kExpectedBlocks[] = {
    { kInMemRowSet, "memrowset" },
    { kInRowSet1, "rowset 1" },
    { kInRowSet2, "rowset 2" },
  };

  RowBlock block(this->schema_, 100, &this->arena_);
  for (const ExpectedBlock& expected : kExpectedBlocks) {
    ASSERT_TRUE(iter->HasNext());
    ASSERT_OK(iter->NextBlock(&block));
    ASSERT_EQ(1, block.nrows()) << "should get only the one row from " << expected.source;
    this->VerifyRow(block.row(0), expected.key, 0);
  }
  ASSERT_FALSE(iter->HasNext());
}
// Test that an ORDERED iterator returns rows in key order, even when the keys
// are interleaved across several disk rowsets and the memrowset, and whatever
// block size the caller reads with.
TYPED_TEST(TestTablet, TestRowIteratorOrdered) {
  // Create interleaved keys in each rowset, so they are clearly not in order
  const int kNumRows = 128;
  const int kNumBatches = 4;
  LOG(INFO) << "Schema: " << this->schema_.ToString();
  LocalTabletWriter writer(this->tablet().get(), &this->client_schema_);

  // Batch i receives every row index j with j % kNumBatches == i. The flush
  // happens before each batch, so the first flush is of an empty memrowset
  // and the last batch stays in the memrowset.
  for (int i = 0; i < kNumBatches; i++) {
    ASSERT_OK(this->tablet()->Flush());
    for (int j = i; j < kNumRows; j += kNumBatches) {
      LOG(INFO) << "Inserting row " << j;
      CHECK_OK(this->InsertTestRow(&writer, 654321+j, j));
    }
  }

  MvccSnapshot snap(*this->tablet()->mvcc_manager());
  // Iterate through with a few different block sizes (1, 2 and 4 blocks).
  for (int numBlocks = 1; numBlocks < 5; numBlocks *= 2) {
    const int rowsPerBlock = kNumRows / numBlocks;

    // Make a new ordered iterator for the current snapshot.
    gscoped_ptr<RowwiseIterator> iter;
    ASSERT_OK(this->tablet()->NewRowIterator(this->client_schema_, snap, Tablet::ORDERED, &iter));
    ASSERT_OK(iter->Init(nullptr));

    // Iterate the tablet, collecting each row's EncodeComparableKey() output.
    vector<shared_ptr<faststring> > rows;
    for (int i = 0; i < numBlocks; i++) {
      RowBlock block(this->schema_, rowsPerBlock, &this->arena_);
      ASSERT_TRUE(iter->HasNext());
      ASSERT_OK(iter->NextBlock(&block));
      ASSERT_EQ(rowsPerBlock, block.nrows()) << "unexpected number of rows returned";
      for (int j = 0; j < rowsPerBlock; j++) {
        RowBlockRow row = block.row(j);
        shared_ptr<faststring> encoded(new faststring());
        this->client_schema_.EncodeComparableKey(row, encoded.get());
        rows.push_back(encoded);
      }
    }

    // Verify the collected keys are strictly increasing. The comparison is on
    // the encoded key bytes (as std::string) rather than on column values, so
    // it works unchanged for every schema this typed test runs with.
    for (size_t j = 1; j < rows.size(); j++) {
      ASSERT_LT(rows[j - 1]->ToString(), rows[j]->ToString());
    }
    ASSERT_FALSE(iter->HasNext());
    ASSERT_EQ(kNumRows, rows.size());
  }
}
// Returns whether the value column of the row with index 'key_idx' should
// read back as NULL after TestRowIteratorComplex has applied its updates.
// The primary template says "never"; setups that can hold NULL values
// specialize it.
template<class SETUP>
bool TestSetupExpectsNulls(int32_t key_idx) {
  return false;
}

// Rows with key_idx % 2 == 1 are updated by the test: those with
// key_idx % 10 == 1 are set to NULL, the rest to a non-NULL value. Every other
// row keeps what was inserted, so defer to the setup's insert-time rule.
template<>
bool TestSetupExpectsNulls<NullableValueTestSetup>(int32_t key_idx) {
  const bool updated_by_test = (key_idx % 2 == 1);
  return updated_by_test ? (key_idx % 10 == 1)
                         : NullableValueTestSetup::ShouldInsertAsNull(key_idx);
}
// Test iterating over a tablet which has a memrowset
// and several rowsets, each with many rows of data.
TYPED_TEST(TestTablet, TestRowIteratorComplex) {
  uint64_t max_rows = this->ClampRowCount(FLAGS_testiterator_num_inserts);

  // Insert every row, flushing after each row whose index is a multiple of
  // 300 (including row 0). This spreads the data over several disk rowsets
  // plus the memrowset.
  LocalTabletWriter writer(this->tablet().get(), &this->client_schema_);
  for (int32_t i = 0; i < max_rows; i++) {
    ASSERT_OK_FAST(this->InsertTestRow(&writer, i, 0));
    if (i % 300 == 0) {
      LOG(INFO) << "Flushing after " << i << " rows inserted";
      ASSERT_OK(this->tablet()->Flush());
    }
  }
  LOG(INFO) << "Successfully inserted " << max_rows << " rows";

  // At this point, we should have several rowsets as well
  // as some data in memrowset.

  // Update every odd row, either to NULL (where the setup expects nulls) or
  // to a value equal to its row index.
  for (int32_t i = 0; i < max_rows; i++) {
    bool should_update = (i % 2 == 1);
    if (!should_update) continue;
    bool set_to_null = TestSetupExpectsNulls<TypeParam>(i);
    if (set_to_null) {
      // NOTE(review): any return value of UpdateTestRowToNull is not checked
      // here; confirm it reports failures itself.
      this->UpdateTestRowToNull(&writer, i);
    } else {
      ASSERT_OK_FAST(this->UpdateTestRow(&writer, i, i));
    }
  }

  // Now iterate the tablet and make sure the rows show up.
  gscoped_ptr<RowwiseIterator> iter;
  const Schema& schema = this->client_schema_;
  ASSERT_OK(this->tablet()->NewRowIterator(schema, &iter));
  ASSERT_OK(iter->Init(nullptr));
  LOG(INFO) << "Created iter: " << iter->ToString();

  vector<bool> seen(max_rows, false);
  int seen_count = 0;

  RowBlock block(schema, 100, &this->arena_);
  while (iter->HasNext()) {
    this->arena_.Reset();
    ASSERT_OK(iter->NextBlock(&block));
    LOG(INFO) << "Fetched batch of " << block.nrows();
    for (size_t i = 0; i < block.nrows(); i++) {
      SCOPED_TRACE(schema.DebugRow(block.row(i)));
      // Verify that we see each key exactly once. Range-check the index
      // before using it, so a bogus value fails the test instead of indexing
      // past the end of 'seen'.
      int32_t key_idx = *schema.ExtractColumnFromRow<INT32>(block.row(i), 1);
      ASSERT_GE(key_idx, 0);
      ASSERT_LT(static_cast<uint64_t>(key_idx), max_rows);
      if (seen[key_idx]) {
        FAIL() << "Saw row " << key_idx << " multiple times";
      }
      seen[key_idx] = true;
      seen_count++;

      // Verify that we see the correctly updated value
      const int32_t* val = schema.ExtractColumnFromRow<INT32>(block.row(i), 2);

      bool set_to_null = TestSetupExpectsNulls<TypeParam>(key_idx);
      bool should_update = (key_idx % 2 == 1);
      if (val == nullptr) {
        ASSERT_TRUE(set_to_null);
      } else if (should_update) {
        ASSERT_EQ(key_idx, *val);
      } else {
        ASSERT_EQ(0, *val);
      }
    }
  }
  ASSERT_EQ(max_rows, seen_count)
      << "expected to see all inserted data through iterator.";
}
// Test that rows flushed to disk are still present, with the right contents,
// after the tablet is closed and reopened.
TYPED_TEST(TestTablet, TestInsertsPersist) {
  const uint64_t num_rows = this->ClampRowCount(FLAGS_testiterator_num_inserts);

  this->InsertTestRows(0, num_rows, 0);
  ASSERT_EQ(num_rows, this->TabletCount());

  // The row count must be unchanged by the flush...
  ASSERT_OK(this->tablet()->Flush());
  ASSERT_EQ(num_rows, this->TabletCount());

  // ...and by a close/re-open cycle, after which every row is verified.
  this->TabletReOpen();
  ASSERT_EQ(num_rows, this->TabletCount());
  this->VerifyTestRows(0, num_rows);

  // TODO: add some more data, re-flush
}
// Test that when a row has been updated many times, it always yields
// the most recent value.
//
// Row 0 is updated in the MemRowSet (vals 1-3), flushed, updated again in the
// DeltaMemStore (vals 4-6), then compacted together with a second one-row
// rowset. After each stage a scan must return the latest value.
TYPED_TEST(TestTablet, TestMultipleUpdates) {
// Insert and update several times in MemRowSet
LocalTabletWriter writer(this->tablet().get(), &this->client_schema_);
CHECK_OK(this->InsertTestRow(&writer, 0, 0));
ASSERT_OK(this->UpdateTestRow(&writer, 0, 1));
// The update touched exactly one store: memrowset 0.
ASSERT_EQ(1, writer.last_op_result().mutated_stores_size());
ASSERT_EQ(0L, writer.last_op_result().mutated_stores(0).mrs_id());
ASSERT_OK(this->UpdateTestRow(&writer, 0, 2));
ASSERT_OK(this->UpdateTestRow(&writer, 0, 3));
// Should see most recent value.
vector<string> out_rows;
ASSERT_OK(this->IterateToStringList(&out_rows));
ASSERT_EQ(1, out_rows.size());
ASSERT_EQ(this->setup_.FormatDebugRow(0, 3, false), out_rows[0]);
// Flush it.
ASSERT_OK(this->tablet()->Flush());
// Should still see most recent value.
ASSERT_OK(this->IterateToStringList(&out_rows));
ASSERT_EQ(1, out_rows.size());
ASSERT_EQ(this->setup_.FormatDebugRow(0, 3, false), out_rows[0]);
// Update the row a few times in DeltaMemStore
ASSERT_OK(this->UpdateTestRow(&writer, 0, 4));
// The update touched exactly one store: delta memstore 0 of rowset 0.
ASSERT_EQ(1, writer.last_op_result().mutated_stores_size());
ASSERT_EQ(0L, writer.last_op_result().mutated_stores(0).rs_id());
ASSERT_EQ(0L, writer.last_op_result().mutated_stores(0).dms_id());
ASSERT_OK(this->UpdateTestRow(&writer, 0, 5));
ASSERT_OK(this->UpdateTestRow(&writer, 0, 6));
// Should still see most recent value.
ASSERT_OK(this->IterateToStringList(&out_rows));
ASSERT_EQ(1, out_rows.size());
ASSERT_EQ(this->setup_.FormatDebugRow(0, 6, false), out_rows[0]);
// Force a compaction after adding a new rowset with one row.
// The compaction must merge the two rowsets into one.
CHECK_OK(this->InsertTestRow(&writer, 1, 0));
ASSERT_OK(this->tablet()->Flush());
ASSERT_EQ(2, this->tablet()->num_rowsets());
ASSERT_OK(this->tablet()->Compact(Tablet::FORCE_COMPACT_ALL));
ASSERT_EQ(1, this->tablet()->num_rowsets());
// Should still see most recent value.
ASSERT_OK(this->IterateToStringList(&out_rows));
ASSERT_EQ(2, out_rows.size());
ASSERT_EQ(this->setup_.FormatDebugRow(0, 6, false), out_rows[0]);
ASSERT_EQ(this->setup_.FormatDebugRow(1, 0, false), out_rows[1]);
}
// Test that a FORCE_COMPACT_ALL compaction merges three flushed rowsets into a
// single new rowset that holds all of their rows, and removes the old ones.
TYPED_TEST(TestTablet, TestCompaction) {
  uint64_t max_rows = this->ClampRowCount(FLAGS_testcompaction_num_rows);
  uint64_t n_rows = max_rows / 3;

  // Create three rowsets by inserting and flushing. The first MemRowSet has
  // id 0, and each flush swaps in a new one, so after flushing rowset 'rs'
  // the current MRS id is rs + 1.
  for (int rs = 0; rs < 3; rs++) {
    LOG_TIMING(INFO, "Inserting rows") {
      this->InsertTestRows(n_rows * rs, n_rows, 0);

      LOG_TIMING(INFO, "Flushing rows") {
        ASSERT_OK(this->tablet()->Flush());
      }

      ASSERT_EQ(rs + 1, this->tablet()->CurrentMrsIdForTests());
      // Check for null before dereferencing, so a missing rowset fails the
      // test instead of crashing it.
      const RowSetMetadata* flushed_meta = this->tablet()->metadata()->GetRowSetForTests(rs);
      ASSERT_TRUE(flushed_meta != nullptr) << "no metadata for rowset " << rs;
      ASSERT_TRUE(flushed_meta->HasDataForColumnIdForTests(this->schema_.column_id(0)));
    }
  }

  // Issue compaction
  LOG_TIMING(INFO, "Compacting rows") {
    ASSERT_OK(this->tablet()->Compact(Tablet::FORCE_COMPACT_ALL));
    // Compaction does not swap the memrowsets so we should still get 3
    ASSERT_EQ(3, this->tablet()->CurrentMrsIdForTests());
    ASSERT_EQ(n_rows * 3, this->TabletCount());

    // The compaction output is expected to be registered as rowset 3.
    const RowSetMetadata *rowset_meta = this->tablet()->metadata()->GetRowSetForTests(3);
    ASSERT_TRUE(rowset_meta != nullptr);
    ASSERT_TRUE(rowset_meta->HasDataForColumnIdForTests(this->schema_.column_id(0)));
    ASSERT_TRUE(rowset_meta->HasBloomDataBlockForTests());
  }

  // Old rowsets should not exist anymore
  for (int i = 0; i <= 2; i++) {
    const RowSetMetadata *rowset_meta = this->tablet()->metadata()->GetRowSetForTests(i);
    ASSERT_TRUE(rowset_meta == nullptr);
  }
}
// Which store(s) MyCommonHooks::DoHook() expects its DELETE to have mutated.
// DoHook CHECKs the writer's last_op_result().mutated_stores against this value.
enum MutationType {
// Exactly one mutated store, which has an mrs_id.
MRS_MUTATION,
// Exactly one mutated store, which has both an rs_id and a dms_id.
DELTA_MUTATION,
// Exactly two mutated stores. Used by the PostSwapInDuplicatingRowSet and
// PostReupdateMissedDeltas hook points.
DUPLICATED_MUTATION
};
// Hook used by the Test*WithConcurrentMutation tests.
//
// Each DoHook() call, whether made directly by a test or from one of the
// flush/compaction hook points, uses a counter i_ that starts at 0 and grows
// by one per call. It:
//   - deletes row i_ and CHECKs which store(s) the delete mutated,
//   - updates row 10 + i_ to the value 1000 + i_,
//   - inserts row 20 + i_ with value 0,
//   - checks that the tablet can still be iterated.
// 'flushed' tells the snapshot hook points whether the test's rows have
// already been flushed, which decides the store the delete is expected to hit.
template<class TestFixture>
class MyCommonHooks : public Tablet::FlushCompactCommonHooks {
 public:
  explicit MyCommonHooks(TestFixture *test, bool flushed)
      : test_(test),
        flushed_(flushed),
        i_(0) {
  }

  Status DoHook(MutationType expected_mutation_type) {
    LocalTabletWriter writer(test_->tablet().get(), &test_->client_schema());
    RETURN_NOT_OK(test_->DeleteTestRow(&writer, i_));
    CheckMutatedStores(&writer, expected_mutation_type);
    RETURN_NOT_OK(test_->UpdateTestRow(&writer, 10 + i_, 1000 + i_));
    test_->InsertTestRows(20 + i_, 1, 0);
    test_->CheckCanIterate();
    ++i_;
    return Status::OK();
  }

  virtual Status PostTakeMvccSnapshot() OVERRIDE {
    return DoHook(SnapshotMutationType());
  }

  virtual Status PostWriteSnapshot() OVERRIDE {
    return DoHook(SnapshotMutationType());
  }

  virtual Status PostSwapInDuplicatingRowSet() OVERRIDE {
    return DoHook(DUPLICATED_MUTATION);
  }

  virtual Status PostReupdateMissedDeltas() OVERRIDE {
    return DoHook(DUPLICATED_MUTATION);
  }

  virtual Status PostSwapNewRowSet() OVERRIDE {
    return DoHook(DELTA_MUTATION);
  }

 protected:
  TestFixture *test_;
  bool flushed_;
  int i_;

 private:
  // Before the rows are flushed, mutations go to the MemRowSet; afterwards
  // they go to a DeltaMemStore.
  MutationType SnapshotMutationType() const {
    return flushed_ ? DELTA_MUTATION : MRS_MUTATION;
  }

  // CHECKs the stores reported as mutated by the last write through 'writer'.
  static void CheckMutatedStores(LocalTabletWriter* writer, MutationType expected_type) {
    switch (expected_type) {
      case MRS_MUTATION:
        CHECK_EQ(1, writer->last_op_result().mutated_stores_size());
        CHECK(writer->last_op_result().mutated_stores(0).has_mrs_id());
        break;
      case DELTA_MUTATION:
        CHECK_EQ(1, writer->last_op_result().mutated_stores_size());
        CHECK(writer->last_op_result().mutated_stores(0).has_rs_id());
        CHECK(writer->last_op_result().mutated_stores(0).has_dms_id());
        break;
      case DUPLICATED_MUTATION:
        CHECK_EQ(2, writer->last_op_result().mutated_stores_size());
        break;
    }
  }
};
// Flush hooks for TestFlushWithConcurrentMutation. Adds the flush-only
// PostSwapNewMemRowSet hook point on top of the shared MyCommonHooks.
template<class TestFixture>
class MyFlushHooks : public Tablet::FlushFaultHooks, public MyCommonHooks<TestFixture> {
 public:
  explicit MyFlushHooks(TestFixture *test, bool flushed) :
    MyCommonHooks<TestFixture>(test, flushed) {}

  // OVERRIDE turns a base-class signature mismatch into a build error instead
  // of a hook that silently never fires.
  virtual Status PostSwapNewMemRowSet() OVERRIDE { return this->DoHook(MRS_MUTATION); }
};
// Compaction hooks for TestCompactionWithConcurrentMutation. Adds the
// compaction-only PostSelectIterators hook point on top of MyCommonHooks.
template<class TestFixture>
class MyCompactHooks : public Tablet::CompactionFaultHooks, public MyCommonHooks<TestFixture> {
 public:
  explicit MyCompactHooks(TestFixture *test, bool flushed) :
    MyCommonHooks<TestFixture>(test, flushed) {}

  // OVERRIDE turns a base-class signature mismatch into a build error instead
  // of a hook that silently never fires.
  virtual Status PostSelectIterators() OVERRIDE { return this->DoHook(DELTA_MUTATION); }
};
// Test for Flush with concurrent update, delete and insert during the
// various phases.
TYPED_TEST(TestTablet, TestFlushWithConcurrentMutation) {
  this->InsertTestRows(0, 7, 0); // 0-6 inclusive: these rows will be deleted
  this->InsertTestRows(10, 7, 0); // 10-16 inclusive: these rows will be updated
  // Rows 20-26 inclusive will be inserted during the flush

  // Inject hooks which mutate those rows and add more rows at
  // each key stage of flushing.
  shared_ptr<MyFlushHooks<TestFixture> > hooks(new MyFlushHooks<TestFixture>(this, false));
  this->tablet()->SetFlushHooksForTests(hooks);
  this->tablet()->SetFlushCompactCommonHooksForTests(hooks);

  // First hook before we do the Flush
  ASSERT_OK(hooks->DoHook(MRS_MUTATION));

  // Then do the flush with the hooks enabled.
  ASSERT_OK(this->tablet()->Flush());

  // Now verify that the results saw all the mutated_stores.
  vector<string> out_rows;
  ASSERT_OK(this->IterateToStringList(&out_rows));
  std::sort(out_rows.begin(), out_rows.end());

  // DoHook() call number i deletes row i, updates row 10 + i to 1000 + i and
  // inserts row 20 + i. The expected rows assume seven calls in total: the
  // explicit call above plus one per hook point reached during the flush.
  const int kNumHookCalls = 7;
  vector<string> expected_rows;
  for (int i = 0; i < kNumHookCalls; i++) {
    expected_rows.push_back(this->setup_.FormatDebugRow(10 + i, 1000 + i, true));
  }
  for (int i = 0; i < kNumHookCalls; i++) {
    expected_rows.push_back(this->setup_.FormatDebugRow(20 + i, 0, false));
  }
  std::sort(expected_rows.begin(), expected_rows.end());

  LOG(INFO) << "Expected: " << JoinStrings(expected_rows, "\n");
  LOG(INFO) << "Results: " << JoinStrings(out_rows, "\n");

  // Verify that all the inserts and updates arrived and persisted.
  ASSERT_EQ(expected_rows.size(), out_rows.size());
  for (size_t i = 0; i < expected_rows.size(); i++) {
    ASSERT_EQ(expected_rows[i], out_rows[i]);
  }
}
// Test for compaction with concurrent update, delete and insert during the
// various phases.
TYPED_TEST(TestTablet, TestCompactionWithConcurrentMutation) {
  // Create three rowsets by inserting and flushing.
  // The rows from these layers will get updated or deleted during the compaction:
  // - rows 0-6 inclusive will be deleted
  // - rows 10-16 inclusive will be updated
  this->InsertTestRows(0, 2, 0);  // rows 0-1
  this->InsertTestRows(10, 2, 0); // rows 10-11
  ASSERT_OK(this->tablet()->Flush());

  this->InsertTestRows(2, 2, 0);  // rows 2-3
  this->InsertTestRows(12, 2, 0); // rows 12-13
  ASSERT_OK(this->tablet()->Flush());

  this->InsertTestRows(4, 3, 0);  // rows 4-6
  this->InsertTestRows(14, 3, 0); // rows 14-16
  ASSERT_OK(this->tablet()->Flush());

  // Rows 20-26 inclusive will be inserted during the compaction.
  shared_ptr<MyCompactHooks<TestFixture> > hooks(new MyCompactHooks<TestFixture>(this, true));
  this->tablet()->SetCompactionHooksForTests(hooks);
  this->tablet()->SetFlushCompactCommonHooksForTests(hooks);

  // First hook pre-compaction.
  ASSERT_OK(hooks->DoHook(DELTA_MUTATION));

  // Issue compaction
  ASSERT_OK(this->tablet()->Compact(Tablet::FORCE_COMPACT_ALL));

  // Grab the resulting data into a vector.
  vector<string> out_rows;
  ASSERT_OK(this->IterateToStringList(&out_rows));
  std::sort(out_rows.begin(), out_rows.end());

  // DoHook() call number i deletes row i, updates row 10 + i to 1000 + i and
  // inserts row 20 + i. The expected rows assume seven calls in total: the
  // explicit call above plus one per hook point reached during the compaction.
  const int kNumHookCalls = 7;
  vector<string> expected_rows;
  for (int i = 0; i < kNumHookCalls; i++) {
    expected_rows.push_back(this->setup_.FormatDebugRow(10 + i, 1000 + i, true));
  }
  for (int i = 0; i < kNumHookCalls; i++) {
    expected_rows.push_back(this->setup_.FormatDebugRow(20 + i, 0, false));
  }
  std::sort(expected_rows.begin(), expected_rows.end());

  // Log both sides before comparing, so a size mismatch is easy to diagnose.
  LOG(INFO) << "Expected: " << JoinStrings(expected_rows, "\n");
  LOG(INFO) << "Results: " << JoinStrings(out_rows, "\n");

  // Verify that all the inserts and updates arrived and persisted.
  ASSERT_EQ(expected_rows.size(), out_rows.size());
  for (size_t i = 0; i < expected_rows.size(); i++) {
    ASSERT_EQ(expected_rows[i], out_rows[i]);
  }
}
// Test that metrics behave properly during tablet initialization: dumping
// every metric must succeed both before and after the tablet is opened.
TYPED_TEST(TestTablet, TestMetricsInit) {
  // Create a tablet, but do not open it
  this->CreateTestTablet();
  MetricRegistry* registry = this->harness()->metrics_registry();
  std::stringstream out;
  JsonWriter writer(&out, JsonWriter::PRETTY);
  ASSERT_OK(registry->WriteAsJson(&writer, { "*" }, MetricJsonOptions()));

  // Open tablet, should still work. Check the Open() status too; otherwise a
  // failed open would let the test pass without covering the post-open metrics.
  ASSERT_OK(this->harness()->Open());
  ASSERT_OK(registry->WriteAsJson(&writer, { "*" }, MetricJsonOptions()));
}
// Test that we find the correct log segment size for different indexes.
TEST(TestTablet, TestGetLogRetentionSizeForIndex) {
  // Three log segments. Each key is the index at which a segment ends, and
  // the value is that segment's size, so all three together total 111.
  std::map<int64_t, int64_t> idx_size_map;
  idx_size_map[3] = 1;
  idx_size_map[6] = 10;
  idx_size_map[9] = 100;

  struct RetentionCase {
    int64_t min_log_index;
    int64_t expected_size;
  };
  const RetentionCase kCases[] = {
    { -1, 0 },    // The default value retains nothing.
    { 1, 111 },   // Start of the first segment: retain all the logs.
    { 3, 111 },   // End of the first segment: still retain everything.
    { 4, 110 },   // Start of the second segment: retain it and the next.
    { 7, 100 },   // Start of the third segment: retain only that one.
    { 10, 0 },    // After all the segments: retain nothing.
  };

  for (const RetentionCase& c : kCases) {
    int64_t min_log_index = c.min_log_index;
    ASSERT_EQ(Tablet::GetLogRetentionSizeForIndex(min_log_index, idx_size_map), c.expected_size);
  }
}
} // namespace tablet
} // namespace kudu