// blob: 48857bf7b0bbc7b6d8b58d3a228cae39a5996aee [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <algorithm>
#include <cstdint>
#include <memory>
#include <ostream>
#include <string>
#include <vector>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/common/common.pb.h"
#include "kudu/common/iterator.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/schema.h"
#include "kudu/fs/io_context.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/local_tablet_writer.h"
#include "kudu/tablet/mvcc.h"
#include "kudu/tablet/rowset.h"
#include "kudu/tablet/tablet-test-util.h"
#include "kudu/tablet/tablet.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
DECLARE_double(cfile_inject_corruption);
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
namespace kudu {
namespace tablet {
using strings::Substitute;
class TestMajorDeltaCompaction : public KuduRowSetTest {
public:
TestMajorDeltaCompaction() :
KuduRowSetTest(Schema({ ColumnSchema("key", STRING),
ColumnSchema("val1", INT32),
ColumnSchema("val2", STRING),
ColumnSchema("val3", INT32),
ColumnSchema("val4", STRING) }, 1)) {
}
struct ExpectedRow {
string key;
int32_t val1;
string val2;
int32_t val3;
string val4;
string Formatted() const {
return strings::Substitute(
R"((string key="$0", int32 val1=$1, string val2="$2", int32 val3=$3, string val4="$4"))",
key, val1, val2, val3, val4);
}
};
virtual void SetUp() OVERRIDE {
KuduRowSetTest::SetUp();
}
// Insert data into tablet_, setting up equivalent state in
// expected_state_.
void WriteTestTablet(int nrows) {
LocalTabletWriter writer(tablet().get(), &client_schema_);
KuduPartialRow ins_row(&client_schema_);
for (int i = 0; i < nrows; i++) {
ExpectedRow row;
row.key = StringPrintf("hello %08d", i);
row.val1 = i * 2;
row.val2 = StringPrintf("a %08d", i * 2);
row.val3 = i * 10;
row.val4 = StringPrintf("b %08d", i * 10);
int col = 0;
CHECK_OK(ins_row.SetStringNoCopy(col++, row.key));
CHECK_OK(ins_row.SetInt32(col++, row.val1));
CHECK_OK(ins_row.SetStringNoCopy(col++, row.val2));
CHECK_OK(ins_row.SetInt32(col++, row.val3));
CHECK_OK(ins_row.SetStringNoCopy(col++, row.val4));
ASSERT_OK_FAST(writer.Insert(ins_row));
expected_state_.push_back(row);
}
}
// Delete the data that was inserted and clear the expected state, end to front.
void DeleteRows(int nrows) {
LocalTabletWriter writer(tablet().get(), &client_schema_);
KuduPartialRow del_row(&client_schema_);
for (int i = nrows - 1; i >= 0; i--) {
CHECK_OK(del_row.SetStringNoCopy(0, expected_state_[i].key));
ASSERT_OK(writer.Delete(del_row));
expected_state_.pop_back();
}
ASSERT_EQ(expected_state_.size(), 0);
}
// Update the data, touching only odd or even rows based on the
// value of 'even'.
// Makes corresponding updates in expected_state_.
void UpdateRows(int nrows, bool even) {
LocalTabletWriter writer(tablet().get(), &client_schema_);
KuduPartialRow prow(&client_schema_);
for (int idx = 0; idx < nrows; idx++) {
ExpectedRow* row = &expected_state_[idx];
if ((idx % 2 == 0) == even) {
// Set key
CHECK_OK(prow.SetStringNoCopy(0, row->key));
// Update the data
row->val1 *= 2;
row->val3 *= 2;
row->val4.append("[U]");
// Apply the updates.
CHECK_OK(prow.SetInt32(1, row->val1));
CHECK_OK(prow.SetInt32(3, row->val3));
CHECK_OK(prow.SetStringNoCopy(4, row->val4));
ASSERT_OK(writer.Update(prow));
}
}
}
// Verify that the data seen by scanning the tablet matches the data in
// expected_state_.
void VerifyData() {
MvccSnapshot snap(*tablet()->mvcc_manager());
VerifyDataWithMvccAndExpectedState(snap, expected_state_);
}
void VerifyDataWithMvccAndExpectedState(const MvccSnapshot& snap,
const vector<ExpectedRow>& passed_expected_state) {
RowIteratorOptions opts;
opts.projection = &client_schema_;
opts.snap_to_include = snap;
unique_ptr<RowwiseIterator> row_iter;
ASSERT_OK(tablet()->NewRowIterator(std::move(opts), &row_iter));
ASSERT_OK(row_iter->Init(nullptr));
vector<string> results;
ASSERT_OK(IterateToStringList(row_iter.get(), &results));
VLOG(1) << "Results of iterating over the updated materialized rows:";
ASSERT_EQ(passed_expected_state.size(), results.size());
for (int i = 0; i < results.size(); i++) {
SCOPED_TRACE(Substitute("row $0", i));
const string& str = results[i];
const ExpectedRow& expected = passed_expected_state[i];
ASSERT_EQ(expected.Formatted(), str);
}
}
MvccManager mvcc_;
vector<ExpectedRow> expected_state_;
};
// Regression test for KUDU-2656: hitting a corruption while running a major
// delta compaction used to cause a failure in debug mode. The compaction must
// instead report the problem through its returned Status.
TEST_F(TestMajorDeltaCompaction, TestKudu2656) {
  constexpr int kRowCount = 100;

  // Write the base data and flush it to disk.
  NO_FATALS(WriteTestTablet(kRowCount));
  ASSERT_OK(tablet()->Flush());

  vector<shared_ptr<RowSet>> rowsets;
  tablet()->GetRowSetsForTests(&rowsets);
  ASSERT_FALSE(rowsets.empty());
  shared_ptr<RowSet> target_rowset(rowsets.front());

  // Generate updates and flush them so that there are deltas on disk.
  NO_FATALS(UpdateRows(kRowCount, /*even=*/false));
  ASSERT_OK(tablet()->FlushBiggestDMS());

  // The columns to run the major compaction on.
  vector<ColumnId> columns_to_compact = { schema_.column_id(1),
                                          schema_.column_id(3),
                                          schema_.column_id(4) };

  // With corruption injected, the compaction has to return a Corruption
  // status rather than crash.
  FLAGS_cfile_inject_corruption = 1.0;
  fs::IOContext compaction_io_context({ "test-tablet" });
  const Status compaction_status =
      tablet()->DoMajorDeltaCompaction(columns_to_compact, target_rowset, &compaction_io_context);
  ASSERT_TRUE(compaction_status.IsCorruption()) << compaction_status.ToString();
}
// Tests a major delta compaction run.
// Verifies that the output rowset accurately reflects the mutations, but keeps the
// unchanged columns intact.
TEST_F(TestMajorDeltaCompaction, TestCompact) {
  const int kNumRows = 100;
  NO_FATALS(WriteTestTablet(kNumRows));
  ASSERT_OK(tablet()->Flush());

  vector<shared_ptr<RowSet> > all_rowsets;
  tablet()->GetRowSetsForTests(&all_rowsets);
  // Calling front() on an empty vector is undefined behavior; fail the test
  // cleanly if the flush above did not produce a rowset.
  ASSERT_FALSE(all_rowsets.empty());
  shared_ptr<RowSet> rs = all_rowsets.front();

  const vector<ColumnId> col_ids_to_compact = { schema_.column_id(1),
                                                schema_.column_id(3),
                                                schema_.column_id(4) };

  // We'll run a few rounds of update/compact to make sure
  // that we don't get into some funny state (regression test for
  // an earlier bug).
  // We first compact all the columns, then for each other round we do one less,
  // so that we test a few combinations.
  for (int i = 0; i < 3; i++) {
    SCOPED_TRACE(Substitute("Update/compact round $0", i));

    // Update the odd rows and verify.
    NO_FATALS(UpdateRows(kNumRows, /*even=*/false));
    NO_FATALS(VerifyData());

    // Flush the deltas, make sure data stays the same.
    ASSERT_OK(tablet()->FlushBiggestDMS());
    NO_FATALS(VerifyData());

    // Update the even rows and flush deltas.
    NO_FATALS(UpdateRows(kNumRows, /*even=*/true));
    ASSERT_OK(tablet()->FlushBiggestDMS());
    NO_FATALS(VerifyData());

    // Major compact some columns: all of them in round 0, then one fewer
    // (dropping from the end of the list) in each following round.
    vector<ColumnId> col_ids(col_ids_to_compact.begin(), col_ids_to_compact.end() - i);
    ASSERT_OK(tablet()->DoMajorDeltaCompaction(col_ids, rs));
    NO_FATALS(VerifyData());
  }
}
// Verify that we do issue UNDO files and that we can read them.
//
// A snapshot is taken right after the initial flush; every later scan at that
// snapshot must keep returning the original data, no matter how many updates
// are flushed and major-compacted afterwards.
TEST_F(TestMajorDeltaCompaction, TestUndos) {
  const int kNumRows = 100;
  NO_FATALS(WriteTestTablet(kNumRows));
  ASSERT_OK(tablet()->Flush());

  vector<shared_ptr<RowSet> > all_rowsets;
  tablet()->GetRowSetsForTests(&all_rowsets);
  // Calling front() on an empty vector is undefined behavior; fail the test
  // cleanly if the flush above did not produce a rowset.
  ASSERT_FALSE(all_rowsets.empty());
  shared_ptr<RowSet> rs = all_rowsets.front();

  MvccSnapshot snap(*tablet()->mvcc_manager());

  // Verify the old data and grab a copy of the old state.
  NO_FATALS(VerifyDataWithMvccAndExpectedState(snap, expected_state_));
  const vector<ExpectedRow> old_state(expected_state_);

  // Update the odd rows and flush the DMS, make sure we still see the old
  // data at the old snapshot.
  NO_FATALS(UpdateRows(kNumRows, /*even=*/false));
  ASSERT_OK(tablet()->FlushBiggestDMS());
  NO_FATALS(VerifyDataWithMvccAndExpectedState(snap, old_state));

  // Major compact, check we still have the old data.
  vector<ColumnId> col_ids_to_compact = { schema_.column_id(1),
                                          schema_.column_id(3),
                                          schema_.column_id(4) };
  ASSERT_OK(tablet()->DoMajorDeltaCompaction(col_ids_to_compact, rs));
  NO_FATALS(VerifyDataWithMvccAndExpectedState(snap, old_state));

  // Test adding three updates per (odd) row to three REDO files.
  for (int i = 0; i < 3; i++) {
    for (int j = 0; j < 3; j++) {
      NO_FATALS(UpdateRows(kNumRows, /*even=*/false));
    }
    ASSERT_OK(tablet()->FlushBiggestDMS());
  }

  // To complicate things further, only major compact two columns, then verify we can read the old
  // and the new data.
  col_ids_to_compact.pop_back();
  ASSERT_OK(tablet()->DoMajorDeltaCompaction(col_ids_to_compact, rs));
  NO_FATALS(VerifyDataWithMvccAndExpectedState(snap, old_state));
  NO_FATALS(VerifyData());
}
// Test that the delete REDO mutations are written back and not filtered out.
//
// After the compaction, a scan at the current time must see no rows (they
// were all deleted), while a scan at the snapshot taken before the deletes
// must still see the updated rows.
TEST_F(TestMajorDeltaCompaction, TestCarryDeletesOver) {
  const int kNumRows = 100;
  NO_FATALS(WriteTestTablet(kNumRows));
  ASSERT_OK(tablet()->Flush());

  vector<shared_ptr<RowSet> > all_rowsets;
  tablet()->GetRowSetsForTests(&all_rowsets);
  // Calling front() on an empty vector is undefined behavior; fail the test
  // cleanly if the flush above did not produce a rowset.
  ASSERT_FALSE(all_rowsets.empty());
  shared_ptr<RowSet> rs = all_rowsets.front();

  // Update the odd rows and flush those updates.
  NO_FATALS(UpdateRows(kNumRows, /*even=*/false));
  ASSERT_OK(tablet()->FlushBiggestDMS());

  // Remember the tablet as it looks with the updates but before the deletes.
  MvccSnapshot updates_snap(*tablet()->mvcc_manager());
  const vector<ExpectedRow> old_state(expected_state_);

  // Delete everything and flush the deletes as well.
  NO_FATALS(DeleteRows(kNumRows));
  ASSERT_OK(tablet()->FlushBiggestDMS());

  vector<ColumnId> col_ids_to_compact = { schema_.column_id(4) };
  ASSERT_OK(tablet()->DoMajorDeltaCompaction(col_ids_to_compact, rs));

  // expected_state_ is empty at this point, so this checks that the deletes
  // are still applied after the compaction.
  NO_FATALS(VerifyData());
  NO_FATALS(VerifyDataWithMvccAndExpectedState(updates_snap, old_state));
}
// Reinserts are expected to happen only in the MRS and never down in the DRS.
// This test mostly documents that behavior: should it ever change, the test
// will tell us that our assumptions no longer hold.
TEST_F(TestMajorDeltaCompaction, TestReinserts) {
  const int kRowCount = 100;

  // Insert, delete and insert again; all of it lands directly in the MRS.
  NO_FATALS(WriteTestTablet(kRowCount));  // 1st batch.
  NO_FATALS(DeleteRows(kRowCount));       // Delete 1st batch.
  NO_FATALS(WriteTestTablet(kRowCount));  // 2nd batch.
  ASSERT_OK(tablet()->Flush());

  // Update the 2nd batch; these rows are read back at the very end.
  NO_FATALS(UpdateRows(kRowCount, false));  // Update 2nd batch.

  // Keep both the expected rows and an MVCC snapshot for the updated 2nd batch.
  const vector<ExpectedRow> second_batch_state(expected_state_);
  MvccSnapshot second_batch_snap(*tablet()->mvcc_manager());

  vector<shared_ptr<RowSet> > rowsets;
  tablet()->GetRowSetsForTests(&rowsets);
  ASSERT_EQ(1, rowsets.size());
  NO_FATALS(VerifyData());

  // The deletes go into the DMS and the reinserts into a new MRS. Flushing the
  // DMS afterwards makes the deletes available for a major compaction.
  NO_FATALS(DeleteRows(kRowCount));       // Delete 2nd batch.
  NO_FATALS(WriteTestTablet(kRowCount));  // 3rd batch.
  ASSERT_OK(tablet()->FlushBiggestDMS());

  // Layout at this point (the 1st batch was discarded during the first flush):
  // MRS: 3rd batch of inserts.
  // RS1: UNDO DF: Deletes for the 2nd batch.
  //      DS: Base data for the 2nd batch.
  //      REDO DF: Updates and deletes for the 2nd.

  // Push some of the updates down with a major compaction of a single column.
  shared_ptr<RowSet> first_rowset(rowsets.front());
  vector<ColumnId> compact_cols = { schema_.column_id(4) };
  ASSERT_OK(tablet()->DoMajorDeltaCompaction(compact_cols, first_rowset));

  // A scan now returns the 3rd batch of inserts, which carries no updates.
  NO_FATALS(VerifyData());

  // Flushing the 3rd batch must create a second RS, even though its row keys
  // are the same as those of the first one.
  ASSERT_OK(tablet()->Flush());
  rowsets.clear();
  tablet()->GetRowSetsForTests(&rowsets);
  ASSERT_EQ(2, rowsets.size());

  // The 3rd batch is what a current scan sees...
  NO_FATALS(VerifyData());

  // ...while the updated 2nd batch is still readable from the first RS through
  // the older snapshot.
  NO_FATALS(VerifyDataWithMvccAndExpectedState(second_batch_snap, second_batch_state));
}
// When the delta files contain nothing but deletes, a major delta compaction
// must not be scheduled: the reported perf improvement has to be zero.
TEST_F(TestMajorDeltaCompaction, TestJustDeletes) {
  const int kRowCount = 100;

  // Flush the inserts, then delete every row and flush those deletes too.
  NO_FATALS(WriteTestTablet(kRowCount));
  ASSERT_OK(tablet()->Flush());
  NO_FATALS(DeleteRows(kRowCount));
  ASSERT_OK(tablet()->FlushBiggestDMS());

  shared_ptr<RowSet> best_rowset;
  const auto perf_improvement = tablet()->GetPerfImprovementForBestDeltaCompact(
      RowSet::MAJOR_DELTA_COMPACTION, &best_rowset);
  ASSERT_EQ(0, perf_improvement);
}
} // namespace tablet
} // namespace kudu