blob: ec1e78a30c9595f105049d9682c2f4856816ec43 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <map>
#include <memory>
#include <utility>
#include "kudu/client/client-test-util.h"
#include "kudu/common/wire_protocol-test-util.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/integration-tests/mini_cluster-itest-base.h"
#include "kudu/integration-tests/test_workload.h"
#include "kudu/server/hybrid_clock.h"
#include "kudu/tablet/local_tablet_writer.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_peer.h"
#include "kudu/tserver/mini_tablet_server.h"
#include "kudu/tserver/tablet_server.h"
#include "kudu/tserver/ts_tablet_manager.h"
#include "kudu/util/random.h"
using kudu::client::KuduScanner;
using kudu::client::KuduTable;
using kudu::client::sp::shared_ptr;
using kudu::server::Clock;
using kudu::tablet::Tablet;
using kudu::tablet::TabletPeer;
using kudu::tserver::MiniTabletServer;
using kudu::tserver::TabletServer;
using kudu::tserver::TSTabletManager;
using std::string;
using std::unique_ptr;
using std::vector;
using strings::Substitute;
// Flags owned by other Kudu modules that the tests in this file read or
// override.
DECLARE_bool(enable_maintenance_manager);
DECLARE_bool(use_mock_wall_clock);
DECLARE_int32(scanner_ttl_ms);
DECLARE_int32(tablet_history_max_age_sec);
DECLARE_string(block_manager);

// Test-local flag: number of randomized rounds executed by the history GC
// workload test.
DEFINE_int32(test_num_rounds, 200,
             "Number of rounds to loop "
             "RandomizedTabletHistoryGcITest.TestRandomHistoryGCWorkload");
