blob: 4bcc131beccdebb7c44b87ead4e86c56344aa2b8 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <gtest/gtest.h>
#include <memory>
#include <stdlib.h>
#include <unordered_set>
#include "kudu/common/schema.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/gutil/casts.h"
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/server/logical_clock.h"
#include "kudu/tablet/deltamemstore.h"
#include "kudu/tablet/deltafile.h"
#include "kudu/tablet/mutation.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
DEFINE_int32(benchmark_num_passes, 100, "Number of passes to apply deltas in the benchmark");
using std::shared_ptr;
using std::unordered_set;
namespace kudu {
namespace tablet {
using fs::WritableBlock;
class TestDeltaMemStore : public KuduTest {
public:
TestDeltaMemStore()
: op_id_(consensus::MaximumOpId()),
schema_(CreateSchema()),
dms_(new DeltaMemStore(0, 0, new log::LogAnchorRegistry())),
mvcc_(scoped_refptr<server::Clock>(
server::LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp))) {
}
void SetUp() OVERRIDE {
KuduTest::SetUp();
fs_manager_.reset(new FsManager(env_.get(), GetTestPath("fs_root")));
ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout());
ASSERT_OK(fs_manager_->Open());
}
static Schema CreateSchema() {
SchemaBuilder builder;
CHECK_OK(builder.AddColumn("col1", STRING));
CHECK_OK(builder.AddColumn("col2", STRING));
CHECK_OK(builder.AddColumn("col3", UINT32));
return builder.Build();
}
template<class Iterable>
void UpdateIntsAtIndexes(const Iterable &indexes_to_update) {
faststring buf;
RowChangeListEncoder update(&buf);
for (uint32_t idx_to_update : indexes_to_update) {
ScopedTransaction tx(&mvcc_);
tx.StartApplying();
update.Reset();
uint32_t new_val = idx_to_update * 10;
update.AddColumnUpdate(schema_.column(kIntColumn),
schema_.column_id(kIntColumn), &new_val);
CHECK_OK(dms_->Update(tx.timestamp(), idx_to_update, RowChangeList(buf), op_id_));
tx.Commit();
}
}
void ApplyUpdates(const MvccSnapshot &snapshot,
uint32_t row_idx,
size_t col_idx,
ColumnBlock *cb) {
ColumnSchema col_schema(schema_.column(col_idx));
Schema single_col_projection({ col_schema },
{ schema_.column_id(col_idx) },
0);
DeltaIterator* raw_iter;
Status s = dms_->NewDeltaIterator(&single_col_projection, snapshot, &raw_iter);
if (s.IsNotFound()) {
return;
}
ASSERT_OK(s);
gscoped_ptr<DeltaIterator> iter(raw_iter);
ASSERT_OK(iter->Init(nullptr));
ASSERT_OK(iter->SeekToOrdinal(row_idx));
ASSERT_OK(iter->PrepareBatch(cb->nrows(), DeltaIterator::PREPARE_FOR_APPLY));
ASSERT_OK(iter->ApplyUpdates(0, cb));
}
protected:
static const int kStringColumn = 1;
static const int kIntColumn = 2;
consensus::OpId op_id_;
const Schema schema_;
shared_ptr<DeltaMemStore> dms_;
MvccManager mvcc_;
gscoped_ptr<FsManager> fs_manager_;
};
static void GenerateRandomIndexes(uint32_t range, uint32_t count,
unordered_set<uint32_t> *out) {
CHECK_LE(count, range / 2) <<
"this will be too slow unless count is much smaller than range";
out->clear();
for (int i = 0; i < count; i++) {
bool inserted = false;
do {
inserted = out->insert(random() % range).second;
} while (!inserted);
}
}
TEST_F(TestDeltaMemStore, TestUpdateCount) {
uint32_t n_rows = 1000;
faststring update_buf;
RowChangeListEncoder update(&update_buf);
for (uint32_t idx = 0; idx < n_rows; idx++) {
update.Reset();
if (idx % 4 == 0) {
char buf[256] = "update buf";
Slice s(buf);
update.AddColumnUpdate(schema_.column(kStringColumn),
schema_.column_id(kStringColumn), &s);
}
if (idx % 2 == 0) {
ScopedTransaction tx(&mvcc_);
tx.StartApplying();
uint32_t new_val = idx * 10;
update.AddColumnUpdate(schema_.column(kIntColumn),
schema_.column_id(kIntColumn), &new_val);
ASSERT_OK_FAST(dms_->Update(tx.timestamp(), idx, RowChangeList(update_buf), op_id_));
tx.Commit();
}
}
// Flush the delta file so that the stats get updated.
gscoped_ptr<WritableBlock> block;
ASSERT_OK(fs_manager_->CreateNewBlock(&block));
DeltaFileWriter dfw(std::move(block));
ASSERT_OK(dfw.Start());
gscoped_ptr<DeltaStats> stats;
dms_->FlushToFile(&dfw, &stats);
ASSERT_EQ(n_rows / 2, stats->update_count_for_col_id(schema_.column_id(kIntColumn)));
ASSERT_EQ(n_rows / 4, stats->update_count_for_col_id(schema_.column_id(kStringColumn)));
}
TEST_F(TestDeltaMemStore, TestDMSSparseUpdates) {
int n_rows = 1000;
// Update 100 random rows out of the 1000.
srand(12345);
unordered_set<uint32_t> indexes_to_update;
GenerateRandomIndexes(n_rows, 100, &indexes_to_update);
UpdateIntsAtIndexes(indexes_to_update);
ASSERT_EQ(100, dms_->Count());
// Now apply the updates from the DMS back to an array
ScopedColumnBlock<UINT32> read_back(1000);
for (int i = 0; i < 1000; i++) {
read_back[i] = 0xDEADBEEF;
}
MvccSnapshot snap(mvcc_);
ApplyUpdates(snap, 0, kIntColumn, &read_back);
// And verify that only the rows that we updated are modified within
// the array.
for (int i = 0; i < 1000; i++) {
// If this wasn't one of the ones we updated, expect our marker
if (indexes_to_update.find(i) == indexes_to_update.end()) {
// If this wasn't one of the ones we updated, expect our marker
ASSERT_EQ(0xDEADBEEF, read_back[i]);
} else {
// Otherwise expect the updated value
ASSERT_EQ(i * 10, read_back[i]);
}
}
}
// Performance test for KUDU-749: zipfian workloads can cause a lot
// of updates to a single row. This benchmark updates a single row many
// times and times how long it takes to apply those updates during
// the read path.
TEST_F(TestDeltaMemStore, BenchmarkManyUpdatesToOneRow) {
const int kNumRows = 1000;
const int kNumUpdates = 10000;
const int kIdxToUpdate = 10;
const int kStringDataSize = 1000;
for (int i = 0; i < kNumUpdates; i++) {
faststring buf;
RowChangeListEncoder update(&buf);
ScopedTransaction tx(&mvcc_);
tx.StartApplying();
string str(kStringDataSize, 'x');
Slice s(str);
update.AddColumnUpdate(schema_.column(kStringColumn),
schema_.column_id(kStringColumn), &s);
CHECK_OK(dms_->Update(tx.timestamp(), kIdxToUpdate, RowChangeList(buf), op_id_));
tx.Commit();
}
MvccSnapshot snap(mvcc_);
LOG_TIMING(INFO, "Applying updates") {
for (int i = 0; i < FLAGS_benchmark_num_passes; i++) {
ScopedColumnBlock<STRING> strings(kNumRows);
for (int i = 0; i < kNumRows; i++) {
strings[i] = Slice();
}
ApplyUpdates(snap, 0, kStringColumn, &strings);
}
}
}
// Test when a slice column has been updated multiple times in the
// memrowset that the referred to values properly end up in the
// right arena.
TEST_F(TestDeltaMemStore, TestReUpdateSlice) {
faststring update_buf;
RowChangeListEncoder update(&update_buf);
// Update a cell, taking care that the buffer we use to perform
// the update gets cleared after usage. This ensures that the
// underlying data is properly copied into the DMS arena.
{
ScopedTransaction tx(&mvcc_);
tx.StartApplying();
char buf[256] = "update 1";
Slice s(buf);
update.AddColumnUpdate(schema_.column(0),
schema_.column_id(0), &s);
ASSERT_OK_FAST(dms_->Update(tx.timestamp(), 123, RowChangeList(update_buf), op_id_));
memset(buf, 0xff, sizeof(buf));
tx.Commit();
}
MvccSnapshot snapshot_after_first_update(mvcc_);
// Update the same cell again with a different value
{
ScopedTransaction tx(&mvcc_);
tx.StartApplying();
char buf[256] = "update 2";
Slice s(buf);
update.Reset();
update.AddColumnUpdate(schema_.column(0),
schema_.column_id(0), &s);
ASSERT_OK_FAST(dms_->Update(tx.timestamp(), 123, RowChangeList(update_buf), op_id_));
memset(buf, 0xff, sizeof(buf));
tx.Commit();
}
MvccSnapshot snapshot_after_second_update(mvcc_);
// Ensure we end up with a second entry for the cell, at the
// new timestamp
ASSERT_EQ(2, dms_->Count());
// Ensure that we ended up with the right data, and that the old MVCC snapshot
// yields the correct old value.
ScopedColumnBlock<STRING> read_back(1);
ApplyUpdates(snapshot_after_first_update, 123, 0, &read_back);
ASSERT_EQ("update 1", read_back[0].ToString());
ApplyUpdates(snapshot_after_second_update, 123, 0, &read_back);
ASSERT_EQ("update 2", read_back[0].ToString());
}
// Test that if two updates come in with out-of-order transaction IDs,
// the one with the higher transaction ID ends up winning.
//
// This is important during flushing when updates against the old rowset
// are carried forward, but may fall behind newer transactions.
TEST_F(TestDeltaMemStore, TestOutOfOrderTxns) {
faststring update_buf;
RowChangeListEncoder update(&update_buf);
{
ScopedTransaction tx1(&mvcc_);
ScopedTransaction tx2(&mvcc_);
tx2.StartApplying();
Slice s("update 2");
update.AddColumnUpdate(schema_.column(kStringColumn),
schema_.column_id(kStringColumn), &s);
ASSERT_OK(dms_->Update(tx2.timestamp(), 123, RowChangeList(update_buf), op_id_));
tx2.Commit();
tx1.StartApplying();
update.Reset();
s = Slice("update 1");
update.AddColumnUpdate(schema_.column(kStringColumn),
schema_.column_id(kStringColumn), &s);
ASSERT_OK(dms_->Update(tx1.timestamp(), 123, RowChangeList(update_buf), op_id_));
tx1.Commit();
}
// Ensure we end up two entries for the cell.
ASSERT_EQ(2, dms_->Count());
// Ensure that we ended up with the right data.
ScopedColumnBlock<STRING> read_back(1);
ApplyUpdates(MvccSnapshot(mvcc_), 123, kStringColumn, &read_back);
ASSERT_EQ("update 2", read_back[0].ToString());
}
TEST_F(TestDeltaMemStore, TestDMSBasic) {
faststring update_buf;
RowChangeListEncoder update(&update_buf);
char buf[256];
for (uint32_t i = 0; i < 1000; i++) {
ScopedTransaction tx(&mvcc_);
tx.StartApplying();
update.Reset();
uint32_t val = i * 10;
update.AddColumnUpdate(schema_.column(kIntColumn),
schema_.column_id(kIntColumn), &val);
snprintf(buf, sizeof(buf), "hello %d", i);
Slice s(buf);
update.AddColumnUpdate(schema_.column(kStringColumn),
schema_.column_id(kStringColumn), &s);
ASSERT_OK_FAST(dms_->Update(tx.timestamp(), i, RowChangeList(update_buf), op_id_));
tx.Commit();
}
ASSERT_EQ(1000, dms_->Count());
// Read back the values and check correctness.
MvccSnapshot snap(mvcc_);
ScopedColumnBlock<UINT32> read_back(1000);
ScopedColumnBlock<STRING> read_back_slices(1000);
ApplyUpdates(snap, 0, kIntColumn, &read_back);
ApplyUpdates(snap, 0, kStringColumn, &read_back_slices);
// When reading back the slice, do so into a different buffer -
// otherwise if the slice references weren't properly copied above,
// we'd be writing our comparison value into the same buffer that
// we're comparing against!
char buf2[256];
for (uint32_t i = 0; i < 1000; i++) {
ASSERT_EQ(i * 10, read_back[i]) << "failed at iteration " << i;
snprintf(buf2, sizeof(buf2), "hello %d", i);
Slice s(buf2);
ASSERT_EQ(0, s.compare(read_back_slices[i]));
}
// Update the same rows again, with new transactions. Even though
// the same rows are updated, new entries should be added because
// these are separate transactions and we need to maintain the
// old ones for snapshot consistency purposes.
for (uint32_t i = 0; i < 1000; i++) {
ScopedTransaction tx(&mvcc_);
tx.StartApplying();
update.Reset();
uint32_t val = i * 20;
update.AddColumnUpdate(schema_.column(kIntColumn),
schema_.column_id(kIntColumn), &val);
ASSERT_OK_FAST(dms_->Update(tx.timestamp(), i, RowChangeList(update_buf), op_id_));
tx.Commit();
}
ASSERT_EQ(2000, dms_->Count());
}
TEST_F(TestDeltaMemStore, TestIteratorDoesUpdates) {
unordered_set<uint32_t> to_update;
for (uint32_t i = 0; i < 1000; i++) {
to_update.insert(i);
}
UpdateIntsAtIndexes(to_update);
ASSERT_EQ(1000, dms_->Count());
// TODO: test snapshot reads from different points
MvccSnapshot snap(mvcc_);
ScopedColumnBlock<UINT32> block(100);
DeltaIterator* raw_iter;
Status s = dms_->NewDeltaIterator(&schema_, snap, &raw_iter);
if (s.IsNotFound()) {
FAIL() << "Iterator fell outside of the range of the snapshot";
}
ASSERT_OK(s);
gscoped_ptr<DMSIterator> iter(down_cast<DMSIterator *>(raw_iter));
ASSERT_OK(iter->Init(nullptr));
int block_start_row = 50;
ASSERT_OK(iter->SeekToOrdinal(block_start_row));
ASSERT_OK(iter->PrepareBatch(block.nrows(), DeltaIterator::PREPARE_FOR_APPLY));
ASSERT_OK(iter->ApplyUpdates(kIntColumn, &block));
for (int i = 0; i < 100; i++) {
int actual_row = block_start_row + i;
ASSERT_EQ(actual_row * 10, block[i]) << "at row " << actual_row;
}
// Apply the next block
block_start_row += block.nrows();
ASSERT_OK(iter->PrepareBatch(block.nrows(), DeltaIterator::PREPARE_FOR_APPLY));
ASSERT_OK(iter->ApplyUpdates(kIntColumn, &block));
for (int i = 0; i < 100; i++) {
int actual_row = block_start_row + i;
ASSERT_EQ(actual_row * 10, block[i]) << "at row " << actual_row;
}
}
TEST_F(TestDeltaMemStore, TestCollectMutations) {
Arena arena(1024, 1024);
// Update rows 5 and 12
vector<uint32_t> to_update;
to_update.push_back(5);
to_update.push_back(12);
UpdateIntsAtIndexes(to_update);
ASSERT_EQ(2, dms_->Count());
MvccSnapshot snap(mvcc_);
const int kBatchSize = 10;
vector<Mutation *> mutations;
mutations.resize(kBatchSize);
DeltaIterator* raw_iter;
Status s = dms_->NewDeltaIterator(&schema_, snap, &raw_iter);
if (s.IsNotFound()) {
FAIL() << "Iterator fell outside of the range of the snapshot";
}
ASSERT_OK(s);
gscoped_ptr<DMSIterator> iter(down_cast<DMSIterator *>(raw_iter));
ASSERT_OK(iter->Init(nullptr));
ASSERT_OK(iter->SeekToOrdinal(0));
ASSERT_OK(iter->PrepareBatch(kBatchSize, DeltaIterator::PREPARE_FOR_COLLECT));
ASSERT_OK(iter->CollectMutations(&mutations, &arena));
// Only row 5 is updated, everything else should be NULL.
for (int i = 0; i < kBatchSize; i++) {
string str = Mutation::StringifyMutationList(schema_, mutations[i]);
VLOG(1) << "row " << i << ": " << str;
if (i != 5) {
EXPECT_EQ("[]", str);
} else {
EXPECT_EQ("[@1(SET col3=50)]", str);
}
}
// Collect the next batch of 10.
arena.Reset();
std::fill(mutations.begin(), mutations.end(), reinterpret_cast<Mutation *>(NULL));
ASSERT_OK(iter->PrepareBatch(kBatchSize, DeltaIterator::PREPARE_FOR_COLLECT));
ASSERT_OK(iter->CollectMutations(&mutations, &arena));
// Only row 2 is updated, everything else should be NULL.
for (int i = 0; i < 10; i++) {
string str = Mutation::StringifyMutationList(schema_, mutations[i]);
VLOG(1) << "row " << i << ": " << str;
if (i != 2) {
EXPECT_EQ("[]", str);
} else {
EXPECT_EQ("[@2(SET col3=120)]", str);
}
}
}
} // namespace tablet
} // namespace kudu