blob: 468ff6cecfca40bc5b0e314c289c45ff50eee7b2 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <functional>
#include <list>
#include <map>
#include <memory>
#include <ostream>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include <boost/optional/optional.hpp> // IWYU pragma: keep
#include <boost/optional/optional_io.hpp> // IWYU pragma: keep
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <glog/stl_logging.h>
#include <gtest/gtest.h>
#include "kudu/client/client-test-util.h"
#include "kudu/client/client.h"
#include "kudu/client/scan_batch.h"
#include "kudu/client/scan_predicate.h"
#include "kudu/client/schema.h"
#include "kudu/client/session-internal.h"
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/client/value.h"
#include "kudu/client/write_op.h"
#include "kudu/clock/clock.h"
#include "kudu/clock/logical_clock.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/schema.h"
#include "kudu/common/txn_id.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/raft_consensus.h"
#include "kudu/gutil/casts.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/master/mini_master.h"
#include "kudu/mini-cluster/internal_mini_cluster.h"
#include "kudu/tablet/key_value_test_schema.h"
#include "kudu/tablet/rowset.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_replica.h"
#include "kudu/tablet/txn_participant-test-util.h"
#include "kudu/tserver/mini_tablet_server.h"
#include "kudu/tserver/tablet_server.h"
#include "kudu/tserver/ts_tablet_manager.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/tserver/tserver_admin.pb.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
// Row keys used by generated test cases are drawn from [0, keyspace_size).
DEFINE_int32(keyspace_size, 5, "number of distinct primary keys to test with");
// NOTE(review): not read in this part of the file; presumably bounds how many
// transactions may be open at once when generating TEST_BEGIN_TXN ops — confirm.
DEFINE_int32(max_open_txns, 5, "maximum number of open transactions to test with");
// Flags defined elsewhere that the FuzzTest constructor overrides.
DECLARE_bool(enable_maintenance_manager);
DECLARE_bool(scanner_allow_snapshot_scans_with_logical_timestamps);
DECLARE_bool(use_hybrid_clock);
using boost::optional;
using kudu::client::KuduClient;
using kudu::client::KuduClientBuilder;
using kudu::client::KuduDelete;
using kudu::client::KuduPredicate;
using kudu::client::KuduScanBatch;
using kudu::client::KuduScanner;
using kudu::client::KuduSchema;
using kudu::client::KuduSession;
using kudu::client::KuduTable;
using kudu::client::KuduTableCreator;
using kudu::client::KuduUpdate;
using kudu::client::KuduValue;
using kudu::client::KuduWriteOperation;
using kudu::client::sp::shared_ptr;
using kudu::cluster::InternalMiniCluster;
using kudu::cluster::InternalMiniClusterOptions;
using kudu::tserver::ParticipantOpPB;
using kudu::tserver::ParticipantResponsePB;
using std::list;
using std::map;
using std::make_pair;
using std::pair;
using std::string;
using std::vector;
using std::unique_ptr;
using std::unordered_map;
using std::unordered_set;
using strings::Substitute;
namespace kudu {
namespace tablet {
// Kinds of steps that can appear in a fuzz-test op sequence. The numeric values
// index into TestOpType_names, so the two must be kept in the same order.
enum TestOpType {
  // Row writes.
  TEST_INSERT = 0,
  TEST_INSERT_PK_ONLY,
  TEST_INSERT_IGNORE,
  TEST_INSERT_IGNORE_PK_ONLY,
  TEST_UPSERT,
  TEST_UPSERT_PK_ONLY,
  TEST_UPDATE,
  TEST_UPDATE_IGNORE,
  TEST_DELETE,
  TEST_DELETE_IGNORE,
  // Session flushes, tablet flushes/compactions, and server restarts.
  TEST_FLUSH_OPS,
  TEST_FLUSH_TABLET,
  TEST_FLUSH_DELTAS,
  TEST_MINOR_COMPACT_DELTAS,
  TEST_MAJOR_COMPACT_DELTAS,
  TEST_COMPACT_TABLET,
  TEST_RESTART_TS,
  // Reads.
  TEST_SCAN_AT_TIMESTAMP,
  TEST_DIFF_SCAN,
  // Transaction control.
  TEST_BEGIN_TXN,
  TEST_COMMIT_TXN,
  TEST_ABORT_TXN,
  // Count of the values above; not a real op.
  TEST_NUM_OP_TYPES
};
// Name of the single table created by the fuzz test. Declared constexpr so the
// pointer itself cannot be reassigned (it was a mutable global before).
constexpr const char* kTableName = "table";
// Printable names of the TestOpType values, indexed by the enum value. Must
// contain exactly one entry per TestOpType (excluding TEST_NUM_OP_TYPES), in
// enum order. Declared constexpr so that neither the table nor its entries can
// be modified at runtime.
constexpr const char* TestOpType_names[] = {
  "TEST_INSERT",
  "TEST_INSERT_PK_ONLY",
  "TEST_INSERT_IGNORE",
  "TEST_INSERT_IGNORE_PK_ONLY",
  "TEST_UPSERT",
  "TEST_UPSERT_PK_ONLY",
  "TEST_UPDATE",
  "TEST_UPDATE_IGNORE",
  "TEST_DELETE",
  "TEST_DELETE_IGNORE",
  "TEST_FLUSH_OPS",
  "TEST_FLUSH_TABLET",
  "TEST_FLUSH_DELTAS",
  "TEST_MINOR_COMPACT_DELTAS",
  "TEST_MAJOR_COMPACT_DELTAS",
  "TEST_COMPACT_TABLET",
  "TEST_RESTART_TS",
  "TEST_SCAN_AT_TIMESTAMP",
  "TEST_DIFF_SCAN",
  "TEST_BEGIN_TXN",
  "TEST_COMMIT_TXN",
  "TEST_ABORT_TXN",
};
// Transaction-ID sentinel meaning "not part of any transaction", e.g. for a
// non-transactional insert or a flush of the non-transactional session.
constexpr int kNoTxnId = -1;
// Same numeric value as kNoTxnId, but named generically for TestOp arguments
// that an op does not use at all.
constexpr int kNoVal = -1;
// A single step in a fuzz-test sequence: an op type plus up to two integer
// arguments whose meaning depends on the type.
struct TestOp {
  // Intentionally non-explicit (see the NOLINT markers).
  // NOLINTNEXTLINE(google-explicit-constructor)
  TestOp(TestOpType t, int v1 = kNoVal, int v2 = kNoVal) // NOLINT(runtime/explicit)
      : type(t), val(v1), val2(v2) {}

  // Which op to run.
  TestOpType type;

  // First argument:
  //  - INSERT*/UPSERT*/UPDATE*/DELETE* variants: the key of the row to write.
  //  - SCAN_AT_TIMESTAMP: the snapshot timestamp.
  //  - DIFF_SCAN: the start timestamp.
  //  - BEGIN_TXN/COMMIT_TXN/ABORT_TXN: the txn ID to operate on.
  //  - FLUSH_OPS: the txn ID whose session to flush (kNoTxnId for the
  //    non-transactional session).
  //  - Any other op: unused.
  int val;

  // Second argument:
  //  - INSERT, INSERT_PK_ONLY, INSERT_IGNORE, INSERT_IGNORE_PK_ONLY: the txn ID
  //    to write with (kNoTxnId for none).
  //  - DIFF_SCAN: the end timestamp.
  //  - Any other op: unused.
  int val2;

  // Formats the op as "{NAME}", "{NAME, val}" or "{NAME, val, val2}",
  // printing only the arguments meaningful for 'type'.
  string ToString() const {
    switch (type) {
      // Ops that use both arguments.
      case TEST_INSERT:
      case TEST_INSERT_PK_ONLY:
      case TEST_INSERT_IGNORE:
      case TEST_INSERT_IGNORE_PK_ONLY:
      case TEST_DIFF_SCAN:
        return strings::Substitute("{$0, $1, $2}", TestOpType_names[type], val, val2);
      // Ops that use only the first argument.
      case TEST_FLUSH_OPS:
      case TEST_UPSERT:
      case TEST_UPSERT_PK_ONLY:
      case TEST_UPDATE:
      case TEST_UPDATE_IGNORE:
      case TEST_DELETE:
      case TEST_DELETE_IGNORE:
      case TEST_SCAN_AT_TIMESTAMP:
      case TEST_BEGIN_TXN:
      case TEST_COMMIT_TXN:
      case TEST_ABORT_TXN:
        return strings::Substitute("{$0, $1}", TestOpType_names[type], val);
      // Ops without arguments.
      case TEST_FLUSH_TABLET:
      case TEST_COMPACT_TABLET:
      case TEST_FLUSH_DELTAS:
      case TEST_MAJOR_COMPACT_DELTAS:
      case TEST_MINOR_COMPACT_DELTAS:
      case TEST_RESTART_TS:
        return strings::Substitute("{$0}", TestOpType_names[type]);
      default:
        LOG(FATAL) << "Invalid op type: " << type;
    }
    __builtin_unreachable();
  }
};
// Kind of row mutation recorded in a Redo.
enum RedoType {
  INSERT = 0,  // The row came into existence.
  UPDATE = 1,  // The row's value changed.
  DELETE = 2,  // The row was removed.
};
// A row mutation recorded by the test; CheckDiffScan replays these to compute
// the expected results of a diff scan.
struct Redo {
  Redo(RedoType t, int32_t k, optional<int32_t> v = boost::none)
      : rtype(t), key(k), val(std::move(v)) {}

  // Formats the redo for logging, e.g. "{DELETE key=3}" or
  // "{UPDATE key=3 val=NULL}".
  string ToString() const {
    if (rtype != DELETE) {
      const char* kind = (rtype == INSERT) ? "INSERT" : "UPDATE";
      const string val_str = val ? std::to_string(*val) : "NULL";
      return strings::Substitute("{$0 key=$1 val=$2}", kind, key, val_str);
    }
    return strings::Substitute("{DELETE key=$0}", key);
  }