using kudu::server::HybridClock;
namespace kudu {
// Base fixture for the tablet history GC integration tests. It adds nothing to
// MiniClusterITestBase; it exists to give these tests a common fixture name.
class TabletHistoryGcITest : public MiniClusterITestBase {};
// Verifies that a snapshot scan whose timestamp falls before the ancient
// history mark (AHM) is rejected with InvalidArgument.
TEST_F(TabletHistoryGcITest, TestSnapshotScanBeforeAHM) {
  // A max history age of 0 makes every server-chosen snapshot illegal: when no
  // snapshot timestamp is supplied, the server picks one first and only then
  // compares it with the current AHM, so the scan must be refused.
  FLAGS_tablet_history_max_age_sec = 0;
  NO_FATALS(StartCluster());

  // The workload is only needed to create a tablet that we can scan.
  TestWorkload workload(cluster_.get());
  workload.Setup();

  shared_ptr<KuduTable> kudu_table;
  ASSERT_OK(client_->OpenTable(TestWorkload::kDefaultTableName, &kudu_table));

  KuduScanner snapshot_scanner(kudu_table.get());
  ASSERT_OK(snapshot_scanner.SetReadMode(KuduScanner::READ_AT_SNAPSHOT));

  const Status open_status = snapshot_scanner.Open();
  ASSERT_TRUE(open_status.IsInvalidArgument()) << open_status.ToString();
  ASSERT_STR_CONTAINS(open_status.ToString(),
                      "Snapshot timestamp is earlier than the ancient history mark");
}
// Tombstone marker for a MaterializedTestRow.
// The values are spelled out explicitly. NOT_DELETED stays at 0, so a
// value-initialized row (e.g. one created by std::map::operator[]) reads as
// not deleted.
enum IsDeleted {
  NOT_DELETED = 0,
  DELETED = 1,
};
// One materialized row of the test table. Column layout mirrors
// SimpleTestSchema (key, int_val, string_val), plus a tombstone flag that is
// not a real column. Deliberately kept a plain aggregate so callers can
// brace-initialize it in declaration order.
struct MaterializedTestRow {
  int32_t key;             // Column 0: the row key.
  int32_t int_val;         // Column 1.
  std::string string_val;  // Column 2.
  IsDeleted is_deleted;    // Whether this row has been deleted.
};
// Randomized test that performs a repeatable sequence of events on a tablet.
class RandomizedTabletHistoryGcITest : public TabletHistoryGcITest {
public:
RandomizedTabletHistoryGcITest() {
// We need to fully control compactions, flushes, and the clock.
FLAGS_enable_maintenance_manager = false;
FLAGS_use_mock_wall_clock = true;
FLAGS_tablet_history_max_age_sec = 100;
}
protected:
enum Actions {
kInsert,
kUpdate,
kDelete,
kFlush,
kMergeCompaction,
kRedoDeltaCompaction,
kMoveTimeForward,
kStartScan,
kNumActions, // Count of items in this enum. Keep as last entry.
};
// Provide efficient sorted access to a snapshot of a table indexed by key.
using MaterializedTestTable = std::map<int32_t, MaterializedTestRow>;
// Timestamp value (from Timestamp::ToUint64()) -> Table snapshot.
using MaterializedTestSnapshots = std::map<uint64_t, MaterializedTestTable>;
using ScannerTSPair = std::pair<unique_ptr<client::KuduScanner>, Timestamp>;
// verify_round -> { scanner, snapshot_timestamp }.
using ScannerMap = std::multimap<int, ScannerTSPair>;
string StringifyTestRow(const MaterializedTestRow& row) {
return Substitute("{ $0, $1, $2, $3 }", row.key, row.int_val, row.string_val,
(row.is_deleted == DELETED) ? "DELETED" : "NOT_DELETED");
}
string StringifyTimestamp(const Timestamp& ts) {
return Substitute("$0 ($1)", HybridClock::StringifyTimestamp(ts), ts.ToString());
}
MaterializedTestTable CloneLatestSnapshot() {
return snapshots_[latest_snapshot_ts_.ToUint64()]; // Will auto-vivify on first pass.
}
MaterializedTestTable* GetPtrToLatestSnapshot() {
return &snapshots_[latest_snapshot_ts_.ToUint64()];
}
MaterializedTestTable* GetPtrToSnapshotForTS(Timestamp ts) {
MaterializedTestTable* table = FindFloorOrNull(snapshots_, ts.ToUint64());
if (!table) {
LOG(FATAL) << "There is no saved snapshot with a TS <= " << ts.ToUint64();
}
return table;
}
void SaveSnapshot(MaterializedTestTable snapshot, const Timestamp& ts) {
VLOG(2) << "Saving snapshot at ts = " << StringifyTimestamp(ts);
snapshots_[ts.ToUint64()] = std::move(snapshot);
latest_snapshot_ts_ = ts;
}
void AddTimeToClock(MonoDelta delta) {
uint64_t now = HybridClock::GetPhysicalValueMicros(clock_->Now());
uint64_t new_time = now + delta.ToMicroseconds();
clock_->SetMockClockWallTimeForTests(new_time);
}
void RegisterScanner(unique_ptr<client::KuduScanner> scanner, Timestamp snap_ts,
int verify_round) {
CHECK_GE(verify_round, cur_round_);
if (verify_round == cur_round_) {
NO_FATALS(VerifySnapshotScan(std::move(scanner), std::move(snap_ts), verify_round));
return;
}
ScannerTSPair pair(std::move(scanner), std::move(snap_ts));
ScannerMap::value_type entry(verify_round, std::move(pair));
scanners_.insert(std::move(entry));
}
void VerifyAndRemoveScanners(ScannerMap::iterator begin, ScannerMap::iterator end) {
auto iter = begin;
while (iter != end) {
int verify_round = iter->first;
unique_ptr<KuduScanner>& scanner = iter->second.first;
Timestamp snap_ts = iter->second.second;
NO_FATALS(VerifySnapshotScan(std::move(scanner), snap_ts, verify_round));
auto old_iter = iter;
++iter;
scanners_.erase(old_iter);
}
}
void VerifyScannersForRound(int round) {
auto iters = scanners_.equal_range(round);
NO_FATALS(VerifyAndRemoveScanners(iters.first, iters.second));
}
void VerifyAllRemainingScanners() {
VLOG(1) << "Verifying all remaining scanners";
NO_FATALS(VerifyAndRemoveScanners(scanners_.begin(), scanners_.end()));
}
void VerifySnapshotScan(unique_ptr<client::KuduScanner> scanner, Timestamp snap_ts, int round) {
LOG(INFO) << "Round " << round << ": Verifying snapshot scan for timestamp "
<< StringifyTimestamp(snap_ts);
MaterializedTestTable* snap = GetPtrToSnapshotForTS(snap_ts);
ASSERT_NE(snap, nullptr) << "Could not find snapshot to match timestamp " << snap_ts.ToString();
int32_t rows_seen = 0;
auto snap_iter = snap->cbegin();
// Maintain a summary of mismatched keys for use in debugging.
vector<int32_t> mismatched_keys;
while (scanner->HasMoreRows()) {
client::KuduScanBatch batch;
ASSERT_OK_FAST(scanner->NextBatch(&batch));
auto scan_iter = batch.begin();
while (scan_iter != batch.end()) {
// Deleted rows will show up in our verification snapshot, but not the
// tablet scanner.
if (snap_iter->second.is_deleted == DELETED) {
VLOG(4) << "Row " << snap_iter->second.key << " is DELETED in our historical snapshot";
++snap_iter;
continue;
}
int32_t key;
int32_t int_val;
Slice string_val;
ASSERT_OK_FAST((*scan_iter).GetInt32(0, &key));
ASSERT_OK_FAST((*scan_iter).GetInt32(1, &int_val));
ASSERT_OK_FAST((*scan_iter).GetString(2, &string_val));
// We attempt to compare both snapshots fully, even in the case of
// failure, to help us understand what's going on when problems occur.
EXPECT_EQ(snap_iter->second.key, key) << "Mismatch at result row number " << rows_seen;
EXPECT_EQ(snap_iter->second.int_val, int_val) << "at row key " << key;
EXPECT_EQ(snap_iter->second.string_val, string_val.ToString()) << "at row key " << key;
++rows_seen;
if (key == snap_iter->second.key) {
// Move both (the normal case)
++scan_iter;
++snap_iter;
} else if (key < snap_iter->second.key) {
mismatched_keys.push_back(key);
++scan_iter; // Only move scan to try to catch up.
} else {
mismatched_keys.push_back(snap_iter->second.key);
++snap_iter; // Only move snap to try to catch up.
}
}
}
// Account for trailing deleted rows in our verification snapshot.
while (snap_iter != snap->cend()) {
EXPECT_EQ(DELETED, snap_iter->second.is_deleted)
<< "Expected row " << snap_iter->second.key << " to be DELETED";
if (!snap_iter->second.is_deleted) {
mismatched_keys.push_back(snap_iter->second.key);
}
++snap_iter;
}
ASSERT_EQ(snap->cend(), snap_iter);
scanner->Close();
if (::testing::Test::HasFailure()) {
for (int32_t key : mismatched_keys) {
LOG(ERROR) << "Mismatched key: " << key;
}
FAIL() << "Snapshot verification failed";
}
LOG(INFO) << "Snapshot verification complete. Rows seen: " << rows_seen;
}
MaterializedTestSnapshots snapshots_;
ScannerMap scanners_;
HybridClock* clock_ = nullptr;
Timestamp latest_snapshot_ts_;
int32_t rows_inserted_ = 0;
int cur_round_ = 0;
};
// Hooks to force a reupdate of missing deltas when a flush or merge compaction
// occurs.
//
// PostWriteSnapshot() writes the queued updates, then the queued deletes,
// directly to the tablet through a LocalTabletWriter. Judging by the hook
// name, it is presumably invoked after the flush/compaction has taken its
// write snapshot. NOTE(review): confirm the exact call point in tablet.h.
class ReupdateHooks : public Tablet::FlushCompactCommonHooks {
 public:
  ReupdateHooks(Tablet* tablet, const Schema& schema)
      : tablet_(tablet),
        client_schema_(schema) {
  }

