// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <time.h>

#include <memory>
#include <string>
#include <unordered_set>
#include <vector>

#include <gflags/gflags.h>
#include <glog/logging.h>
#include <gtest/gtest.h>

#include "kudu/common/row.h"
#include "kudu/common/schema.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/tablet/delta_compaction.h"
#include "kudu/tablet/diskrowset.h"
#include "kudu/tablet/diskrowset-test-base.h"
#include "kudu/tablet/tablet-test-util.h"
#include "kudu/util/env.h"
#include "kudu/util/status.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/test_macros.h"
DEFINE_double(update_fraction, 0.1f, "fraction of rows to update");
DECLARE_bool(cfile_lazy_open);
DECLARE_int32(cfile_default_block_size);
DECLARE_double(tablet_delta_store_major_compact_min_ratio);
DECLARE_int32(tablet_delta_store_minor_compact_max);

using std::is_sorted;
using std::shared_ptr;
using std::string;
using std::unordered_set;
using std::vector;

namespace kudu {
namespace tablet {

// TODO: add a test which calls CopyNextRows on an iterator with no more
// rows -- I think it segfaults!
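// A minimal sketch of that missing test. Assumptions: the iterator API used
// elsewhere in this file (Init()/HasNext()/NextBlock(), where NextBlock() is
// the successor of the CopyNextRows() call named in the TODO), and that Arena
// and RowBlock are visible via the included test-base headers. Disabled until
// the suspected crash is confirmed and fixed.
TEST_F(TestRowSet, DISABLED_TestReadPastEndOfIterator) {
  WriteTestRowSet(10);
  shared_ptr<DiskRowSet> rs;
  ASSERT_OK(OpenTestRowSet(&rs));
  gscoped_ptr<RowwiseIterator> iter;
  ASSERT_OK(rs->NewRowIterator(&schema_,
                               MvccSnapshot::CreateSnapshotIncludingAllTransactions(),
                               &iter));
  ASSERT_OK(iter->Init(nullptr));
  Arena arena(1024, 1024 * 1024);
  RowBlock block(schema_, 100, &arena);
  // Drain the iterator completely.
  while (iter->HasNext()) {
    ASSERT_OK(iter->NextBlock(&block));
  }
  // Read once more past the end: this should fail gracefully (or return an
  // empty block), not segfault.
  Status s = iter->NextBlock(&block);
  LOG(INFO) << "NextBlock() past the end returned: " << s.ToString();
}
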
// Test round-trip writing and reading back a rowset with
// multiple columns. Does not test any modifications.
TEST_F(TestRowSet, TestRowSetRoundTrip) {
WriteTestRowSet();
// Now open the DiskRowSet for read
shared_ptr<DiskRowSet> rs;
ASSERT_OK(OpenTestRowSet(&rs));
// First iterate over all columns
LOG_TIMING(INFO, "Iterating over all columns") {
IterateProjection(*rs, schema_, n_rows_);
}
// Now iterate only over the key column
Schema proj_key;
ASSERT_OK(schema_.CreateProjectionByNames({ "key" }, &proj_key));
LOG_TIMING(INFO, "Iterating over only key column") {
IterateProjection(*rs, proj_key, n_rows_);
}
// Now iterate only over the non-key column
Schema proj_val;
ASSERT_OK(schema_.CreateProjectionByNames({ "val" }, &proj_val));
LOG_TIMING(INFO, "Iterating over only val column") {
IterateProjection(*rs, proj_val, n_rows_);
}
// Test that CheckRowPresent returns correct results
ProbeStats stats;
// 1. Check a key which comes before all keys in rowset
{
RowBuilder rb(schema_.CreateKeyProjection());
rb.AddString(Slice("h"));
RowSetKeyProbe probe(rb.row());
bool present;
ASSERT_OK(rs->CheckRowPresent(probe, &present, &stats));
ASSERT_FALSE(present);
}
// 2. Check a key which comes after all keys in rowset
{
RowBuilder rb(schema_.CreateKeyProjection());
rb.AddString(Slice("z"));
RowSetKeyProbe probe(rb.row());
bool present;
ASSERT_OK(rs->CheckRowPresent(probe, &present, &stats));
ASSERT_FALSE(present);
}
// 3. Check a key which is not present, but comes between present
// keys
{
RowBuilder rb(schema_.CreateKeyProjection());
rb.AddString(Slice("hello 00000000000049x"));
RowSetKeyProbe probe(rb.row());
bool present;
ASSERT_OK(rs->CheckRowPresent(probe, &present, &stats));
ASSERT_FALSE(present);
}
// 4. Check a key which is present
{
char buf[256];
RowBuilder rb(schema_.CreateKeyProjection());
FormatKey(49, buf, sizeof(buf));
rb.AddString(Slice(buf));
RowSetKeyProbe probe(rb.row());
bool present;
ASSERT_OK(rs->CheckRowPresent(probe, &present, &stats));
ASSERT_TRUE(present);
}
}

// Test writing a rowset, and then updating some rows in it.
TEST_F(TestRowSet, TestRowSetUpdate) {
WriteTestRowSet();
// Now open the DiskRowSet for read
shared_ptr<DiskRowSet> rs;
ASSERT_OK(OpenTestRowSet(&rs));
// Add an update to the delta tracker for a number of keys
// which exist. These updates will change the value to
// equal idx*5 (whereas in the original data, value = idx)
unordered_set<uint32_t> updated;
UpdateExistingRows(rs.get(), FLAGS_update_fraction, &updated);
ASSERT_EQ(static_cast<int>(n_rows_ * FLAGS_update_fraction),
rs->delta_tracker_->dms_->Count());
// Try to add a mutation for a key not in the file (but which falls
// between two valid keys)
faststring buf;
RowChangeListEncoder enc(&buf);
enc.SetToDelete();
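  // buf now holds an encoded DELETE RowChangeList.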
Timestamp timestamp(0);
RowBuilder rb(schema_.CreateKeyProjection());
rb.AddString(Slice("hello 00000000000049x"));
RowSetKeyProbe probe(rb.row());
OperationResultPB result;
ProbeStats stats;
Status s = rs->MutateRow(timestamp, probe, enc.as_changelist(), op_id_, &stats, &result);
ASSERT_TRUE(s.IsNotFound());
ASSERT_EQ(0, result.mutated_stores_size());
// Now read back the value column, and verify that the updates
// are visible.
VerifyUpdates(*rs, updated);
}

TEST_F(TestRowSet, TestRandomRead) {
// Write 100 rows.
WriteTestRowSet(100);
shared_ptr<DiskRowSet> rs;
ASSERT_OK(OpenTestRowSet(&rs));
  // Read un-updated row.
  NO_FATALS(VerifyRandomRead(*rs, "hello 000000000000050",
                             "(string key=hello 000000000000050, uint32 val=50)"));

  // Update the row.
  OperationResultPB result;
  ASSERT_OK(UpdateRow(rs.get(), 50, 12345, &result));

  // Read it again -- should see the updated value.
  NO_FATALS(VerifyRandomRead(*rs, "hello 000000000000050",
                             "(string key=hello 000000000000050, uint32 val=12345)"));

  // Try to read a row which comes before the first key.
  // This should return no rows.
  NO_FATALS(VerifyRandomRead(*rs, "aaaaa", ""));

  // Same with a row which falls between keys.
  NO_FATALS(VerifyRandomRead(*rs, "hello 000000000000050_between_keys", ""));

  // And a row which falls after the last key.
  NO_FATALS(VerifyRandomRead(*rs, "hello 000000000000101", ""));
}

// Test Delete() support within a DiskRowSet.
TEST_F(TestRowSet, TestDelete) {
// Write and open a DiskRowSet with 2 rows.
WriteTestRowSet(2);
shared_ptr<DiskRowSet> rs;
ASSERT_OK(OpenTestRowSet(&rs));
MvccSnapshot snap_before_delete(mvcc_);
// Delete one of the two rows
OperationResultPB result;
ASSERT_OK(DeleteRow(rs.get(), 0, &result));
ASSERT_EQ(1, result.mutated_stores_size());
ASSERT_EQ(0L, result.mutated_stores(0).rs_id());
ASSERT_EQ(0L, result.mutated_stores(0).dms_id());
MvccSnapshot snap_after_delete(mvcc_);
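  // The verification loop below runs twice: the first pass reads while the
  // deletion sits in the in-memory DMS, the second after it has been flushed
  // into a DeltaFile.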
vector<string> rows;
Status s;
for (int i = 0; i < 2; i++) {
// Reading the MVCC snapshot prior to deletion should show the row.
ASSERT_OK(DumpRowSet(*rs, schema_, snap_before_delete, &rows));
ASSERT_EQ(2, rows.size());
EXPECT_EQ("(string key=hello 000000000000000, uint32 val=0)", rows[0]);
EXPECT_EQ("(string key=hello 000000000000001, uint32 val=1)", rows[1]);
// Reading the MVCC snapshot after the deletion should hide the row.
ASSERT_OK(DumpRowSet(*rs, schema_, snap_after_delete, &rows));
ASSERT_EQ(1, rows.size());
EXPECT_EQ("(string key=hello 000000000000001, uint32 val=1)", rows[0]);
// Trying to delete or update the same row again should fail.
OperationResultPB result;
s = DeleteRow(rs.get(), 0, &result);
ASSERT_TRUE(s.IsNotFound()) << "bad status: " << s.ToString();
ASSERT_EQ(0, result.mutated_stores_size());
result.Clear();
s = UpdateRow(rs.get(), 0, 12345, &result);
ASSERT_TRUE(s.IsNotFound()) << "bad status: " << s.ToString();
ASSERT_EQ(0, result.mutated_stores_size());
// CheckRowPresent should return false.
bool present;
ASSERT_OK(CheckRowPresent(*rs, 0, &present));
EXPECT_FALSE(present);
    if (i == 0) {
      // Flush the DMS, moving the deletion into an on-disk DeltaFile. The
      // second pass through the loop re-verifies that the externally visible
      // state of the rowset has not changed.
      ASSERT_OK(rs->FlushDeltas());
    }
}
}

TEST_F(TestRowSet, TestDMSFlush) {
WriteTestRowSet();
unordered_set<uint32_t> updated;
// Now open the DiskRowSet for read
{
shared_ptr<DiskRowSet> rs;
ASSERT_OK(OpenTestRowSet(&rs));
// Add an update to the delta tracker for a number of keys
// which exist. These updates will change the value to
// equal idx*5 (whereas in the original data, value = idx)
UpdateExistingRows(rs.get(), FLAGS_update_fraction, &updated);
ASSERT_EQ(static_cast<int>(n_rows_ * FLAGS_update_fraction),
rs->delta_tracker_->dms_->Count());
ASSERT_OK(rs->FlushDeltas());
// Check that the DiskRowSet's DMS has now been emptied.
ASSERT_EQ(0, rs->delta_tracker_->dms_->Count());
// Now read back the value column, and verify that the updates
// are visible.
SCOPED_TRACE("before reopen");
VerifyUpdates(*rs, updated);
}
LOG(INFO) << "Reopening rowset ===============";
// Close and re-open the rowset and ensure that the updates were
// persistent.
{
shared_ptr<DiskRowSet> rs;
ASSERT_OK(OpenTestRowSet(&rs));
// Now read back the value column, and verify that the updates
// are visible.
SCOPED_TRACE("after reopen");
VerifyUpdates(*rs, updated);
}
}

// Test that when a single row is updated multiple times, we can query the
// historical values using MVCC, even after it is flushed.
TEST_F(TestRowSet, TestFlushedUpdatesRespectMVCC) {
const Slice key_slice("row");
// Write a single row into a new DiskRowSet.
LOG_TIMING(INFO, "Writing rowset") {
DiskRowSetWriter drsw(rowset_meta_.get(), &schema_,
BloomFilterSizing::BySizeAndFPRate(32*1024, 0.01f));
ASSERT_OK(drsw.Open());
RowBuilder rb(schema_);
rb.AddString(key_slice);
rb.AddUint32(1);
ASSERT_OK_FAST(WriteRow(rb.data(), &drsw));
ASSERT_OK(drsw.Finish());
}
// Reopen the rowset.
shared_ptr<DiskRowSet> rs;
ASSERT_OK(OpenTestRowSet(&rs));
// Take a snapshot of the pre-update state.
vector<MvccSnapshot> snaps;
snaps.push_back(MvccSnapshot(mvcc_));
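  // snaps[0] observes the initial state (val=1); the loop below appends one
  // snapshot per update, so snaps[i] should observe val=i+1.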
// Update the single row multiple times, taking an MVCC snapshot
// after each update.
faststring update_buf;
RowChangeListEncoder update(&update_buf);
for (uint32_t i = 2; i <= 5; i++) {
{
ScopedTransaction tx(&mvcc_);
tx.StartApplying();
update.Reset();
update.AddColumnUpdate(schema_.column(1), schema_.column_id(1), &i);
RowBuilder rb(schema_.CreateKeyProjection());
rb.AddString(key_slice);
RowSetKeyProbe probe(rb.row());
OperationResultPB result;
ProbeStats stats;
ASSERT_OK_FAST(rs->MutateRow(tx.timestamp(),
probe,
RowChangeList(update_buf),
op_id_,
&stats,
&result));
ASSERT_EQ(1, result.mutated_stores_size());
ASSERT_EQ(0L, result.mutated_stores(0).rs_id());
ASSERT_EQ(0L, result.mutated_stores(0).dms_id());
tx.Commit();
}
snaps.push_back(MvccSnapshot(mvcc_));
}
// Ensure that MVCC is respected by reading the value at each of the stored
// snapshots.
ASSERT_EQ(5, snaps.size());
for (int i = 0; i < 5; i++) {
SCOPED_TRACE(i);
gscoped_ptr<RowwiseIterator> iter;
ASSERT_OK(rs->NewRowIterator(&schema_, snaps[i], &iter));
string data = InitAndDumpIterator(std::move(iter));
EXPECT_EQ(StringPrintf("(string key=row, uint32 val=%d)", i + 1), data);
}
// Flush deltas to disk and ensure that the historical versions are still
// accessible.
ASSERT_OK(rs->FlushDeltas());
for (int i = 0; i < 5; i++) {
SCOPED_TRACE(i);
gscoped_ptr<RowwiseIterator> iter;
ASSERT_OK(rs->NewRowIterator(&schema_, snaps[i], &iter));
string data = InitAndDumpIterator(std::move(iter));
EXPECT_EQ(StringPrintf("(string key=row, uint32 val=%d)", i + 1), data);
}
}

// Similar to TestDMSFlush above, except does not actually verify
// the results (since the verification step is expensive). Additionally,
// loops the "read" side of the benchmark a number of times, so that
// the speed of applying deltas during read can be micro-benchmarked.
//
// This is most usefully run with an invocation like:
// ./rowset-test --gtest_filter=\*Performance --roundtrip_num_rows=1000000
// --n_read_passes=1000 --update_fraction=0.01
TEST_F(TestRowSet, TestDeltaApplicationPerformance) {
WriteTestRowSet();
// Now open the DiskRowSet for read
{
shared_ptr<DiskRowSet> rs;
ASSERT_OK(OpenTestRowSet(&rs));
    BenchmarkIterationPerformance(*rs,
      StringPrintf("Reading %zd rows prior to updates %d times",
                   n_rows_, FLAGS_n_read_passes));

    UpdateExistingRows(rs.get(), FLAGS_update_fraction, nullptr);

    BenchmarkIterationPerformance(*rs,
      StringPrintf("Reading %zd rows with %.2f%% updates %d times (updates in DMS)",
                   n_rows_, FLAGS_update_fraction * 100.0f,
                   FLAGS_n_read_passes));

    ASSERT_OK(rs->FlushDeltas());

    BenchmarkIterationPerformance(*rs,
      StringPrintf("Reading %zd rows with %.2f%% updates %d times (updates on disk)",
                   n_rows_, FLAGS_update_fraction * 100.0f,
                   FLAGS_n_read_passes));
}
}

TEST_F(TestRowSet, TestRollingDiskRowSetWriter) {
// Set small block size so that we can roll frequently. Otherwise
// we couldn't output such small files.
google::FlagSaver saver;
FLAGS_cfile_default_block_size = 4096;
RollingDiskRowSetWriter writer(tablet()->metadata(), schema_,
BloomFilterSizing::BySizeAndFPRate(32*1024, 0.01f),
64 * 1024); // roll every 64KB
DoWriteTestRowSet(10000, &writer);
// Should have rolled 4 times.
vector<shared_ptr<RowSetMetadata> > metas;
writer.GetWrittenRowSetMetadata(&metas);
EXPECT_EQ(4, metas.size());
for (const shared_ptr<RowSetMetadata>& meta : metas) {
ASSERT_TRUE(meta->HasDataForColumnIdForTests(schema_.column_id(0)));
}
}

TEST_F(TestRowSet, TestMakeDeltaIteratorMergerUnlocked) {
WriteTestRowSet();
// Now open the DiskRowSet for read
shared_ptr<DiskRowSet> rs;
ASSERT_OK(OpenTestRowSet(&rs));
UpdateExistingRows(rs.get(), FLAGS_update_fraction, nullptr);
ASSERT_OK(rs->FlushDeltas());
DeltaTracker *dt = rs->delta_tracker();
int num_stores = dt->redo_delta_stores_.size();
vector<shared_ptr<DeltaStore> > compacted_stores;
vector<BlockId> compacted_blocks;
shared_ptr<DeltaIterator> merge_iter;
ASSERT_OK(dt->MakeDeltaIteratorMergerUnlocked(0, num_stores - 1, &schema_,
&compacted_stores,
&compacted_blocks, &merge_iter));
vector<string> results;
ASSERT_OK(DebugDumpDeltaIterator(REDO, merge_iter.get(), schema_,
ITERATE_OVER_ALL_ROWS,
&results));
for (const string &str : results) {
VLOG(1) << str;
}
ASSERT_EQ(compacted_stores.size(), num_stores);
ASSERT_EQ(compacted_blocks.size(), num_stores);
ASSERT_TRUE(is_sorted(results.begin(), results.end()));
}
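
// Asserts that a delta compaction perf-improvement score lies strictly
// between 0 and 1.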
void BetweenZeroAndOne(double to_check) {
ASSERT_LT(0, to_check);
ASSERT_GT(1, to_check);
}

TEST_F(TestRowSet, TestCompactStores) {
// With this setting, we want major compactions to basically always have a score.
FLAGS_tablet_delta_store_major_compact_min_ratio = 0.0001;
// With this setting, the perf improvement will be 0 until we have two files, at which point
// it will be the expected ratio, then with three files we get the maximum improvement.
FLAGS_tablet_delta_store_minor_compact_max = 3;
// Turning this off so that we can call DeltaStoresCompactionPerfImprovementScore without having
// to open the files after creating them.
FLAGS_cfile_lazy_open = false;
WriteTestRowSet();
shared_ptr<DiskRowSet> rs;
ASSERT_OK(OpenTestRowSet(&rs));
ASSERT_EQ(0, rs->DeltaStoresCompactionPerfImprovementScore(RowSet::MINOR_DELTA_COMPACTION));
ASSERT_EQ(0, rs->DeltaStoresCompactionPerfImprovementScore(RowSet::MAJOR_DELTA_COMPACTION));
// Write a first delta file.
UpdateExistingRows(rs.get(), FLAGS_update_fraction, nullptr);
ASSERT_OK(rs->FlushDeltas());
// One file isn't enough for minor compactions, but a major compaction can run.
ASSERT_EQ(0, rs->DeltaStoresCompactionPerfImprovementScore(RowSet::MINOR_DELTA_COMPACTION));
BetweenZeroAndOne(rs->DeltaStoresCompactionPerfImprovementScore(RowSet::MAJOR_DELTA_COMPACTION));
// Write a second delta file.
UpdateExistingRows(rs.get(), FLAGS_update_fraction, nullptr);
ASSERT_OK(rs->FlushDeltas());
// Two files is enough for all delta compactions.
BetweenZeroAndOne(rs->DeltaStoresCompactionPerfImprovementScore(RowSet::MINOR_DELTA_COMPACTION));
BetweenZeroAndOne(rs->DeltaStoresCompactionPerfImprovementScore(RowSet::MAJOR_DELTA_COMPACTION));
// Write a third delta file.
UpdateExistingRows(rs.get(), FLAGS_update_fraction, nullptr);
ASSERT_OK(rs->FlushDeltas());
// We're hitting the max for minor compactions but not for major compactions.
ASSERT_EQ(1, rs->DeltaStoresCompactionPerfImprovementScore(RowSet::MINOR_DELTA_COMPACTION));
BetweenZeroAndOne(rs->DeltaStoresCompactionPerfImprovementScore(RowSet::MAJOR_DELTA_COMPACTION));
  // Compact the delta files.
  DeltaTracker *dt = rs->delta_tracker();
  int num_stores = dt->redo_delta_stores_.size();
  VLOG(1) << "Number of stores before compaction: " << num_stores;
  ASSERT_EQ(3, num_stores);
ASSERT_OK(dt->CompactStores(0, num_stores - 1));
num_stores = dt->redo_delta_stores_.size();
VLOG(1) << "Number of stores after compaction: " << num_stores;
ASSERT_EQ(1, num_stores);
// Back to one store, can't minor compact.
ASSERT_EQ(0, rs->DeltaStoresCompactionPerfImprovementScore(RowSet::MINOR_DELTA_COMPACTION));
BetweenZeroAndOne(rs->DeltaStoresCompactionPerfImprovementScore(RowSet::MAJOR_DELTA_COMPACTION));
// Verify that the resulting deltafile is valid
vector<shared_ptr<DeltaStore> > compacted_stores;
vector<BlockId> compacted_blocks;
shared_ptr<DeltaIterator> merge_iter;
ASSERT_OK(dt->MakeDeltaIteratorMergerUnlocked(0, num_stores - 1, &schema_,
&compacted_stores,
&compacted_blocks, &merge_iter));
vector<string> results;
ASSERT_OK(DebugDumpDeltaIterator(REDO, merge_iter.get(), schema_,
ITERATE_OVER_ALL_ROWS,
&results));
for (const string &str : results) {
VLOG(1) << str;
}
ASSERT_TRUE(is_sorted(results.begin(), results.end()));
}

} // namespace tablet
} // namespace kudu