  // Kind of mutation.
  RedoType rtype;
  // Key of the mutated row.
  int32_t key;
  // New value for INSERT/UPDATE; boost::none means NULL.
  optional<int32_t> val;
};
// Every op type that PickOpAtRandom(ALL) may return. The element order is
// significant: ops are chosen by rand() index into this list.
//
// TODO(awong): Merging multiple transactional MRSs together can sometimes lead
// to a crash. Uncomment the transactional ops once fixed.
const vector<TestOpType> kAllOps {
  // Row writes.
  TEST_INSERT, TEST_INSERT_PK_ONLY,
  TEST_INSERT_IGNORE, TEST_INSERT_IGNORE_PK_ONLY,
  TEST_UPSERT, TEST_UPSERT_PK_ONLY,
  TEST_UPDATE, TEST_UPDATE_IGNORE,
  TEST_DELETE, TEST_DELETE_IGNORE,
  // Flushes, compactions, and restarts.
  TEST_FLUSH_OPS, TEST_FLUSH_TABLET, TEST_FLUSH_DELTAS,
  TEST_MINOR_COMPACT_DELTAS, TEST_MAJOR_COMPACT_DELTAS,
  TEST_COMPACT_TABLET, TEST_RESTART_TS,
  // Reads.
  TEST_SCAN_AT_TIMESTAMP, TEST_DIFF_SCAN,
  // Transactional ops, disabled per the TODO above:
  // TEST_BEGIN_TXN,
  // TEST_COMMIT_TXN,
  // TEST_ABORT_TXN,
};
// Op types returned by PickOpAtRandom(PK_ONLY): they hammer workloads in which
// rows come in and out of existence, without touching non-key values. The
// element order is significant: ops are chosen by rand() index into this list.
//
// TODO(awong): Merging multiple transactional MRSs together can sometimes lead
// to a crash. Uncomment the transactional ops once fixed.
const vector<TestOpType> kPkOnlyOps {
  // Key-only row writes.
  TEST_INSERT_PK_ONLY, TEST_INSERT_IGNORE_PK_ONLY, TEST_UPSERT_PK_ONLY,
  TEST_DELETE, TEST_DELETE_IGNORE,
  // Flushes, compactions, and restarts.
  TEST_FLUSH_OPS, TEST_FLUSH_TABLET, TEST_FLUSH_DELTAS,
  TEST_MINOR_COMPACT_DELTAS, TEST_MAJOR_COMPACT_DELTAS,
  TEST_COMPACT_TABLET, TEST_RESTART_TS,
  // Reads.
  TEST_SCAN_AT_TIMESTAMP, TEST_DIFF_SCAN,
  // Transactional ops, disabled per the TODO above:
  // TEST_BEGIN_TXN,
  // TEST_COMMIT_TXN,
  // TEST_ABORT_TXN,
};
// Test which does only random operations against a tablet, including update and random
// get (ie scans with equal lower and upper bounds).
//
// The test maintains an in-memory copy of the expected state of the tablet, and uses only
// a single thread, so that it's easy to verify that the tablet always matches the expected
// state.
class FuzzTest : public KuduTest {
 public:
  // Disables the maintenance manager, switches the server to the logical
  // clock, and allows snapshot scans at logical timestamps (presumably so the
  // test alone decides when flushes/compactions run and which timestamps are
  // scanned).
  FuzzTest() {
    FLAGS_enable_maintenance_manager = false;
    FLAGS_use_hybrid_clock = false;
    FLAGS_scanner_allow_snapshot_scans_with_logical_timestamps = true;
  }

  // Runs KuduTest::SetUp(), then starts an InternalMiniCluster with default
  // options. Creates table 'kTableName' with 'schema' (range-partitioned on
  // "key", one replica) and caches its only tablet replica. Finally opens a
  // MANUAL_FLUSH session and a handle to the table.
  void CreateTabletAndStartClusterWithSchema(const Schema& schema) {
    schema_ = KuduSchema::FromSchema(schema);
    KuduTest::SetUp();
    InternalMiniClusterOptions opts;
    cluster_.reset(new InternalMiniCluster(env_, opts));
    ASSERT_OK(cluster_->Start());
    CHECK_OK(KuduClientBuilder()
                 .add_master_server_addr(cluster_->mini_master()->bound_rpc_addr_str())
                 .default_admin_operation_timeout(MonoDelta::FromSeconds(60))
                 .Build(&client_));
    // Add a table, make sure it reports itself.
    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
    CHECK_OK(table_creator->table_name(kTableName)
                 .schema(&schema_)
                 .set_range_partition_columns({ "key" })
                 .num_replicas(1)
                 .Create());
    // Find the replica.
    tablet_replica_ = LookupTabletReplica();
    // Setup session and table.
    session_ = client_->NewSession();
    CHECK_OK(session_->SetFlushMode(KuduSession::MANUAL_FLUSH));
    session_->SetTimeoutMillis(60 * 1000);
    CHECK_OK(client_->OpenTable(kTableName, &table_));
  }

  // Releases the cached replica reference before shutting down the cluster.
  void TearDown() override {
    if (tablet_replica_) tablet_replica_.reset();
    if (cluster_) cluster_->Shutdown();
  }

  // Returns the single tablet replica hosted by the first tablet server.
  // CHECK-fails if that server hosts anything other than exactly one replica.
  scoped_refptr<TabletReplica> LookupTabletReplica() {
    vector<scoped_refptr<TabletReplica> > replicas;
    cluster_->mini_tablet_server(0)->server()->tablet_manager()->GetTabletReplicas(&replicas);
    CHECK_EQ(1, replicas.size());
    return replicas[0];
  }

  // Restarts the first tablet server, or starts it if it currently has no
  // server instance. Waits for initialization and then refreshes
  // 'tablet_replica_'.
  void RestartTabletServer() {
    tablet_replica_.reset();
    auto ts = cluster_->mini_tablet_server(0);
    if (ts->server()) {
      ts->Shutdown();
      ASSERT_OK(ts->Restart());
    } else {
      ASSERT_OK(ts->Start());
    }
    ASSERT_OK(ts->server()->WaitInited());
    tablet_replica_ = LookupTabletReplica();
  }

  Tablet* tablet() const {
    return tablet_replica_->tablet();
  }

  // Waits (up to 10s) for the replica to become leader, then sends a
  // participant op of 'op_type' for 'txn_id' with timestamp 'ts_val'. If the
  // response carries an error, that error is returned as a Status.
  Status CallParticipantOpCheckResp(int64_t txn_id, ParticipantOpPB::ParticipantOpType op_type,
                                    int64_t ts_val) {
    RETURN_NOT_OK(tablet_replica_->consensus()->WaitUntilLeaderForTests(
        MonoDelta::FromSeconds(10)));
    ParticipantResponsePB resp;
    RETURN_NOT_OK(CallParticipantOp(tablet_replica_.get(), txn_id, op_type, ts_val, &resp));
    if (resp.has_error()) {
      return StatusFromPB(resp.error().status());
    }
    return Status::OK();
  }

  // Applies an insert, insert-ignore, or upsert (chosen by 'type') of
  // 'key'/'val'. The op goes to the non-transactional session when 'txn_id' is
  // kNoTxnId, and otherwise to the session of transaction 'txn_id'.
  //
  // 'old_row' is the row's current expected contents, if the row exists.
  // Returns the expected contents of the row once the op takes effect.
  //
  // For the non-PK-only variants, odd values of 'val' are written as NULL.
  // TEST_INSERT_PK_ONLY writes only the key, so the expected value is NULL.
  // When the row already exists, the IGNORE and UPSERT_PK_ONLY variants keep
  // the old value.
  ExpectedKeyValueRow InsertOrUpsertRow(int key, int val,
                                        optional<ExpectedKeyValueRow> old_row,
                                        TestOpType type, int txn_id) {
    ExpectedKeyValueRow ret;
    unique_ptr<KuduWriteOperation> op;
    if (type == TEST_INSERT || type == TEST_INSERT_PK_ONLY) {
      op.reset(table_->NewInsert());
    } else if (type == TEST_INSERT_IGNORE || type == TEST_INSERT_IGNORE_PK_ONLY) {
      op.reset(table_->NewInsertIgnore());
    } else {
      op.reset(table_->NewUpsert());
    }
    KuduPartialRow* row = op->mutable_row();
    CHECK_OK(row->SetInt32(0, key));
    ret.key = key;
    switch (type) {
      case TEST_INSERT:
      case TEST_INSERT_IGNORE:
      case TEST_UPSERT: {
        if (val & 1) {
          CHECK_OK(row->SetNull(1));
        } else {
          CHECK_OK(row->SetInt32(1, val));
          ret.val = val;
        }
        if (type == TEST_INSERT_IGNORE && old_row) {
          // insert ignore when the row already exists results in old value
          ret.val = old_row->val;
        }
        break;
      }
      case TEST_INSERT_PK_ONLY:
        break;
      case TEST_INSERT_IGNORE_PK_ONLY:
      case TEST_UPSERT_PK_ONLY: {
        // For "upsert PK only" and "insert ignore PK only", we expect the row
        // to keep its old value if the row existed, or NULL if there was no old row.
        ret.val = old_row ? old_row->val : boost::none;
        break;
      }
      default: LOG(FATAL) << "Invalid test op type: " << TestOpType_names[type];
    }
    if (txn_id == kNoTxnId) {
      CHECK_OK(session_->Apply(op.release()));
    } else {
      CHECK_OK(FindOrDie(txn_sessions_, txn_id)->Apply(op.release()));
    }
    return ret;
  }

  // Applies an update of row 'key' to the non-transactional session, setting
  // its value to 'new_val'. The op is an update-ignore when 'type' is
  // TEST_UPDATE_IGNORE and a plain update otherwise. Odd values of 'new_val'
  // are written as NULL. Returns the expected new contents of the row.
  ExpectedKeyValueRow MutateRow(int key, uint32_t new_val, TestOpType type) {
    ExpectedKeyValueRow ret;
    unique_ptr<KuduWriteOperation> op;
    if (type == TEST_UPDATE_IGNORE) {
      op.reset(table_->NewUpdateIgnore());
    } else {
      op.reset(table_->NewUpdate());
    }
    KuduPartialRow* row = op->mutable_row();
    CHECK_OK(row->SetInt32(0, key));
    ret.key = key;
    if (new_val % 2) {
      CHECK_OK(row->SetNull(1));
    } else {
      CHECK_OK(row->SetInt32(1, new_val));
      ret.val = new_val;
    }
    CHECK_OK(session_->Apply(op.release()));
    return ret;
  }