  // Applies every queued update, then every queued delete. Any write failure
  // aborts the process via CHECK_OK. Otherwise it returns OK.
  Status PostWriteSnapshot() OVERRIDE {
    tablet::LocalTabletWriter writer(tablet_, &client_schema_);
    for (const MaterializedTestRow& update : updates_) {
      KuduPartialRow row(&client_schema_);
      CHECK_OK(row.SetInt32(0, update.key));
      CHECK_OK(row.SetInt32(1, update.int_val));
      CHECK_OK(row.SetStringCopy(2, update.string_val));
      CHECK_OK(writer.Update(row));
    }
    for (int32_t row_key : deletes_) {
      KuduPartialRow row(&client_schema_);
      CHECK_OK(row.SetInt32(0, row_key));
      CHECK_OK(writer.Delete(row));
    }
    return Status::OK();
  }

  // Replaces the queued updates with 'updates'.
  void set_updates(vector<MaterializedTestRow> updates) {
    updates_ = std::move(updates);
  }

  // Replaces the queued deletes with the row keys in 'deletes'.
  void set_deletes(vector<int32_t> deletes) {
    deletes_ = std::move(deletes);
  }

  // Drops all queued updates and deletes.
  void Reset() {
    updates_.clear();
    deletes_.clear();
  }