  // Applies a delete of row 'key' to the non-transactional session. The op is
  // a delete-ignore when 'type' is TEST_DELETE_IGNORE and a plain delete
  // otherwise. Returns boost::none, indicating that the row no longer exists.
  optional<ExpectedKeyValueRow> DeleteRow(int key, TestOpType type) {
    unique_ptr<KuduWriteOperation> op;
    if (type == TEST_DELETE_IGNORE) {
      op.reset(table_->NewDeleteIgnore());
    } else {
      op.reset(table_->NewDelete());
    }
    KuduPartialRow* row = op->mutable_row();
    CHECK_OK(row->SetInt32(0, key));
    CHECK_OK(session_->Apply(op.release()));
    return boost::none;
  }

  // Random-read the given row, returning its current value.
  // If the row doesn't exist, returns boost::none.
  optional<ExpectedKeyValueRow> GetRow(int key) {
    KuduScanner s(table_.get());
    CHECK_OK(s.AddConjunctPredicate(table_->NewComparisonPredicate(
        "key", KuduPredicate::EQUAL, KuduValue::FromInt(key))));
    CHECK_OK(s.Open());
    while (s.HasMoreRows()) {
      KuduScanBatch batch;
      CHECK_OK(s.NextBatch(&batch));
      for (KuduScanBatch::RowPtr row : batch) {
        ExpectedKeyValueRow ret;
        CHECK_OK(row.GetInt32(0, &ret.key));
        if (schema_.num_columns() > 1 && !row.IsNull(1)) {
          ret.val = 0;
          CHECK_OK(row.GetInt32(1, ret.val.get_ptr()));
        }
        return ret;
      }
    }
    return boost::none;
  }

  // Checks that the rows in 'rows_found' match the state stored in
  // 'saved_values_' for 'timestamp'. Any errors found are appended to
  // 'errors'; a non-empty 'errors' means the check failed. When there are
  // errors, a header line naming both timestamps is pushed to the front.
  void CheckRowsMatchAtTimestamp(int timestamp,
                                 vector<ExpectedKeyValueRow> rows_found,
                                 list<string>* errors) {
    int saved_timestamp = -1;
    // 'saved_values_' is ordered by descending timestamp, so upper_bound()
    // yields the latest state saved strictly before 'timestamp'.
    auto iter = saved_values_.upper_bound(timestamp);
    if (iter == saved_values_.end()) {
      for (auto& found_row : rows_found) {
        errors->push_back(Substitute("Found unexpected row: $0", found_row.ToString()));
      }
    } else {
      saved_timestamp = iter->first;
      const auto& saved = iter->second;
      size_t found_idx = 0;
      size_t expected_values_counter = 0;
      for (auto& expected : saved) {
        if (expected) {
          expected_values_counter++;
          ExpectedKeyValueRow expected_val = expected.value();
          if (found_idx >= rows_found.size()) {
            errors->push_back(Substitute("Didn't find expected value: $0",
                                         expected_val.ToString()));
            break;
          }
          ExpectedKeyValueRow found_val = rows_found[found_idx++];
          if (expected_val.key != found_val.key) {
            errors->push_back(Substitute("Mismatched key. Expected: $0 Found: $1",
                                         expected_val.ToString(), found_val.ToString()));
            continue;
          }
          if (expected_val.val != found_val.val) {
            errors->push_back(Substitute("Mismatched value. Expected: $0 Found: $1",
                                         expected_val.ToString(), found_val.ToString()));
            continue;
          }
        }
      }
      if (rows_found.size() != expected_values_counter) {
        errors->push_back(Substitute("Mismatched size. Expected: $0 rows. Found: $1 rows.",
                                     expected_values_counter, rows_found.size()));
        for (auto& found_row : rows_found) {
          errors->push_back(Substitute("Found unexpected row: $0", found_row.ToString()));
        }
      }
    }
    if (!errors->empty()) {
      errors->push_front(Substitute("Found errors while comparing a snapshot scan at $0 with the "
                                    "values saved at $1. Errors:",
                                    timestamp, saved_timestamp));
    }
  }

  // Fully consume all rows in 'scanner', writing the results to 'rows'.
  //
  // If 'is_deleted' is provided (only in diff scans), will also write out the
  // values of the IS_DELETED virtual column.
  Status ScanAllRows(KuduScanner* scanner, vector<ExpectedKeyValueRow>* rows,
                     vector<bool>* is_deleted) {
    while (scanner->HasMoreRows()) {
      KuduScanBatch batch;
      RETURN_NOT_OK(scanner->NextBatch(&batch));
      for (KuduScanBatch::RowPtr row : batch) {
        ExpectedKeyValueRow ret;
        RETURN_NOT_OK(row.GetInt32(0, &ret.key));
        if (schema_.num_columns() == 2 && !row.IsNull(1)) {
          ret.val = 0;
          RETURN_NOT_OK(row.GetInt32(1, ret.val.get_ptr()));
        }
        if (is_deleted) {
          bool b;
          RETURN_NOT_OK(row.IsDeleted(&b));
          is_deleted->push_back(b);
        }
        rows->emplace_back(std::move(ret));
      }
    }
    return Status::OK();
  }

  // Scan the tablet at 'timestamp' and compare the result to the saved values.
  void CheckScanAtTimestamp(int timestamp) {
    KuduScanner s(table_.get());
    ASSERT_OK(s.SetReadMode(KuduScanner::ReadMode::READ_AT_SNAPSHOT));
    ASSERT_OK(s.SetSnapshotRaw(timestamp));
    ASSERT_OK(s.SetFaultTolerant());
    ASSERT_OK(s.Open());
    vector<ExpectedKeyValueRow> found;
    ASSERT_OK(ScanAllRows(&s, &found, nullptr));
    list<string> errors;
    CheckRowsMatchAtTimestamp(timestamp, std::move(found), &errors);
    string final_error;
    if (!errors.empty()) {
      for (const string& error : errors) {
        final_error.append("\n" + error);
      }
      FAIL() << final_error;
    }
  }

  // Diff scan the tablet from 'start_timestamp' to 'end_timestamp' and compare
  // the result to the saved values.
  void CheckDiffScan(int start_timestamp, int end_timestamp) {
    KuduScanner s(table_.get());
    ASSERT_OK(s.SetDiffScan(start_timestamp, end_timestamp));
    ASSERT_OK(s.Open());
    vector<ExpectedKeyValueRow> found;
    vector<bool> found_is_deleted;
    ASSERT_OK(ScanAllRows(&s, &found, &found_is_deleted));
    if (VLOG_IS_ON(1)) {
      for (size_t i = 0; i < found.size(); i++) {
        VLOG(1) << Substitute("Diff scan result: $0$1", found[i].ToString(),
                              found_is_deleted[i] ? " (deleted)" : "");
      }
    }
    // Use saved_redos_ to reconstruct the expected results of the diff scan.
    //
    // 'selected_rows' tracks which row keys are expected in the scan results
    // using the select criteria.
    //
    // 'expected_rows' tracks expected values of rows and is built up using the
    // apply criteria. After we've processed all relevant deltas, rows not in
    // 'selected_rows' will be pruned and the results compared with the diff scan.
    //
    // 'is_deleted_start' and 'is_deleted_end' track liveness for each row at
    // the beginning and end of the time range. If a row is dead in both, it
    // shouldn't be in the diff scan results.
    vector<bool> selected_rows(FLAGS_keyspace_size);
    vector<ExpectedKeyValueRow> expected_rows(FLAGS_keyspace_size);
    vector<bool> is_deleted_start(FLAGS_keyspace_size, true);
    vector<bool> is_deleted_end(FLAGS_keyspace_size, true);
    for (const auto& e : saved_redos_) {
      int ts = e.first;
      const auto& redos = e.second;
      if (redos.empty()) {
        continue;
      }
      VLOG(1) << "Processing redos for ts @" << ts;
      if (ts >= end_timestamp) {
        // The redo is beyond the end of the diff scan as per both the select
        // and apply criteria. We're iterating in ascending timestamp order so
        // this also means all future redos are irrelevant. With verbose
        // logging on, keep iterating anyway so every skipped redo gets logged.
        if (VLOG_IS_ON(1)) {
          for (const auto& redo : redos) {
            VLOG(1) << "Skipping redo " << redo.ToString();
          }
          continue;
        }
        break;
      }
      for (const auto& redo : redos) {
        VLOG(1) << "Processing redo " << redo.ToString();
        if (ts >= start_timestamp) {
          // The redo is relevant as per the select criteria.
          VLOG(1) << "Selected row " << redo.key;
          if (!selected_rows[redo.key]) {
            // This is the first relevant redo for this row.
            is_deleted_start[redo.key] = redo.rtype == INSERT;
            selected_rows[redo.key] = true;
          }
        }
        // The redo is relevant as per the apply criteria.
        is_deleted_end[redo.key] = redo.rtype == DELETE;
        if (redo.rtype != DELETE) {
          // Deleted rows still exist in 'expected_rows'. This is OK;
          // 'expected_is_deleted' will reflect the deletion.
          expected_rows[redo.key] = { redo.key, redo.val };
        }
        VLOG(1) << "New value for row " << redo.key << ": "
                << expected_rows[redo.key].ToString();
        VLOG(1) << "New is_deleted for row " << redo.key << ": "
                << is_deleted_end[redo.key];
      }
    }

    // Trim the expected results as per 'selected_rows' and start/end liveness:
    // keep a row only if it was selected and was alive at the start or end of
    // the range. This uses an explicit index loop rather than
    // std::remove_if, because the standard does not specify the order in
    // which remove_if applies its predicate, so a predicate that tracks the
    // element index through a captured counter is not reliable.
    vector<ExpectedKeyValueRow> trimmed_rows;
    vector<bool> expected_is_deleted;
    for (size_t key = 0; key < selected_rows.size(); key++) {
      if (!selected_rows[key] || (is_deleted_start[key] && is_deleted_end[key])) {
        continue;
      }
      trimmed_rows.push_back(expected_rows[key]);
      expected_is_deleted.push_back(is_deleted_end[key]);
    }
    expected_rows = std::move(trimmed_rows);

    // Test the results. Note that for deleted rows, we can't compare column
    // values; the server is free to pick whatever historical values it wants.
    auto fail_diff_scan = [&]() {
      FAIL() << "Diff scan verification failed\n"
             << "Expected IS_DELETED: " << expected_is_deleted << "\n"
             << "Found IS_DELETED: " << found_is_deleted << "\n"
             << "Expected rows: " << expected_rows << "\n"
             << "Found rows: " << found;
    };
    if (expected_is_deleted != found_is_deleted) {
      NO_FATALS(fail_diff_scan());
    }
    if (expected_rows.size() != found.size()) {
      NO_FATALS(fail_diff_scan());
    }
    for (size_t i = 0; i < expected_rows.size(); i++) {
      if ((expected_is_deleted[i] && expected_rows[i].key != found[i].key) ||
          (!expected_is_deleted[i] && expected_rows[i] != found[i])) {
        NO_FATALS(fail_diff_scan());
      }
    }
  }