 private:
  // Tablet written to by PostWriteSnapshot(); not owned by this class.
  Tablet* const tablet_;
  // Copy of the schema used both to build rows and to construct the writer.
  const Schema client_schema_;
  vector<MaterializedTestRow> updates_;
  vector<int32_t> deletes_;
};
// Randomized test that attempts to test many arbitrary history GC use cases.
//
// Each round runs one action chosen by a seeded RNG from the fixture's Actions
// enum. Every insert/update/delete saves the expected table contents at
// clock_->Now() via SaveSnapshot(). Snapshot scans opened in kStartScan are
// verified (possibly many rounds later) against those saved snapshots.
TEST_F(RandomizedTabletHistoryGcITest, TestRandomHistoryGCWorkload) {
  OverrideFlagForSlowTests("test_num_rounds",
                           Substitute("$0", FLAGS_test_num_rounds * 5));
  LOG(INFO) << "Running " << FLAGS_test_num_rounds << " rounds";

  // Set high scanner TTL, since this test opens scanners and then waits for some
  // time before reading from them.
  FLAGS_scanner_ttl_ms = 1000 * 60 * 60 * 24;

  // Start MiniCluster with a single tablet server. NO_FATALS stops the test
  // here if startup reports a fatal failure.
  NO_FATALS(StartCluster(1));

  TestWorkload workload(cluster_.get());
  workload.set_num_replicas(1);
  workload.Setup(); // Convenient way to create a table.

  client::sp::shared_ptr<KuduTable> table;
  ASSERT_OK(client_->OpenTable(workload.table_name(), &table));

  // Directly access the Tablet so we can control compaction and the clock.
  MiniTabletServer* mts = cluster_->mini_tablet_server(0);
  TabletServer* ts = mts->server();
  clock_ = down_cast<HybridClock*>(ts->clock());
  std::vector<scoped_refptr<TabletPeer>> peers;
  ts->tablet_manager()->GetTabletPeers(&peers);
  ASSERT_EQ(1, peers.size());
  Tablet* tablet = peers[0]->tablet();

  // Set initial clock time to 1000 seconds past 0, which is enough so that the
  // AHM will not be negative.
  const uint64_t kInitialMicroTime = 1L * 1000 * 1000 * 1000;
  clock_->SetMockClockWallTimeForTests(kInitialMicroTime);

  LOG(INFO) << "Seeding random number generator";
  Random random(SeedRandom());

  // Save an empty snapshot at the "beginning of time".
  NO_FATALS(SaveSnapshot(MaterializedTestTable(), Timestamp(0)));

  for (cur_round_ = 0; cur_round_ < FLAGS_test_num_rounds; cur_round_++) {
    VLOG(1) << "Starting round " << cur_round_;
    NO_FATALS(VerifyScannersForRound(cur_round_));
    int action = random.Uniform(kNumActions);
    switch (action) {
      case kInsert: {
        // TODO: Allow for reinsert onto deleted rows after KUDU-237 has been
        // implemented.
        int32_t num_rows_to_insert = random.Uniform(1000);
        VLOG(1) << "Inserting " << num_rows_to_insert << " rows";
        if (num_rows_to_insert == 0) continue;
        MaterializedTestTable snapshot = CloneLatestSnapshot();
        client::sp::shared_ptr<client::KuduSession> session = client_->NewSession();
        session->SetTimeoutMillis(20000);
        ASSERT_OK_FAST(session->SetFlushMode(
            client::KuduSession::AUTO_FLUSH_BACKGROUND));
        for (int32_t i = 0; i < num_rows_to_insert; i++) {
          // Keys are assigned sequentially, so every key in [0, rows_inserted_)
          // exists in the snapshot.
          int32_t row_key = rows_inserted_;
          MaterializedTestRow test_row = { row_key,
                                           static_cast<int32_t>(random.Next()),
                                           Substitute("$0", random.Next()),
                                           NOT_DELETED };
          unique_ptr<client::KuduInsert> insert(table->NewInsert());
          KuduPartialRow* row = insert->mutable_row();
          ASSERT_OK_FAST(row->SetInt32(0, test_row.key));
          ASSERT_OK_FAST(row->SetInt32(1, test_row.int_val));
          ASSERT_OK_FAST(row->SetStringCopy(2, test_row.string_val));
          ASSERT_OK_FAST(session->Apply(insert.release()));
          VLOG(2) << "Inserting row " << StringifyTestRow(test_row);
          snapshot[row_key] = std::move(test_row);
          rows_inserted_++;
        }
        FlushSessionOrDie(session);
        SaveSnapshot(std::move(snapshot), clock_->Now());
        break;
      }
      case kUpdate: {
        if (rows_inserted_ == 0) continue;
        int32_t num_rows_to_update = random.Uniform(std::min(rows_inserted_, 1000));
        VLOG(1) << "Updating up to " << num_rows_to_update << " rows";
        if (num_rows_to_update == 0) continue;
        MaterializedTestTable snapshot = CloneLatestSnapshot();
        // 5% chance to reupdate while also forcing a full compaction.
        bool force_reupdate_missed_deltas = random.OneIn(20);
        if (force_reupdate_missed_deltas) {
          VLOG(1) << "Forcing a reupdate of missed deltas";
        }
        vector<MaterializedTestRow> updates;
        for (int i = 0; i < num_rows_to_update; i++) {
          int32_t row_key = random.Uniform(rows_inserted_);
          MaterializedTestRow test_row = snapshot[row_key];
          CHECK_EQ(row_key, test_row.key) << "Rows inserted: " << rows_inserted_
                                          << ", row: " << StringifyTestRow(test_row);
          if (test_row.is_deleted == DELETED) continue;
          // Cast explicitly, matching the insert path, rather than relying on an
          // implicit uint32_t -> int32_t narrowing conversion.
          test_row.int_val = static_cast<int32_t>(random.Next());
          test_row.string_val = Substitute("$0", random.Next());
          VLOG(2) << "Updating row to " << StringifyTestRow(test_row);
          updates.push_back(test_row);
          snapshot[row_key] = std::move(test_row);
        }
        int rows_updated = updates.size();
        if (rows_updated == 0) continue;
        if (force_reupdate_missed_deltas) {
          std::shared_ptr<ReupdateHooks> hooks =
              std::make_shared<ReupdateHooks>(tablet, GetSimpleTestSchema());
          hooks->set_updates(std::move(updates));
          tablet->SetFlushCompactCommonHooksForTests(hooks);
          Status s = tablet->Compact(Tablet::FORCE_COMPACT_ALL);
          // Clear the hook before checking the result, so it never stays
          // registered past this compaction, even on failure.
          tablet->SetFlushCompactCommonHooksForTests(nullptr);
          ASSERT_OK(s);
        } else {
          client::sp::shared_ptr<client::KuduSession> session = client_->NewSession();
          session->SetTimeoutMillis(20000);
          ASSERT_OK_FAST(session->SetFlushMode(
              client::KuduSession::AUTO_FLUSH_BACKGROUND));
          for (const MaterializedTestRow& test_row : updates) {
            unique_ptr<client::KuduUpdate> update(table->NewUpdate());
            KuduPartialRow* row = update->mutable_row();
            ASSERT_OK_FAST(row->SetInt32(0, test_row.key));
            ASSERT_OK_FAST(row->SetInt32(1, test_row.int_val));
            ASSERT_OK_FAST(row->SetStringCopy(2, test_row.string_val));
            ASSERT_OK_FAST(session->Apply(update.release()));
          }
          FlushSessionOrDie(session);
        }
        SaveSnapshot(std::move(snapshot), clock_->Now());
        VLOG(1) << "Updated " << rows_updated << " rows";
        break;
      }
      case kDelete: {
        if (rows_inserted_ == 0) continue;
        int32_t num_rows_to_delete = random.Uniform(std::min(rows_inserted_, 1000));
        VLOG(1) << "Deleting up to " << num_rows_to_delete << " rows";
        if (num_rows_to_delete == 0) continue;
        MaterializedTestTable snapshot = CloneLatestSnapshot();
        // 5% chance to reupdate while also forcing a full compaction.
        bool force_reupdate_missed_deltas = random.OneIn(20);
        if (force_reupdate_missed_deltas) {
          VLOG(1) << "Forcing a reupdate of missed deltas";
        }
        vector<int32_t> deletes;
        for (int i = 0; i < num_rows_to_delete; i++) {
          int32_t row_key = random.Uniform(rows_inserted_);
          MaterializedTestRow test_row = snapshot[row_key];
          CHECK_EQ(row_key, test_row.key);
          if (test_row.is_deleted == DELETED) {
            VLOG(2) << "Row " << test_row.key << " is already deleted";
            continue;
          }
          VLOG(2) << "Deleting row " << row_key;
          deletes.push_back(row_key);
          test_row.is_deleted = DELETED;
          snapshot[row_key] = std::move(test_row);
        }
        int rows_deleted = deletes.size();
        if (rows_deleted == 0) continue;
        if (force_reupdate_missed_deltas) {
          std::shared_ptr<ReupdateHooks> hooks =
              std::make_shared<ReupdateHooks>(tablet, GetSimpleTestSchema());
          hooks->set_deletes(std::move(deletes));
          tablet->SetFlushCompactCommonHooksForTests(hooks);
          Status s = tablet->Compact(Tablet::FORCE_COMPACT_ALL);
          // Clear the hook before checking the result, so it never stays
          // registered past this compaction, even on failure.
          tablet->SetFlushCompactCommonHooksForTests(nullptr);
          ASSERT_OK(s);
        } else {
          client::sp::shared_ptr<client::KuduSession> session = client_->NewSession();
          session->SetTimeoutMillis(20000);
          ASSERT_OK_FAST(session->SetFlushMode(
              client::KuduSession::AUTO_FLUSH_BACKGROUND));
          for (int32_t row_key : deletes) {
            unique_ptr<client::KuduDelete> del(table->NewDelete());
            KuduPartialRow* row = del->mutable_row();
            ASSERT_OK_FAST(row->SetInt32(0, row_key));
            ASSERT_OK_FAST(session->Apply(del.release()));
          }
          FlushSessionOrDie(session);
        }
        SaveSnapshot(std::move(snapshot), clock_->Now());
        VLOG(1) << "Deleted " << rows_deleted << " rows";
        break;
      }
      case kFlush: {
        if (random.OneIn(2)) {
          VLOG(1) << "Flushing tablet";
          ASSERT_OK(tablet->Flush());
        } else {
          VLOG(1) << "Flushing biggest DMS";
          ASSERT_OK(tablet->FlushBiggestDMS());
        }
        break;
      }
      case kMergeCompaction: {
        // TODO: Randomize which rowsets get merged.
        VLOG(1) << "Running merge compaction";
        ASSERT_OK(tablet->Compact(Tablet::COMPACT_NO_FLAGS));
        break;
      }
      case kRedoDeltaCompaction: {
        // TODO: Randomize which deltas / projections get compacted.
        bool major = random.OneIn(2);
        VLOG(1) << "Running " << (major ? "major" : "minor") << " delta compaction";
        ASSERT_OK(tablet->CompactWorstDeltas(major ? tablet::RowSet::MAJOR_DELTA_COMPACTION :
                                             tablet::RowSet::MINOR_DELTA_COMPACTION));
        break;
      }
      case kMoveTimeForward: {
        VLOG(1) << "Moving clock forward";
        AddTimeToClock(MonoDelta::FromSeconds(200));
        break;
      }
      case kStartScan: {
        // Open the scanner now, but read it up to 99 rounds later, so that
        // writes, flushes and compactions can happen in between.
        int read_delay_rounds = random.Uniform(100);
        int read_round = cur_round_ + read_delay_rounds;
        bool time_travel = random.OneIn(2);
        int seconds_in_past = 0;
        if (time_travel) {
          seconds_in_past = random.Uniform(FLAGS_tablet_history_max_age_sec);
        }
        Timestamp snapshot_ts =
            HybridClock::AddPhysicalTimeToTimestamp(clock_->Now(),
                                                    MonoDelta::FromSeconds(-1L * seconds_in_past));
        // StringifyTimestamp() already closes its own parenthesis, so no extra
        // ')' is emitted here.
        VLOG(1) << "Round " << cur_round_ << ": Starting snapshot scan for " << seconds_in_past
                << " seconds in the past at " << StringifyTimestamp(snapshot_ts)
                << " and scheduling the read for round " << read_round;
        unique_ptr<client::KuduScanner> scanner(new KuduScanner(table.get()));
        ASSERT_OK(scanner->SetReadMode(KuduScanner::READ_AT_SNAPSHOT));
        ASSERT_OK(scanner->SetOrderMode(KuduScanner::ORDERED));
        ASSERT_OK(scanner->SetSnapshotRaw(snapshot_ts.ToUint64()));
        ASSERT_OK(scanner->Open());
        NO_FATALS(RegisterScanner(std::move(scanner), snapshot_ts, read_round));
        break;
      }
      default: {
        LOG(FATAL) << "Unexpected value: " << action;
      }
    }
  }
  NO_FATALS(VerifyAllRemainingScanners());
}
} // namespace kudu