 protected:
  // Validate that the given sequence is valid and would not cause any
  // errors assuming that there are no bugs. For example, checks to make sure there
  // aren't duplicate inserts with no intervening deletions.
  //
  // Useful when using the 'delta' test case reduction tool to allow
  // it to skip invalid test cases.
  static void ValidateFuzzCase(const vector<TestOp>& test_ops);
  void RunFuzzCase(const vector<TestOp>& test_ops,
                   int update_multiplier);

  // Client-side copy of the table schema.
  KuduSchema schema_;
  unique_ptr<InternalMiniCluster> cluster_;
  shared_ptr<KuduClient> client_;
  // Non-transactional MANUAL_FLUSH session.
  shared_ptr<KuduSession> session_;
  // Per-transaction sessions, keyed by txn ID.
  unordered_map<int, shared_ptr<KuduSession>> txn_sessions_;
  shared_ptr<KuduTable> table_;
  // Expected rows (boost::none for absent rows) keyed by timestamp, ordered
  // with the latest timestamp first; consumed by CheckRowsMatchAtTimestamp.
  map<int,
      vector<optional<ExpectedKeyValueRow>>,
      std::greater<int>> saved_values_;
  // Recorded mutations keyed by timestamp in ascending order; replayed by
  // CheckDiffScan.
  map<int, vector<Redo>> saved_redos_;
  scoped_refptr<TabletReplica> tablet_replica_;
};
// The set of ops to draw from.
enum TestOpSets {
  // Pick an operation at random from all possible operations (kAllOps).
  ALL = 0,
  // Pick an operation at random from the ops that apply only to the primary
  // key, or that are not row-specific at all, like flushes or compactions
  // (kPkOnlyOps).
  PK_ONLY = 1,
};
// Returns an op type chosen with rand() from kAllOps (for ALL) or kPkOnlyOps
// (for PK_ONLY). Any other 'sets' value is a fatal error.
TestOpType PickOpAtRandom(TestOpSets sets) {
  const vector<TestOpType>* candidates = nullptr;
  switch (sets) {
    case ALL:
      candidates = &kAllOps;
      break;
    case PK_ONLY:
      candidates = &kPkOnlyOps;
      break;
    default:
      LOG(FATAL) << "Unknown TestOpSets type: " << sets;
      __builtin_unreachable();
  }
  return (*candidates)[rand() % candidates->size()];
}
// Returns true if 'op' is a row write: any insert, upsert, update, or delete
// variant. Flushes, compactions, restarts, scans, and transaction-control ops
// return false.
bool IsMutation(const TestOpType& op) {
  bool writes_row = false;
  switch (op) {
    case TEST_INSERT:
    case TEST_INSERT_PK_ONLY:
    case TEST_INSERT_IGNORE:
    case TEST_INSERT_IGNORE_PK_ONLY:
    case TEST_UPSERT:
    case TEST_UPSERT_PK_ONLY:
    case TEST_UPDATE:
    case TEST_UPDATE_IGNORE:
    case TEST_DELETE:
    case TEST_DELETE_IGNORE:
      writes_row = true;
      break;
    default:
      break;
  }
  return writes_row;
}
// Generate a random valid sequence of operations for use as a fuzz test, i.e.
// a set of operations that, when run, will not run into any logical errors
// (e.g. no "key already present" or "key not found" errors).
//
// To generate this sequence, we schedule one op at a time, keeping track of
// what rows exist in the tablet, what row mutations are pending, what rows are
// being mutated by in-flight ops and transactions, etc. We only select ops
// that we know to be valid, using the tracked state.
void GenerateTestCase(vector<TestOp>* ops, int len, TestOpSets sets = ALL) {
// Bitset indicating whether the given row has been committed as a part of a
// transaction, or successfully inserted outside of a transaction.
vector<bool> exists(FLAGS_keyspace_size);
// The transaction ID that is operating on each row, or kNoTxnId if being
// operated on outside the context of a transaction. 'none' if the row isn't
// being operated on. If a row is not 'none', an entry must exist for it in
// 'pending_existence_per_txn'.
// TODO(awong): This is necessary so we don't have transactions trying to
// write the same rows, which currently is unprotected. Once we begin locking
// rows for the duration of the transaction, we shouldn't need this.
vector<optional<int>> txn_touching_row(FLAGS_keyspace_size);
// Represents the pending mutations to rows that are not yet visible to other
// actors, and the resulting existence status of the row if we were to flush
// a non-transactional session (keyed as kNoTxnId) or commit a transaction
// (keyed by any other value).
//
// This is used to update 'exists' when scheduling a non-transactional
// session flush or a transaction commit. It is also used to determine
// whether we can operate on pending values, e.g. insert a row and then
// delete it in the same batched op.
//
// A row key can exist in at most one IsPresentByRowKey in any form (i.e. row
// 1's existence cannot be to false in two transactions), ensuring only a
// single actor operates on a row at a time.
typedef std::map<int, bool> IsPresentByRowKey;
unordered_map<int, IsPresentByRowKey> pending_existence_per_txn;
EmplaceOrDie(&pending_existence_per_txn, kNoTxnId, IsPresentByRowKey{});
// The transactions that have client sessions that need to be flushed.
unordered_set<int> txns_needing_session_flush;
// Returns whether there are any open transactions.
const auto no_open_txns = [&pending_existence_per_txn] {
// A single entry exists for kNoTxnId.
return pending_existence_per_txn.size() == 1;
};
// Helper that deterministically (based on rand()) selects a transaction ID
// from those in flight, or, if 'maybe_none' is true, kNoTxnId to indicate a
// non-transactional operation.
const auto pick_txn_id = [&] (bool maybe_none) -> int {
if (no_open_txns() || (maybe_none && rand() % 2)) {
// Shouldn't be called when maybe_none is false if there are no pending
// transactions.
DCHECK(maybe_none);
return kNoTxnId;
}
const auto& num_txns = pending_existence_per_txn.size() - 1;
vector<int> txn_ids;
txn_ids.reserve(num_txns);
for (const auto& txn_id_and_rows : pending_existence_per_txn) {
if (txn_id_and_rows.first == kNoTxnId) continue;
txn_ids.emplace_back(txn_id_and_rows.first);
}
std::sort(txn_ids.begin(), txn_ids.end());
return txn_ids[rand() % num_txns];
};
int next_txn_id = 0;
int op_timestamps = 0;
bool data_in_mrs = false;
bool worth_compacting = false;
bool data_in_dms = false;
ops->clear();
while (ops->size() < len) {
TestOpType r = PickOpAtRandom(sets);
int row_key = rand() % FLAGS_keyspace_size;
// When we perform a test mutation, we also call GetRow() which does a scan
// and thus increases the server's timestamp.
if (IsMutation(r)) {
op_timestamps++;
}
switch (r) {
case TEST_INSERT:
case TEST_INSERT_PK_ONLY: {
const auto& txn_id = pick_txn_id(/*maybe_none*/true);
const auto& txn_operating_on_row = txn_touching_row[row_key];
if (txn_operating_on_row && *txn_operating_on_row != txn_id) {
// The row is being operated on by another txn.
continue;
}
if (txn_operating_on_row && *txn_operating_on_row == txn_id &&
FindOrDie(FindOrDie(pending_existence_per_txn, txn_id), row_key)) {
// The row is being operated on by this txn and the row pending
// state exists.
continue;
}
if (!txn_operating_on_row && exists[row_key]) {
// The row is not being operated on by another txn, but the row
// exists.
continue;
}
ops->emplace_back(r, row_key, txn_id);
txn_touching_row[row_key] = txn_id;
FindOrDie(pending_existence_per_txn, txn_id)[row_key] = true;
EmplaceIfNotPresent(&txns_needing_session_flush, txn_id);
if (txn_id == kNoTxnId) {
data_in_mrs = true;
}
break;
}
case TEST_INSERT_IGNORE:
case TEST_INSERT_IGNORE_PK_ONLY: {
const auto& txn_id = pick_txn_id(/*maybe_none*/true);
const auto& txn_operating_on_row = txn_touching_row[row_key];
if (txn_operating_on_row && *txn_operating_on_row != txn_id) {
// The row is being operated on by another txn.
continue;
}
ops->emplace_back(r, row_key, txn_id);
txn_touching_row[row_key] = txn_id;
FindOrDie(pending_existence_per_txn, txn_id)[row_key] = true;
EmplaceIfNotPresent(&txns_needing_session_flush, txn_id);
if (txn_id == kNoTxnId && !exists[row_key]) {
data_in_mrs = true;
}
break;
}
case TEST_UPSERT:
case TEST_UPSERT_PK_ONLY: {
const auto& txn_operating_on_row = txn_touching_row[row_key];
if (txn_operating_on_row && *txn_operating_on_row != kNoTxnId) {
// The row is being operated on by a txn.
continue;
}
ops->emplace_back(r, row_key);
txn_touching_row[row_key] = kNoTxnId;
auto& row_exists = FindOrDie(pending_existence_per_txn, kNoTxnId)[row_key];
if (!row_exists) {
data_in_mrs = true;
} else {
data_in_dms = true;
}
row_exists = true;
EmplaceIfNotPresent(&txns_needing_session_flush, kNoTxnId);
break;
}
case TEST_UPDATE_IGNORE: {
const auto& txn_operating_on_row = txn_touching_row[row_key];
if (txn_operating_on_row && *txn_operating_on_row != kNoTxnId) {
// The row is being operated on by another txn.
continue;
}
if (!txn_operating_on_row) {
if (exists[row_key] && !data_in_mrs) {
// The row is not being operated on by another txn, and it exists and
// has been flushed, meaning this op will result in a DMS mutation.
data_in_dms = true;
}
// The existence status shouldn't change.
FindOrDie(pending_existence_per_txn, kNoTxnId)[row_key] = exists[row_key];
}
ops->emplace_back(r, row_key);
txn_touching_row[row_key] = kNoTxnId;
EmplaceIfNotPresent(&txns_needing_session_flush, kNoTxnId);
break;
}
case TEST_UPDATE: {
const auto& txn_operating_on_row = txn_touching_row[row_key];
if (txn_operating_on_row && *txn_operating_on_row != kNoTxnId) {
// The row is being operated on by another txn.
continue;
}
if (txn_operating_on_row && *txn_operating_on_row == kNoTxnId &&
!FindOrDie(FindOrDie(pending_existence_per_txn, kNoTxnId), row_key)) {
// The row is being operated on by an in-flight op, but the pending
// row state doesn't exist.
continue;
}
if (!txn_operating_on_row) {
if (!exists[row_key]) {
// The row is not being operated on by another txn, but the row
// doesn't exist so we can't update anything.
continue;
}
if (!data_in_mrs) {
// The row is not being operated on by another txn, and it exists
// in a DRS, meaning this op will result in a DMS mutation.
data_in_dms = true;
}
// The existence status shouldn't change.
FindOrDie(pending_existence_per_txn, kNoTxnId)[row_key] = exists[row_key];
}
ops->emplace_back(r, row_key);
txn_touching_row[row_key] = kNoTxnId;
EmplaceIfNotPresent(&txns_needing_session_flush, kNoTxnId);
break;
}
case TEST_DELETE_IGNORE: {
const auto& txn_operating_on_row = txn_touching_row[row_key];
if (txn_operating_on_row && *txn_operating_on_row != kNoTxnId) {
// The row is being operated on by another txn.
continue;
}
if (!txn_operating_on_row && exists[row_key] && !data_in_mrs) {
// The row is not being operated on by another txn, and it exists in
// a DRS, meaning this op will result in a DMS mutation.
data_in_dms = true;
}
ops->emplace_back(r, row_key);
FindOrDie(pending_existence_per_txn, kNoTxnId)[row_key] = false;
txn_touching_row[row_key] = kNoTxnId;
EmplaceIfNotPresent(&txns_needing_session_flush, kNoTxnId);
break;
}
case TEST_DELETE: {
const auto& txn_operating_on_row = txn_touching_row[row_key];
if (txn_operating_on_row && *txn_operating_on_row != kNoTxnId) {
// The row is being operated on by a txn. Since we don't support
// mutating a row while it is participating in a transaction, we must
// wait for transaction to complete before doing anything.
continue;
}
if (txn_operating_on_row && *txn_operating_on_row == kNoTxnId &&
!FindOrDie(FindOrDie(pending_existence_per_txn, kNoTxnId), row_key)) {
// The row is being operated on by a non-transactional in-flight op,
// meaning we can only correctly delete the row if the in-flight op
// were to insert the row, making the row's existence pending.
// Otherwise, we cannot schedule a delete.
continue;
}
if (!txn_operating_on_row && !exists[row_key]) {
// The row is not being operated on by another txn, but the row
// doesn't exist, so we cannot schedule a delete.
continue;
}
ops->emplace_back(TEST_DELETE, row_key);
FindOrDie(pending_existence_per_txn, kNoTxnId)[row_key] = false;
txn_touching_row[row_key] = kNoTxnId;
EmplaceIfNotPresent(&txns_needing_session_flush, kNoTxnId);
if (!data_in_mrs) {
// The row exists in a DRS, so this op will result in a DMS mutation.
data_in_dms = true;
}
break;
}
case TEST_FLUSH_OPS: {
const auto& txn_id = pick_txn_id(/*maybe_none*/true);
// If the picked transaction doesn't have any ops buffered in its
// session yet, pick a new action.
if (!ContainsKey(txns_needing_session_flush, txn_id)) continue;
if (txn_id == kNoTxnId) {
// If flushing rows that aren't part of any transaction, apply
// their state immediately.
auto& pending_existence_per_row = FindOrDie(pending_existence_per_txn, kNoTxnId);
for (const auto& key_and_exists : pending_existence_per_row) {
const auto& row_key = key_and_exists.first;
exists[row_key] = key_and_exists.second;
DCHECK_EQ(kNoTxnId, txn_touching_row[row_key]);
txn_touching_row[row_key] = boost::none;
}
pending_existence_per_row.clear();
}
op_timestamps++;
txns_needing_session_flush.erase(txn_id);
ops->emplace_back(TEST_FLUSH_OPS, txn_id);
break;
}
case TEST_FLUSH_TABLET:
if (data_in_mrs) {
// Non-transactions eagerly set 'data_in_mrs', expecting a session op
// to be scheduled alongside the tablet flush. Transactions don't do
// this -- they only set 'data_in_mrs' once committed.
if (ContainsKey(txns_needing_session_flush, kNoTxnId)) {
ops->emplace_back(TEST_FLUSH_OPS);
txns_needing_session_flush.erase(kNoTxnId);
}
ops->emplace_back(TEST_FLUSH_TABLET);
data_in_mrs = false;
worth_compacting = true;
}
break;
case TEST_COMPACT_TABLET:
if (worth_compacting) {
if (ContainsKey(txns_needing_session_flush, kNoTxnId)) {
ops->emplace_back(TEST_FLUSH_OPS);
txns_needing_session_flush.erase(kNoTxnId);
}
ops->emplace_back(TEST_COMPACT_TABLET);
worth_compacting = false;
}
break;
case TEST_FLUSH_DELTAS:
if (data_in_dms) {
if (ContainsKey(txns_needing_session_flush, kNoTxnId)) {
ops->emplace_back(TEST_FLUSH_OPS);
txns_needing_session_flush.erase(kNoTxnId);
}
ops->emplace_back(TEST_FLUSH_DELTAS);
data_in_dms = false;
}
break;
case TEST_MAJOR_COMPACT_DELTAS:
ops->emplace_back(TEST_MAJOR_COMPACT_DELTAS);
break;
case TEST_MINOR_COMPACT_DELTAS:
ops->emplace_back(TEST_MINOR_COMPACT_DELTAS);
break;
case TEST_RESTART_TS:
ops->emplace_back(TEST_RESTART_TS);
break;
case TEST_SCAN_AT_TIMESTAMP: {
int timestamp = 1;
if (op_timestamps > 0) {
timestamp = (rand() % op_timestamps) + 1;
}
ops->emplace_back(TEST_SCAN_AT_TIMESTAMP, timestamp);
break;
}
case TEST_DIFF_SCAN: {
int start_timestamp = 1;
int end_timestamp = 1;
if (op_timestamps > 0) {
start_timestamp = (rand() % op_timestamps) + 1;
end_timestamp = (rand() % op_timestamps) + 1;
if (start_timestamp > end_timestamp) {
std::swap(start_timestamp, end_timestamp);
}
}
ops->emplace_back(TEST_DIFF_SCAN, start_timestamp, end_timestamp);
break;
}
case TEST_BEGIN_TXN: {
// If we have --max_open_txns open transactions, we can't begin a new
// transaction. NOTE: 'pending_existence_per_txn' also includes
// kNoTxnId, hence the extra count.
if (pending_existence_per_txn.size() == 1 + FLAGS_max_open_txns) continue;
const auto txn_id = next_txn_id++;
ops->emplace_back(r, txn_id);
EmplaceOrDie(&pending_existence_per_txn, txn_id, IsPresentByRowKey{});
op_timestamps++;
break;
}
case TEST_COMMIT_TXN: {
if (no_open_txns()) continue;
const auto txn_id = pick_txn_id(/*maybe_none*/false);
DCHECK_NE(kNoTxnId, txn_id);
// If there are ops pending for this transaction, we need to flush them too.
if (ContainsKey(txns_needing_session_flush, txn_id)) {
op_timestamps++;
txns_needing_session_flush.erase(txn_id);
ops->emplace_back(TEST_FLUSH_OPS, txn_id);
}
ops->emplace_back(r, txn_id);
auto pending_existence = EraseKeyReturnValuePtr(&pending_existence_per_txn, txn_id);
for (const auto& key_and_exists : pending_existence) {
const auto& key = key_and_exists.first;
const auto& key_exists = key_and_exists.second;
DCHECK(key_exists); // only support inserts
// Since we're commiting the transaction, its MRS should hold state
// if there are any inserted rows.
data_in_mrs = true;
exists[key] = true;
txn_touching_row[key] = boost::none;
}
// Commit replicates two ops (BEGIN_COMMIT and FINALIZE_COMMIT).
op_timestamps += 2;
break;
}
case TEST_ABORT_TXN: {
if (no_open_txns()) continue;
const auto txn_id = pick_txn_id(/*maybe_none*/false);
DCHECK_NE(kNoTxnId, txn_id);
ops->emplace_back(r, txn_id);
auto pending_existence = EraseKeyReturnValuePtr(&pending_existence_per_txn, txn_id);
for (const auto& key_and_exists : pending_existence) {
txn_touching_row[key_and_exists.first] = boost::none;
}
op_timestamps++;
break;
}
default:
LOG(FATAL) << "Invalid op type: " << r;
}
}
}
// Renders 'ops' one per line, separated by ",\n". This format is meant to be
// copy-pasted back into a test method to reproduce a failing random case.
string DumpTestCase(const vector<TestOp>& ops) {
  vector<string> op_strings;
  op_strings.reserve(ops.size());
  for (const TestOp& op : ops) {
    op_strings.emplace_back(op.ToString());
  }
  return JoinStrings(op_strings, ",\n");
}
void FuzzTest::ValidateFuzzCase(const vector<TestOp>& test_ops) {
vector<bool> exists(FLAGS_keyspace_size);
unordered_map<int, vector<std::pair<int, TestOpType>>> pending_rows_per_txn;
for (const auto& test_op : test_ops) {
switch (test_op.type) {
case TEST_INSERT:
case TEST_INSERT_PK_ONLY:
CHECK(!exists[test_op.val]) << "invalid case: inserting already-existing row";
FALLTHROUGH_INTENDED;
case TEST_INSERT_IGNORE:
case TEST_INSERT_IGNORE_PK_ONLY: {
const auto& txn_id = test_op.val2;
if (txn_id == kNoTxnId) {
exists[test_op.val] = true;
} else {
auto& rows = FindOrDie(pending_rows_per_txn, txn_id);
rows.emplace_back(make_pair(test_op.val, test_op.type));
}
break;
}
// TODO(awong): UPSERT, UPDATE, and DELETE ops should account for
// 'pending_rows_per_txn' once we begin supporting transactions.
case TEST_UPSERT:
case TEST_UPSERT_PK_ONLY:
exists[test_op.val] = true;
break;
case TEST_UPDATE:
CHECK(exists[test_op.val]) << "invalid case: updating non-existing row";
break;
case TEST_UPDATE_IGNORE:
// No change to `exists[test_op.val]`.
break;
case TEST_DELETE:
CHECK(exists[test_op.val]) << "invalid case: deleting non-existing row";
exists[test_op.val] = false;
break;
case TEST_DELETE_IGNORE:
exists[test_op.val] = false;
break;
case TEST_BEGIN_TXN:
EmplaceOrDie(&pending_rows_per_txn, test_op.val, vector<pair<int, TestOpType>>());
break;
case TEST_COMMIT_TXN: {
auto rows_and_ops = EraseKeyReturnValuePtr(&pending_rows_per_txn, test_op.val);
for (const auto& row_and_op : rows_and_ops) {
const auto& row = row_and_op.first;
const auto& op = row_and_op.second;
switch (op) {
case TEST_INSERT:
case TEST_INSERT_PK_ONLY:
CHECK(!exists[row]);
FALLTHROUGH_INTENDED;
case TEST_INSERT_IGNORE:
case TEST_INSERT_IGNORE_PK_ONLY:
exists[row] = true;
break;
default:
LOG(DFATAL) << "transactions only support insert operations";
}
}
break;
}
case TEST_ABORT_TXN: {
// Ensure that the rows this transaction was operating on were valid.
auto rows_and_ops = EraseKeyReturnValuePtr(&pending_rows_per_txn, test_op.val);
for (const auto& row_and_op : rows_and_ops) {
const auto& row = row_and_op.first;
const auto& op = row_and_op.second;
switch (op) {
case TEST_INSERT:
case TEST_INSERT_PK_ONLY:
CHECK(!exists[row]);
break;
case TEST_INSERT_IGNORE:
case TEST_INSERT_IGNORE_PK_ONLY:
break;
default:
LOG(DFATAL) << "transactions only support insert operations";
}
}
break;
}
default:
break;
}
}
}
// Validates 'test_ops' with ValidateFuzzCase(), logs them, and then executes
// them in order against the tablet.
//
// While running, it maintains a model of the expected table contents:
// - Before every mutation it EXPECTs the row read via GetRow() to equal the
//   modeled committed value.
// - On every non-transactional flush and every txn commit, it snapshots the
//   modeled values and redos into 'saved_values_' / 'saved_redos_', keyed by
//   the logical clock's current time. These snapshots are presumably consumed
//   by CheckScanAtTimestamp() / CheckDiffScan().
//
// 'update_multiplier' is the number of times each UPDATE / UPDATE_IGNORE op is
// applied to the session.
void FuzzTest::RunFuzzCase(const vector<TestOp>& test_ops,
                           int update_multiplier = 1) {
  ValidateFuzzCase(test_ops);
  // Dump the test case, since we usually run a random one.
  // This dump format is easy for a developer to copy-paste back
  // into a test method in order to reproduce a failure.
  LOG(INFO) << "test case:\n" << DumpTestCase(test_ops);

  // Represent the expected state of the table if we were to flush ops.
  vector<optional<ExpectedKeyValueRow>> cur_val(FLAGS_keyspace_size);
  // Values written by ops that are currently pending in a session (not flushed
  // yet, or flushed but belonging to a not-yet-committed transaction). Keyed by
  // transaction ID, then by row key. kNoTxnId holds non-transactional ops.
  typedef unordered_map<int, optional<ExpectedKeyValueRow>> ValueByRowKey;
  unordered_map<int, ValueByRowKey> pending_vals_per_txn;
  EmplaceOrDie(&pending_vals_per_txn, kNoTxnId, ValueByRowKey{});
  // We keep track of the redos too so it's easier to piece together the
  // expected results of a diff scan.
  unordered_map<int, vector<Redo>> pending_redos_per_txn;
  EmplaceOrDie(&pending_redos_per_txn, kNoTxnId, vector<Redo>{});

  // Returns the latest value for the given 'row_key' that is pending for the
  // given transaction. If no mutations are pending for the 'row_key' in the
  // given transaction, returns the latest committed value.
  const auto pending_row_by_key_for_txn = [&] (int row_key, int txn_id) {
    auto* pending_row_by_key = FindOrNull(pending_vals_per_txn, txn_id);
    if (pending_row_by_key && ContainsKey(*pending_row_by_key, row_key)) {
      return (*pending_row_by_key)[row_key];
    }
    return cur_val[row_key];
  };

  // Incremented and passed to InsertOrUpsertRow()/MutateRow() on every write.
  // NOTE(review): presumably this is the value being written; confirm.
  int i = 0;
  for (const TestOp& test_op : test_ops) {
    LOG(INFO) << "Running op " << test_op.ToString();
    if (IsMutation(test_op.type)) {
      EXPECT_EQ(cur_val[test_op.val], GetRow(test_op.val));
    }
    switch (test_op.type) {
      case TEST_INSERT:
      case TEST_INSERT_PK_ONLY:
      case TEST_INSERT_IGNORE:
      case TEST_INSERT_IGNORE_PK_ONLY:
      case TEST_UPSERT:
      case TEST_UPSERT_PK_ONLY: {
        const auto& row_key = test_op.val;
        const auto& txn_id = test_op.val2;
        const auto& old_row = pending_row_by_key_for_txn(row_key, txn_id);
        RedoType rtype = old_row ? UPDATE : INSERT;
        auto pending_row = InsertOrUpsertRow(
            row_key, i++, old_row, test_op.type, txn_id);
        auto& pending_row_by_key = LookupOrEmplace(&pending_vals_per_txn, txn_id, ValueByRowKey{});
        EmplaceOrUpdate(&pending_row_by_key, row_key, pending_row);
        // An insert ignore on a row that already exists will be dropped server-side.
        // An "upsert PK-only" that is converted into an update will be dropped server-side.
        // We must do the same.
        if ((test_op.type == TEST_INSERT_IGNORE ||
             test_op.type == TEST_INSERT_IGNORE_PK_ONLY ||
             test_op.type == TEST_UPSERT_PK_ONLY) &&
            rtype == UPDATE) {
          break;
        }
        // There will actually be an effect on the server-side state so keep
        // track of the change.
        FindOrDie(pending_redos_per_txn, txn_id).emplace_back(rtype, row_key, pending_row.val);
        break;
      }
      case TEST_UPDATE:
      case TEST_UPDATE_IGNORE: {
        const auto& row_key = test_op.val;
        if (test_op.type == TEST_UPDATE_IGNORE && !pending_row_by_key_for_txn(row_key, kNoTxnId)) {
          // Still call MutateRow to apply the UPDATE_IGNORE operations to the session.
          // However don't adjust the pending values given the operation will be ignored.
          for (int j = 0; j < update_multiplier; j++) {
            MutateRow(row_key, i++, test_op.type);
          }
          break;
        }
        ExpectedKeyValueRow latest_update;
        for (int j = 0; j < update_multiplier; j++) {
          latest_update = MutateRow(row_key, i++, test_op.type);
        }
        FindOrDie(pending_redos_per_txn, kNoTxnId).emplace_back(UPDATE, row_key, latest_update.val);
        auto& pending_row_by_key =
            LookupOrEmplace(&pending_vals_per_txn, kNoTxnId, ValueByRowKey{});
        EmplaceOrUpdate(&pending_row_by_key, row_key, latest_update);
        break;
      }
      case TEST_DELETE:
      case TEST_DELETE_IGNORE: {
        const auto& row_key = test_op.val;
        // Always apply the op to the session, including a DELETE_IGNORE of a
        // row that doesn't exist.
        DeleteRow(test_op.val, test_op.type);
        if (test_op.type == TEST_DELETE_IGNORE && !pending_row_by_key_for_txn(row_key, kNoTxnId)) {
          // The DELETE_IGNORE will be dropped server-side, so don't adjust the
          // pending values or redos.
          break;
        }
        FindOrDie(pending_redos_per_txn, kNoTxnId).emplace_back(DELETE, row_key, boost::none);
        auto& pending_row_by_key =
            LookupOrEmplace(&pending_vals_per_txn, kNoTxnId, ValueByRowKey{});
        EmplaceOrUpdate(&pending_row_by_key, row_key, boost::none);
        break;
      }
      case TEST_FLUSH_OPS: {
        const auto& txn_id = test_op.val;
        auto session = txn_id == kNoTxnId ? session_ : FindOrDie(txn_sessions_, txn_id);
        FlushSessionOrDie(session);
        // Only update the saved and pending values if the flush is _not_ part
        // of a transaction. Transactional mutations should only take effect
        // once committed.
        if (txn_id == kNoTxnId) {
          int current_time = down_cast<kudu::clock::LogicalClock*>(
              tablet()->clock())->GetCurrentTime();
          VLOG(1) << "Current time: " << current_time;
          auto& pending_vals_no_txn = FindOrDie(pending_vals_per_txn, kNoTxnId);
          for (const auto& kv : pending_vals_no_txn) {
            cur_val[kv.first] = kv.second;
          }
          pending_vals_no_txn.clear();
          saved_values_[current_time] = cur_val;
          auto& pending_redos_no_txn = FindOrDie(pending_redos_per_txn, kNoTxnId);
          saved_redos_[current_time] = pending_redos_no_txn;
          pending_redos_no_txn.clear();
        }
        break;
      }
      case TEST_FLUSH_TABLET:
        ASSERT_OK(tablet()->Flush());
        break;
      case TEST_FLUSH_DELTAS:
        ASSERT_OK(tablet()->FlushBiggestDMS());
        break;
      case TEST_MAJOR_COMPACT_DELTAS:
        ASSERT_OK(tablet()->CompactWorstDeltas(RowSet::MAJOR_DELTA_COMPACTION));
        break;
      case TEST_MINOR_COMPACT_DELTAS:
        ASSERT_OK(tablet()->CompactWorstDeltas(RowSet::MINOR_DELTA_COMPACTION));
        break;
      case TEST_COMPACT_TABLET:
        ASSERT_OK(tablet()->Compact(Tablet::FORCE_COMPACT_ALL));
        break;
      case TEST_RESTART_TS:
        NO_FATALS(RestartTabletServer());
        break;
      case TEST_SCAN_AT_TIMESTAMP:
        NO_FATALS(CheckScanAtTimestamp(test_op.val));
        break;
      case TEST_DIFF_SCAN:
        NO_FATALS(CheckDiffScan(test_op.val, test_op.val2));
        break;
      case TEST_BEGIN_TXN: {
        const auto& txn_id = test_op.val;
        shared_ptr<KuduSession> s(new KuduSession(client_, TxnId(txn_id)));
        s->data_->Init(s);
        ASSERT_OK(s->SetFlushMode(KuduSession::MANUAL_FLUSH));
        s->SetTimeoutMillis(60 * 1000);
        EmplaceOrDie(&txn_sessions_, txn_id, std::move(s));
        EmplaceOrDie(&pending_vals_per_txn, txn_id, ValueByRowKey{});
        EmplaceOrDie(&pending_redos_per_txn, txn_id, vector<Redo>{});
        ASSERT_OK(CallParticipantOpCheckResp(txn_id, ParticipantOpPB::BEGIN_TXN, -1));
        break;
      }
      case TEST_COMMIT_TXN: {
        const auto& txn_id = test_op.val;
        // Before committing, flush all the rows we have pending for this transaction.
        FlushSessionOrDie(FindOrDie(txn_sessions_, txn_id));
        ASSERT_OK(CallParticipantOpCheckResp(txn_id, ParticipantOpPB::BEGIN_COMMIT, -1));
        int current_time = down_cast<kudu::clock::LogicalClock*>(
            tablet()->clock())->GetCurrentTime();
        ASSERT_OK(CallParticipantOpCheckResp(txn_id, ParticipantOpPB::FINALIZE_COMMIT,
                                             current_time));
        VLOG(1) << "Current time: " << current_time;
        auto txn_pending_vals = EraseKeyReturnValuePtr(&pending_vals_per_txn, txn_id);
        for (const auto& kv : txn_pending_vals) {
          cur_val[kv.first] = kv.second;
        }
        saved_values_[current_time] = cur_val;
        auto txn_pending_redos = EraseKeyReturnValuePtr(&pending_redos_per_txn, txn_id);
        saved_redos_[current_time] = txn_pending_redos;
        txn_sessions_.erase(txn_id);
        break;
      }
      case TEST_ABORT_TXN: {
        const auto& txn_id = test_op.val;
        ASSERT_OK(CallParticipantOpCheckResp(txn_id, ParticipantOpPB::ABORT_TXN, -1));
        // An aborted transaction's mutations never take effect. Drop its
        // pending values and redos, mirroring the cleanup done on commit (and in
        // ValidateFuzzCase()), so no stale per-txn state is left behind.
        pending_vals_per_txn.erase(txn_id);
        pending_redos_per_txn.erase(txn_id);
        txn_sessions_.erase(txn_id);
        break;
      }
      default:
        LOG(FATAL) << test_op.type;
    }
  }
}
// Generates a random test sequence (restricted to the PK_ONLY op set) against a
// schema whose only column is the INT32 key, and runs it.
// The logs of this test are designed to easily be copy-pasted and create
// more specific test cases like TestFuzz<N> below.
TEST_F(FuzzTest, TestRandomFuzzPksOnly) {
  CreateTabletAndStartClusterWithSchema(Schema({ColumnSchema("key", INT32)}, 1));
  SeedRandom();
  // Runs with slow tests enabled get a much longer op sequence.
  const int num_ops = AllowSlowTests() ? 1000 : 50;
  vector<TestOp> test_ops;
  GenerateTestCase(&test_ops, num_ops, TestOpSets::PK_ONLY);
  RunFuzzCase(test_ops);
}
// Generates a random test sequence against the key/value test schema and runs
// it. The logs of this test are designed to easily be copy-pasted and create
// more specific test cases like TestFuzz<N> below.
TEST_F(FuzzTest, TestRandomFuzz) {
  CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
  SeedRandom();
  // Runs with slow tests enabled get a much longer op sequence.
  const int num_ops = AllowSlowTests() ? 1000 : 50;
  vector<TestOp> test_ops;
  GenerateTestCase(&test_ops, num_ops);
  RunFuzzCase(test_ops);
}
// Generates a random test case, but every UPDATE / UPDATE_IGNORE op is applied
// to the session many times (RunFuzzCase's 'update_multiplier'). This results
// in very large batches which are likely to span multiple delta blocks when
// flushed.
TEST_F(FuzzTest, TestRandomFuzzHugeBatches) {
  CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
  SeedRandom();
  const int num_ops = AllowSlowTests() ? 500 : 50;
  vector<TestOp> test_ops;
  GenerateTestCase(&test_ops, num_ops);
#ifdef THREAD_SANITIZER
  // TSAN builds run more slowly, so 500 can cause timeouts.
  const int update_multiplier = 100;
#else
  const int update_multiplier = 500;
#endif
  RunFuzzCase(test_ops, update_multiplier);
}
// Compacts two rowsets that both contain key 0:
// - an older DRS whose row was deleted, with that DELETE still in its DMS;
// - a newer DRS holding the reinserted row.
TEST_F(FuzzTest, TestFuzz1) {
CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
vector<TestOp> test_ops = {
// Get an inserted row in a DRS.
{TEST_INSERT, 0},
{TEST_FLUSH_OPS},
{TEST_FLUSH_TABLET},
// DELETE in DMS, INSERT in MRS and flush again.
{TEST_DELETE, 0},
{TEST_INSERT, 0},
{TEST_FLUSH_OPS},
{TEST_FLUSH_TABLET},
// State:
// RowSet RowSet(0):
// (int32 key=1, int32 val=NULL) Undos: [@1(DELETE)] Redos (in DMS): [@2 DELETE]
// RowSet RowSet(1):
// (int32 key=1, int32 val=NULL) Undos: [@2(DELETE)] Redos: []
{TEST_COMPACT_TABLET},
};
RunFuzzCase(test_ops);
}
// A particular test case which previously failed TestFuzz.
// Repeatedly inserts and deletes key 0, interleaved with session flushes,
// tablet flushes and full compactions.
TEST_F(FuzzTest, TestFuzz2) {
CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
vector<TestOp> test_ops = {
{TEST_INSERT, 0},
{TEST_DELETE, 0},
{TEST_FLUSH_OPS},
{TEST_FLUSH_TABLET},
// (int32 key=1, int32 val=NULL)
// Undo Mutations: [@1(DELETE)]
// Redo Mutations: [@1(DELETE)]
{TEST_INSERT, 0},
{TEST_DELETE, 0},
{TEST_INSERT, 0},
{TEST_FLUSH_OPS},
{TEST_FLUSH_TABLET},
// (int32 key=1, int32 val=NULL)
// Undo Mutations: [@2(DELETE)]
// Redo Mutations: []
{TEST_COMPACT_TABLET},
// Output Row: (int32 key=1, int32 val=NULL)
// Undo Mutations: [@1(DELETE)]
// Redo Mutations: [@1(DELETE)]
{TEST_DELETE, 0},
{TEST_FLUSH_OPS},
{TEST_COMPACT_TABLET},
};
RunFuzzCase(test_ops);
}
// A particular test case which previously failed TestFuzz.
// Deletes a row that was flushed to a DRS, then reinserts and re-deletes it in
// the MRS, flushes that MRS, and finally compacts both rowsets together.
TEST_F(FuzzTest, TestFuzz3) {
CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
vector<TestOp> test_ops = {
{TEST_INSERT, 0},
{TEST_FLUSH_OPS},
{TEST_FLUSH_TABLET},
// Output Row: (int32 key=1, int32 val=NULL)
// Undo Mutations: [@1(DELETE)]
// Redo Mutations: []
{TEST_DELETE, 0},
// Adds a @2 DELETE to DMS for above row.
{TEST_INSERT, 0},
{TEST_DELETE, 0},
{TEST_FLUSH_OPS},
{TEST_FLUSH_TABLET},
// (int32 key=1, int32 val=NULL)
// Undo Mutations: [@2(DELETE)]
// Redo Mutations: [@2(DELETE)]
// Compaction input:
// Row 1: (int32 key=1, int32 val=NULL)
// Undo Mutations: [@2(DELETE)]
// Redo Mutations: [@2(DELETE)]
// Row 2: (int32 key=1, int32 val=NULL)
// Undo Mutations: [@1(DELETE)]
// Redo Mutations: [@2(DELETE)]
{TEST_COMPACT_TABLET},
};
RunFuzzCase(test_ops);
}
// A particular test case which previously failed TestFuzz.
// Interleaves inserts, updates and deletes of key 0 with session flushes,
// tablet flushes and full compactions, ending with a compaction.
TEST_F(FuzzTest, TestFuzz4) {
CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
vector<TestOp> test_ops = {
{TEST_INSERT, 0},
{TEST_FLUSH_OPS},
{TEST_COMPACT_TABLET},
{TEST_DELETE, 0},
{TEST_FLUSH_OPS},
{TEST_COMPACT_TABLET},
{TEST_INSERT, 0},
{TEST_UPDATE, 0},
{TEST_FLUSH_OPS},
{TEST_FLUSH_TABLET},
{TEST_DELETE, 0},
{TEST_INSERT, 0},
{TEST_FLUSH_OPS},
{TEST_FLUSH_TABLET},
{TEST_UPDATE, 0},
{TEST_FLUSH_OPS},
{TEST_FLUSH_TABLET},
{TEST_UPDATE, 0},
{TEST_DELETE, 0},
{TEST_INSERT, 0},
{TEST_DELETE, 0},
{TEST_FLUSH_OPS},
{TEST_FLUSH_TABLET},
{TEST_COMPACT_TABLET},
};
RunFuzzCase(test_ops);
}
// Scans at timestamp 5 after a flushed PK-only UPSERT of key 1, while a plain
// INSERT of key 0 is still buffered (not flushed) in the session.
TEST_F(FuzzTest, TestFuzz5) {
CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
vector<TestOp> test_ops = {
{TEST_UPSERT_PK_ONLY, 1},
{TEST_FLUSH_OPS},
// Not flushed before the scan below.
{TEST_INSERT, 0},
{TEST_SCAN_AT_TIMESTAMP, 5},
};
RunFuzzCase(test_ops);
}
// Previously caused incorrect data being read after restart.
// Failure:
// Value of: val_in_table
// Actual: "()"
// Expected: "(" + cur_val + ")"
// Mixes tablet server restarts with delta flushes, a minor delta compaction
// and a full compaction.
TEST_F(FuzzTest, TestFuzzWithRestarts1) {
CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
RunFuzzCase({
{TEST_INSERT, 1},
{TEST_FLUSH_OPS},
{TEST_FLUSH_TABLET},
// This UPDATE is still buffered in the client session across the restart.
{TEST_UPDATE, 1},
{TEST_RESTART_TS},
{TEST_FLUSH_OPS},
{TEST_FLUSH_DELTAS},
{TEST_INSERT, 0},
{TEST_DELETE, 1},
{TEST_INSERT, 1},
{TEST_FLUSH_OPS},
{TEST_FLUSH_TABLET},
{TEST_RESTART_TS},
{TEST_MINOR_COMPACT_DELTAS},
{TEST_COMPACT_TABLET},
{TEST_UPDATE, 1},
{TEST_FLUSH_OPS}
});
}
// Previously caused KUDU-1341:
// deltafile.cc:134] Check failed: last_key_.CompareTo<UNDO>(key) <= 0 must
// insert undo deltas in sorted order (ascending key, then descending ts):
// got key (row 1@ts5965182714017464320) after (row 1@ts5965182713875046400)
// Deletes and reinserts keys 0 and 1 across repeated tablet server restarts,
// delta flushes and tablet flushes, before a final full compaction.
TEST_F(FuzzTest, TestFuzzWithRestarts2) {
CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
RunFuzzCase({
{TEST_INSERT, 0},
{TEST_FLUSH_OPS},
{TEST_FLUSH_TABLET},
{TEST_DELETE, 0},
{TEST_FLUSH_OPS},
{TEST_FLUSH_DELTAS},
{TEST_RESTART_TS},
{TEST_INSERT, 1},
{TEST_INSERT, 0},
{TEST_FLUSH_OPS},
{TEST_FLUSH_TABLET},
{TEST_DELETE, 0},
{TEST_INSERT, 0},
{TEST_UPDATE, 1},
{TEST_FLUSH_OPS},
{TEST_FLUSH_TABLET},
{TEST_FLUSH_DELTAS},
{TEST_RESTART_TS},
{TEST_UPDATE, 1},
{TEST_DELETE, 1},
{TEST_FLUSH_OPS},
{TEST_RESTART_TS},
{TEST_INSERT, 1},
{TEST_FLUSH_OPS},
{TEST_FLUSH_TABLET},
{TEST_RESTART_TS},
{TEST_COMPACT_TABLET}
});
}
// Regression test for KUDU-1467: a sequence involving
// UPSERT which failed to replay properly upon bootstrap.
// Key 1 is upserted over an existing row and flushed to a DRS. It is then
// upserted, deleted and upserted again before the tablet server restarts and
// the row is updated.
TEST_F(FuzzTest, TestUpsertSeq) {
CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
RunFuzzCase({
{TEST_INSERT, 1},
{TEST_UPSERT, 1},
{TEST_FLUSH_OPS},
{TEST_FLUSH_TABLET},
{TEST_UPSERT, 1},
{TEST_DELETE, 1},
{TEST_UPSERT, 1},
{TEST_INSERT, 0},
{TEST_FLUSH_OPS},
{TEST_FLUSH_TABLET},
{TEST_RESTART_TS},
{TEST_UPDATE, 1},
});
}
// Regression test for KUDU-1623: updates without primary key
// columns specified can cause crashes and issues at restart.
// Upserts an existing row supplying only its primary key (which RunFuzzCase
// models as dropped server-side), then restarts the tablet server.
TEST_F(FuzzTest, TestUpsert_PKOnlyOps) {
CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
RunFuzzCase({
{TEST_INSERT, 1},
{TEST_FLUSH_OPS},
{TEST_UPSERT_PK_ONLY, 1},
{TEST_FLUSH_OPS},
{TEST_RESTART_TS}
});
}
// Regression test for KUDU-1905: reinserts to a tablet that has
// only primary keys end up as empty change lists. We were previously
// crashing when a changelist was empty.
// The schema's only column is the INT32 key, so upserting key 1 again after
// deleting it carries no non-key values.
TEST_F(FuzzTest, TestUpsert_PKOnlySchema) {
CreateTabletAndStartClusterWithSchema(Schema({ColumnSchema("key", INT32)}, 1));
RunFuzzCase({
{TEST_UPSERT_PK_ONLY, 1},
{TEST_DELETE, 1},
{TEST_UPSERT_PK_ONLY, 1},
{TEST_UPSERT_PK_ONLY, 1},
{TEST_FLUSH_OPS}
});
}
// MRS test for KUDU-2809: a row that has been inserted and deleted within the
// time range of a diff scan is excluded from the results.
// The row stays in the MRS (no tablet flush). The diff scan runs from
// timestamp 4 to 7 (start, end); presumably that range covers both the
// insert and the delete — confirm if the timestamp scheme changes.
TEST_F(FuzzTest, TestDiffScanRowLifespanInOneScanMRS) {
CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
RunFuzzCase({
{TEST_INSERT, 0},
{TEST_FLUSH_OPS},
{TEST_DELETE, 0},
{TEST_FLUSH_OPS},
{TEST_DIFF_SCAN, 4, 7}
});
}
// DRS test for KUDU-2809: a row that has been inserted and deleted within the
// time range of a diff scan is excluded from the results.
// The row is flushed to a DRS before it is deleted, so the DELETE lands in
// that DRS's deltas. The diff scan runs from timestamp 4 to 7 (start, end).
TEST_F(FuzzTest, TestDiffScanRowLifespanInOneScanDRS) {
CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
RunFuzzCase({
{TEST_INSERT, 0},
{TEST_FLUSH_OPS},
{TEST_FLUSH_TABLET},
{TEST_DELETE, 0},
{TEST_FLUSH_OPS},
{TEST_DIFF_SCAN, 4, 7}
});
}
// Deletes two rows and then restarts the tablet server:
// - key 1 was inserted non-transactionally and flushed to a DRS;
// - key 0 was inserted by transaction 2, which committed.
// The resulting deltas are flushed before the restart. Presumably this checks
// that the deletes replay correctly against rows written by a transaction —
// confirm against the original regression.
// NOTE(review): ops passing -1 as the txn ID are presumably non-transactional
// (kNoTxnId); confirm.
TEST_F(FuzzTest, TestReplayDeletesOnTxnRowsets) {
CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
RunFuzzCase({
{TEST_INSERT_PK_ONLY, 1, -1},
{TEST_FLUSH_OPS, -1},
{TEST_FLUSH_TABLET},
{TEST_BEGIN_TXN, 2},
{TEST_INSERT_IGNORE_PK_ONLY, 0, 2},
{TEST_FLUSH_OPS, 2},
{TEST_COMMIT_TXN, 2},
{TEST_DELETE, 0},
{TEST_DELETE, 1},
{TEST_FLUSH_OPS, -1},
{TEST_FLUSH_DELTAS},
{TEST_RESTART_TS},
});
}
// Sequence of ops:
// 1. Transaction 0 inserts key 1 and commits.
// 2. In a single session flush, key 0 is inserted, re-inserted with
//    INSERT_IGNORE (modeled by RunFuzzCase as dropped server-side), and
//    deleted.
// 3. After a restart and a major delta compaction, key 1 is deleted and the
//    tablet is flushed.
// NOTE(review): "invisible rows" presumably refers to the inserted-then-deleted
// key 0; confirm.
TEST_F(FuzzTest, TestFlushMRSsWithInvisibleRows) {
CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
RunFuzzCase({
{TEST_BEGIN_TXN, 0},
{TEST_INSERT_IGNORE, 1, 0},
{TEST_FLUSH_OPS, 0},
{TEST_COMMIT_TXN, 0},
{TEST_INSERT_PK_ONLY, 0, -1},
{TEST_INSERT_IGNORE_PK_ONLY, 0, -1},
{TEST_DELETE, 0},
{TEST_FLUSH_OPS, -1},
{TEST_RESTART_TS},
{TEST_MAJOR_COMPACT_DELTAS},
{TEST_DELETE, 1},
{TEST_FLUSH_OPS, -1},
{TEST_FLUSH_TABLET},
});
}
// Regression test for KUDU-3108. Previously caused us to have divergent 'hot'
// and 'hotmaxes' containers in the merge iterator, causing us to read invalid
// state and crash.
// The final diff scan (timestamps 5 to 12) merges data from three places:
// - a DRS whose key 1 was deleted;
// - a second DRS holding key 3;
// - the MRS holding the reinserted key 1 and key 0.
TEST_F(FuzzTest, Kudu3108) {
CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
RunFuzzCase({
{TEST_INSERT, 1},
{TEST_FLUSH_OPS},
{TEST_FLUSH_TABLET},
{TEST_INSERT, 3},
{TEST_DELETE, 1},
{TEST_FLUSH_OPS},
{TEST_FLUSH_TABLET},
{TEST_INSERT, 1},
{TEST_INSERT, 0},
{TEST_FLUSH_OPS},
{TEST_DIFF_SCAN, 5, 12},
});
}
} // namespace tablet
} // namespace kudu
// Has to be defined outside of any namespace.
// Declares the bounds of TestOpType as TEST_INSERT .. TEST_NUM_OP_TYPES.
// NOTE(review): TEST_NUM_OP_TYPES looks like a count/sentinel just past the
// last real op type, and this presumably enables enumerating or validating
// TestOpType values; confirm against the MAKE_ENUM_LIMITS definition.
MAKE_ENUM_LIMITS(kudu::tablet::TestOpType,
kudu::tablet::TEST_INSERT,
kudu::tablet::TEST_NUM_OP_TYPES);