| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <algorithm> |
| #include <cstdint> |
| #include <cstdlib> |
| #include <functional> |
| #include <list> |
| #include <map> |
| #include <memory> |
| #include <ostream> |
| #include <string> |
| #include <unordered_map> |
| #include <unordered_set> |
| #include <utility> |
| #include <vector> |
| |
| #include <boost/optional/optional.hpp> // IWYU pragma: keep |
| #include <boost/optional/optional_io.hpp> // IWYU pragma: keep |
| #include <gflags/gflags.h> |
| #include <glog/logging.h> |
| #include <glog/stl_logging.h> |
| #include <gtest/gtest.h> |
| |
| #include "kudu/client/client-test-util.h" |
| #include "kudu/client/client.h" |
| #include "kudu/client/scan_batch.h" |
| #include "kudu/client/scan_predicate.h" |
| #include "kudu/client/schema.h" |
| #include "kudu/client/session-internal.h" |
| #include "kudu/client/shared_ptr.h" // IWYU pragma: keep |
| #include "kudu/client/value.h" |
| #include "kudu/client/write_op.h" |
| #include "kudu/clock/clock.h" |
| #include "kudu/clock/logical_clock.h" |
| #include "kudu/common/common.pb.h" |
| #include "kudu/common/partial_row.h" |
| #include "kudu/common/schema.h" |
| #include "kudu/common/txn_id.h" |
| #include "kudu/common/wire_protocol.h" |
| #include "kudu/consensus/raft_consensus.h" |
| #include "kudu/gutil/casts.h" |
| #include "kudu/gutil/macros.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/ref_counted.h" |
| #include "kudu/gutil/strings/join.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/master/mini_master.h" |
| #include "kudu/mini-cluster/internal_mini_cluster.h" |
| #include "kudu/tablet/key_value_test_schema.h" |
| #include "kudu/tablet/rowset.h" |
| #include "kudu/tablet/tablet.h" |
| #include "kudu/tablet/tablet_replica.h" |
| #include "kudu/tablet/txn_participant-test-util.h" |
| #include "kudu/tserver/mini_tablet_server.h" |
| #include "kudu/tserver/tablet_server.h" |
| #include "kudu/tserver/ts_tablet_manager.h" |
| #include "kudu/tserver/tserver.pb.h" |
| #include "kudu/tserver/tserver_admin.pb.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/test_macros.h" |
| #include "kudu/util/test_util.h" |
| |
| DEFINE_int32(keyspace_size, 5, "number of distinct primary keys to test with"); |
| DEFINE_int32(max_open_txns, 5, "maximum number of open transactions to test with"); |
| DECLARE_bool(enable_maintenance_manager); |
| DECLARE_bool(scanner_allow_snapshot_scans_with_logical_timestamps); |
| DECLARE_bool(tserver_txn_write_op_handling_enabled); |
| DECLARE_bool(use_hybrid_clock); |
| |
| using boost::optional; |
| using kudu::client::KuduClient; |
| using kudu::client::KuduClientBuilder; |
| using kudu::client::KuduDelete; |
| using kudu::client::KuduPredicate; |
| using kudu::client::KuduScanBatch; |
| using kudu::client::KuduScanner; |
| using kudu::client::KuduSchema; |
| using kudu::client::KuduSession; |
| using kudu::client::KuduTable; |
| using kudu::client::KuduTableCreator; |
| using kudu::client::KuduUpdate; |
| using kudu::client::KuduValue; |
| using kudu::client::KuduWriteOperation; |
| using kudu::client::sp::shared_ptr; |
| using kudu::cluster::InternalMiniCluster; |
| using kudu::cluster::InternalMiniClusterOptions; |
| using kudu::tserver::ParticipantOpPB; |
| using kudu::tserver::ParticipantResponsePB; |
| using std::list; |
| using std::map; |
| using std::make_pair; |
| using std::pair; |
| using std::string; |
| using std::vector; |
| using std::unique_ptr; |
| using std::unordered_map; |
| using std::unordered_set; |
| using strings::Substitute; |
| |
| namespace kudu { |
| namespace tablet { |
| |
// The type of operation in a sequence of operations generated by
// the fuzz test.
//
// The enumerator values are used as indices into TestOpType_names below, so
// the two lists must be kept in the same order.
enum TestOpType {
  // Row writes (the op types for which IsMutation() returns true).
  TEST_INSERT,
  TEST_INSERT_PK_ONLY,
  TEST_INSERT_IGNORE,
  TEST_INSERT_IGNORE_PK_ONLY,
  TEST_UPSERT,
  TEST_UPSERT_PK_ONLY,
  TEST_UPDATE,
  TEST_UPDATE_IGNORE,
  TEST_DELETE,
  TEST_DELETE_IGNORE,
  // Flushes, compactions and restarts.
  TEST_FLUSH_OPS,
  TEST_FLUSH_TABLET,
  TEST_FLUSH_DELTAS,
  TEST_MINOR_COMPACT_DELTAS,
  TEST_MAJOR_COMPACT_DELTAS,
  TEST_COMPACT_TABLET,
  TEST_RESTART_TS,
  // Scans.
  TEST_SCAN_AT_TIMESTAMP,
  TEST_DIFF_SCAN,
  // Transaction control.
  TEST_BEGIN_TXN,
  TEST_COMMIT_TXN,
  TEST_ABORT_TXN,
  TEST_NUM_OP_TYPES // max value for enum
};

// Name of the table the test creates and opens.
const char* kTableName = "table";

// Printable name of each op type, indexed by its TestOpType value.
const char* TestOpType_names[] = {
  "TEST_INSERT",
  "TEST_INSERT_PK_ONLY",
  "TEST_INSERT_IGNORE",
  "TEST_INSERT_IGNORE_PK_ONLY",
  "TEST_UPSERT",
  "TEST_UPSERT_PK_ONLY",
  "TEST_UPDATE",
  "TEST_UPDATE_IGNORE",
  "TEST_DELETE",
  "TEST_DELETE_IGNORE",
  "TEST_FLUSH_OPS",
  "TEST_FLUSH_TABLET",
  "TEST_FLUSH_DELTAS",
  "TEST_MINOR_COMPACT_DELTAS",
  "TEST_MAJOR_COMPACT_DELTAS",
  "TEST_COMPACT_TABLET",
  "TEST_RESTART_TS",
  "TEST_SCAN_AT_TIMESTAMP",
  "TEST_DIFF_SCAN",
  "TEST_BEGIN_TXN",
  "TEST_COMMIT_TXN",
  "TEST_ABORT_TXN",
};

// Without this guard, adding an op type but forgetting its name would make
// TestOpType_names[type] read past the end of the array at runtime.
static_assert(sizeof(TestOpType_names) / sizeof(TestOpType_names[0]) == TEST_NUM_OP_TYPES,
              "TestOpType_names must have exactly one entry per TestOpType");
// Marks an op as not belonging to any transaction.
constexpr int kNoTxnId = -1;

// Same value as kNoTxnId, under a more generic name for operands of ops that
// don't use transaction IDs.
constexpr int kNoVal = -1;
| |
| // An operation in a fuzz-test sequence. |
| struct TestOp { |
| // NOLINTNEXTLINE(google-explicit-constructor) |
| TestOp(TestOpType t, int v1 = kNoVal, int v2 = kNoVal) // NOLINT(runtime/explicit) |
| : type(t), |
| val(v1), |
| val2(v2) {} |
| |
| // The op to run. |
| TestOpType type; |
| |
| // For INSERT/UPSERT/UPDATE/DELETE, the key of the row to be modified. |
| // For SCAN_AT_TIMESTAMP the timestamp of the scan. |
| // For DIFF_SCAN the start timestamp of the scan. |
| // For BEGIN_TXN/COMMIT_TXN/ABORT_TXN, the txn ID to operate on. |
| // For FLUSH_OPS, the txn ID to operate on. |
| // Otherwise, unused. |
| int val; |
| |
| // For INSERT, the transaction ID to insert with (kNoTxnId means none). |
| // For DIFF_SCAN, the end timestamp of the scan. |
| // Otherwise, unused. |
| int val2; |
| |
| string ToString() const { |
| switch (type) { |
| case TEST_FLUSH_TABLET: |
| case TEST_COMPACT_TABLET: |
| case TEST_FLUSH_DELTAS: |
| case TEST_MAJOR_COMPACT_DELTAS: |
| case TEST_MINOR_COMPACT_DELTAS: |
| case TEST_RESTART_TS: |
| return strings::Substitute("{$0}", TestOpType_names[type]); |
| case TEST_FLUSH_OPS: |
| case TEST_UPSERT: |
| case TEST_UPSERT_PK_ONLY: |
| case TEST_UPDATE: |
| case TEST_UPDATE_IGNORE: |
| case TEST_DELETE: |
| case TEST_DELETE_IGNORE: |
| case TEST_SCAN_AT_TIMESTAMP: |
| case TEST_BEGIN_TXN: |
| case TEST_COMMIT_TXN: |
| case TEST_ABORT_TXN: |
| return strings::Substitute("{$0, $1}", TestOpType_names[type], val); |
| case TEST_INSERT: |
| case TEST_INSERT_PK_ONLY: |
| case TEST_INSERT_IGNORE: |
| case TEST_INSERT_IGNORE_PK_ONLY: |
| case TEST_DIFF_SCAN: |
| return strings::Substitute("{$0, $1, $2}", TestOpType_names[type], val, val2); |
| default: |
| LOG(FATAL) << "Invalid op type: " << type; |
| } |
| __builtin_unreachable(); |
| } |
| }; |
| |
| enum class RedoType { |
| INSERT, |
| UPDATE, |
| DELETE, |
| }; |
| |
| struct Redo { |
| |
| Redo(RedoType t, int32_t k, optional<int32_t> v = boost::none) |
| : rtype(t), |
| key(k), |
| val(v) {} |
| |
| string ToString() const { |
| if (rtype == RedoType::DELETE) { |
| return strings::Substitute("{DELETE key=$0}", key); |
| } |
| return strings::Substitute("{$0 key=$1 val=$2}", |
| rtype == RedoType::INSERT ? "INSERT" |
| : "UPDATE", |
| key, |
| val ? std::to_string(*val) : "NULL"); |
| } |
| RedoType rtype; |
| int32_t key; |
| optional<int32_t> val; |
| }; |
| |
| const vector<TestOpType> kAllOps {TEST_INSERT, |
| TEST_INSERT_PK_ONLY, |
| TEST_INSERT_IGNORE, |
| TEST_INSERT_IGNORE_PK_ONLY, |
| TEST_UPSERT, |
| TEST_UPSERT_PK_ONLY, |
| TEST_UPDATE, |
| TEST_UPDATE_IGNORE, |
| TEST_DELETE, |
| TEST_DELETE_IGNORE, |
| TEST_FLUSH_OPS, |
| TEST_FLUSH_TABLET, |
| TEST_FLUSH_DELTAS, |
| TEST_MINOR_COMPACT_DELTAS, |
| TEST_MAJOR_COMPACT_DELTAS, |
| TEST_COMPACT_TABLET, |
| TEST_RESTART_TS, |
| TEST_SCAN_AT_TIMESTAMP, |
| TEST_DIFF_SCAN, |
| TEST_BEGIN_TXN, |
| TEST_COMMIT_TXN, |
| TEST_ABORT_TXN}; |
| |
| // Ops that focus on hammering workloads in which rows come in and out of |
| // existence. |
| const vector<TestOpType> kPkOnlyOps {TEST_INSERT_PK_ONLY, |
| TEST_INSERT_IGNORE_PK_ONLY, |
| TEST_UPSERT_PK_ONLY, |
| TEST_DELETE, |
| TEST_DELETE_IGNORE, |
| TEST_FLUSH_OPS, |
| TEST_FLUSH_TABLET, |
| TEST_FLUSH_DELTAS, |
| TEST_MINOR_COMPACT_DELTAS, |
| TEST_MAJOR_COMPACT_DELTAS, |
| TEST_COMPACT_TABLET, |
| TEST_RESTART_TS, |
| TEST_SCAN_AT_TIMESTAMP, |
| TEST_DIFF_SCAN, |
| TEST_BEGIN_TXN, |
| TEST_COMMIT_TXN, |
| TEST_ABORT_TXN}; |
| |
// Test that performs only random operations against a tablet, including updates and
// random gets (i.e. scans with equal lower and upper bounds).
//
// The test maintains an in-memory copy of the expected state of the tablet and uses only
// a single thread, so that it's easy to verify that the tablet always matches the expected
// state.
| class FuzzTest : public KuduTest { |
| public: |
| FuzzTest() { |
| FLAGS_enable_maintenance_manager = false; |
| FLAGS_use_hybrid_clock = false; |
| FLAGS_scanner_allow_snapshot_scans_with_logical_timestamps = true; |
| // The scenarios of this test do not assume using the standard control path |
| // for txn-enabled write operations. |
| FLAGS_tserver_txn_write_op_handling_enabled = false; |
| } |
| |
| void CreateTabletAndStartClusterWithSchema(const Schema& schema) { |
| schema_ = KuduSchema::FromSchema(schema); |
| KuduTest::SetUp(); |
| |
| InternalMiniClusterOptions opts; |
| cluster_.reset(new InternalMiniCluster(env_, opts)); |
| ASSERT_OK(cluster_->Start()); |
| CHECK_OK(KuduClientBuilder() |
| .add_master_server_addr(cluster_->mini_master()->bound_rpc_addr_str()) |
| .default_admin_operation_timeout(MonoDelta::FromSeconds(60)) |
| .Build(&client_)); |
| // Add a table, make sure it reports itself. |
| unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator()); |
| CHECK_OK(table_creator->table_name(kTableName) |
| .schema(&schema_) |
| .set_range_partition_columns({ "key" }) |
| .num_replicas(1) |
| .Create()); |
| |
| // Find the replica. |
| tablet_replica_ = LookupTabletReplica(); |
| |
| // Setup session and table. |
| session_ = client_->NewSession(); |
| CHECK_OK(session_->SetFlushMode(KuduSession::MANUAL_FLUSH)); |
| session_->SetTimeoutMillis(60 * 1000); |
| CHECK_OK(client_->OpenTable(kTableName, &table_)); |
| } |
| |
| void TearDown() override { |
| if (tablet_replica_) tablet_replica_.reset(); |
| if (cluster_) cluster_->Shutdown(); |
| } |
| |
| scoped_refptr<TabletReplica> LookupTabletReplica() { |
| vector<scoped_refptr<TabletReplica> > replicas; |
| cluster_->mini_tablet_server(0)->server()->tablet_manager()->GetTabletReplicas(&replicas); |
| CHECK_EQ(1, replicas.size()); |
| return replicas[0]; |
| } |
| |
| void RestartTabletServer() { |
| tablet_replica_.reset(); |
| auto ts = cluster_->mini_tablet_server(0); |
| if (ts->server()) { |
| ts->Shutdown(); |
| ASSERT_OK(ts->Restart()); |
| } else { |
| ASSERT_OK(ts->Start()); |
| } |
| ASSERT_OK(ts->server()->WaitInited()); |
| |
| tablet_replica_ = LookupTabletReplica(); |
| } |
| |
| Tablet* tablet() const { |
| return tablet_replica_->tablet(); |
| } |
| |
| Status CallParticipantOpCheckResp(int64_t txn_id, ParticipantOpPB::ParticipantOpType op_type, |
| int64_t ts_val) { |
| RETURN_NOT_OK(tablet_replica_->consensus()->WaitUntilLeaderForTests( |
| MonoDelta::FromSeconds(10))); |
| ParticipantResponsePB resp; |
| RETURN_NOT_OK(CallParticipantOp(tablet_replica_.get(), txn_id, op_type, ts_val, &resp)); |
| if (resp.has_error()) { |
| return StatusFromPB(resp.error().status()); |
| } |
| return Status::OK(); |
| } |
| |
| // Adds an insert for the given key/value pair to 'ops', returning the new contents |
| // of the row. |
| ExpectedKeyValueRow InsertOrUpsertRow(int key, int val, |
| optional<ExpectedKeyValueRow> old_row, |
| TestOpType type, int txn_id) { |
| ExpectedKeyValueRow ret; |
| unique_ptr<KuduWriteOperation> op; |
| if (type == TEST_INSERT || type == TEST_INSERT_PK_ONLY) { |
| op.reset(table_->NewInsert()); |
| } else if (type == TEST_INSERT_IGNORE || type == TEST_INSERT_IGNORE_PK_ONLY) { |
| op.reset(table_->NewInsertIgnore()); |
| } else { |
| op.reset(table_->NewUpsert()); |
| } |
| KuduPartialRow* row = op->mutable_row(); |
| CHECK_OK(row->SetInt32(0, key)); |
| ret.key = key; |
| switch (type) { |
| case TEST_INSERT: |
| case TEST_INSERT_IGNORE: |
| case TEST_UPSERT: { |
| if (val & 1) { |
| CHECK_OK(row->SetNull(1)); |
| } else { |
| CHECK_OK(row->SetInt32(1, val)); |
| ret.val = val; |
| } |
| if (type == TEST_INSERT_IGNORE && old_row) { |
| // insert ignore when the row already exists results in old value |
| ret.val = old_row->val; |
| } |
| break; |
| } |
| case TEST_INSERT_PK_ONLY: |
| break; |
| case TEST_INSERT_IGNORE_PK_ONLY: |
| case TEST_UPSERT_PK_ONLY: { |
| // For "upsert PK only" and "insert ignore PK only", we expect the row |
| // to keep its old value if the row existed, or NULL if there was no old row. |
| ret.val = old_row ? old_row->val : boost::none; |
| break; |
| } |
| default: LOG(FATAL) << "Invalid test op type: " << TestOpType_names[type]; |
| } |
| if (txn_id == kNoTxnId) { |
| CHECK_OK(session_->Apply(op.release())); |
| } else { |
| CHECK_OK(FindOrDie(txn_sessions_, txn_id)->Apply(op.release())); |
| } |
| return ret; |
| } |
| |
| // Adds an update of the given key/value pair to 'ops', returning the new contents |
| // of the row. |
| ExpectedKeyValueRow MutateRow(int key, uint32_t new_val, TestOpType type) { |
| ExpectedKeyValueRow ret; |
| unique_ptr<KuduWriteOperation> op; |
| if (type == TEST_UPDATE_IGNORE) { |
| op.reset(table_->NewUpdateIgnore()); |
| } else { |
| op.reset(table_->NewUpdate()); |
| } |
| KuduPartialRow* row = op->mutable_row(); |
| CHECK_OK(row->SetInt32(0, key)); |
| ret.key = key; |
| if (new_val % 2) { |
| CHECK_OK(row->SetNull(1)); |
| } else { |
| CHECK_OK(row->SetInt32(1, new_val)); |
| ret.val = new_val; |
| } |
| CHECK_OK(session_->Apply(op.release())); |
| return ret; |
| } |
| |
| // Adds a delete of the given row to 'ops', returning boost::none (indicating that |
| // the row no longer exists). |
| optional<ExpectedKeyValueRow> DeleteRow(int key, TestOpType type) { |
| unique_ptr<KuduWriteOperation> op; |
| if (type == TEST_DELETE_IGNORE) { |
| op.reset(table_->NewDeleteIgnore()); |
| } else { |
| op.reset(table_->NewDelete()); |
| } |
| KuduPartialRow* row = op->mutable_row(); |
| CHECK_OK(row->SetInt32(0, key)); |
| CHECK_OK(session_->Apply(op.release())); |
| return boost::none; |
| } |
| |
| // Random-read the given row, returning its current value. |
| // If the row doesn't exist, returns boost::none. |
| optional<ExpectedKeyValueRow> GetRow(int key) { |
| KuduScanner s(table_.get()); |
| CHECK_OK(s.AddConjunctPredicate(table_->NewComparisonPredicate( |
| "key", KuduPredicate::EQUAL, KuduValue::FromInt(key)))); |
| CHECK_OK(s.Open()); |
| while (s.HasMoreRows()) { |
| KuduScanBatch batch; |
| CHECK_OK(s.NextBatch(&batch)); |
| for (KuduScanBatch::RowPtr row : batch) { |
| ExpectedKeyValueRow ret; |
| CHECK_OK(row.GetInt32(0, &ret.key)); |
| if (schema_.num_columns() > 1 && !row.IsNull(1)) { |
| ret.val = 0; |
| CHECK_OK(row.GetInt32(1, ret.val.get_ptr())); |
| } |
| return ret; |
| } |
| } |
| return boost::none; |
| } |
| |
| // Checks that the rows in 'found' match the state we've stored 'saved_values_' corresponding |
| // to 'timestamp'. 'errors' collects the errors found. If 'errors' is not empty it means the |
| // check failed. |
| void CheckRowsMatchAtTimestamp(int timestamp, |
| vector<ExpectedKeyValueRow> rows_found, |
| list<string>* errors) { |
| int saved_timestamp = -1; |
| auto iter = saved_values_.upper_bound(timestamp); |
| if (iter == saved_values_.end()) { |
| if (!rows_found.empty()) { |
| for (auto& found_row : rows_found) { |
| errors->push_back(Substitute("Found unexpected row: $0", found_row.ToString())); |
| } |
| } |
| } else { |
| saved_timestamp = iter->first; |
| const auto& saved = (*iter).second; |
| int found_idx = 0; |
| int expected_values_counter = 0; |
| for (auto& expected : saved) { |
| if (expected) { |
| expected_values_counter++; |
| ExpectedKeyValueRow expected_val = expected.value(); |
| if (found_idx >= rows_found.size()) { |
| errors->push_back(Substitute("Didn't find expected value: $0", |
| expected_val.ToString())); |
| break; |
| } |
| ExpectedKeyValueRow found_val = rows_found[found_idx++]; |
| if (expected_val.key != found_val.key) { |
| errors->push_back(Substitute("Mismatched key. Expected: $0 Found: $1", |
| expected_val.ToString(), found_val.ToString())); |
| continue; |
| } |
| if (expected_val.val != found_val.val) { |
| errors->push_back(Substitute("Mismatched value. Expected: $0 Found: $1", |
| expected_val.ToString(), found_val.ToString())); |
| continue; |
| } |
| } |
| } |
| if (rows_found.size() != expected_values_counter) { |
| errors->push_back(Substitute("Mismatched size. Expected: $0 rows. Found: $1 rows.", |
| expected_values_counter, rows_found.size())); |
| for (auto& found_row : rows_found) { |
| errors->push_back(Substitute("Found unexpected row: $0", found_row.ToString())); |
| } |
| } |
| } |
| if (!errors->empty()) { |
| errors->push_front(Substitute("Found errors while comparing a snapshot scan at $0 with the " |
| "values saved at $1. Errors:", |
| timestamp, saved_timestamp)); |
| } |
| } |
| |
| // Fully consume all rows in 'scanner', writing the results to 'rows'. |
| // |
| // If 'is_deleted' is provided (only in diff scans), will also write out the |
| // values of the IS_DELETED virtual column. |
| Status ScanAllRows(KuduScanner* scanner, vector<ExpectedKeyValueRow>* rows, |
| vector<bool>* is_deleted) { |
| while (scanner->HasMoreRows()) { |
| KuduScanBatch batch; |
| RETURN_NOT_OK(scanner->NextBatch(&batch)); |
| for (KuduScanBatch::RowPtr row : batch) { |
| ExpectedKeyValueRow ret; |
| RETURN_NOT_OK(row.GetInt32(0, &ret.key)); |
| if (schema_.num_columns() == 2 && !row.IsNull(1)) { |
| ret.val = 0; |
| RETURN_NOT_OK(row.GetInt32(1, ret.val.get_ptr())); |
| } |
| if (is_deleted) { |
| bool b; |
| RETURN_NOT_OK(row.IsDeleted(&b)); |
| is_deleted->push_back(b); |
| } |
| rows->emplace_back(std::move(ret)); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| // Scan the tablet at 'timestamp' and compare the result to the saved values. |
| void CheckScanAtTimestamp(int timestamp) { |
| KuduScanner s(table_.get()); |
| ASSERT_OK(s.SetReadMode(KuduScanner::ReadMode::READ_AT_SNAPSHOT)); |
| ASSERT_OK(s.SetSnapshotRaw(timestamp)); |
| ASSERT_OK(s.SetFaultTolerant()); |
| ASSERT_OK(s.Open()); |
| |
| vector<ExpectedKeyValueRow> found; |
| ASSERT_OK(ScanAllRows(&s, &found, nullptr)); |
| |
| list<string> errors; |
| CheckRowsMatchAtTimestamp(timestamp, std::move(found), &errors); |
| |
| string final_error; |
| if (!errors.empty()) { |
| for (const string& error : errors) { |
| final_error.append("\n" + error); |
| } |
| FAIL() << final_error; |
| } |
| } |
| |
| // Diff scan the tablet from 'start_timestamp' to 'end_timestamp' and compare |
| // the result to the saved values. |
| void CheckDiffScan(int start_timestamp, int end_timestamp) { |
| KuduScanner s(table_.get()); |
| ASSERT_OK(s.SetDiffScan(start_timestamp, end_timestamp)); |
| ASSERT_OK(s.Open()); |
| |
| vector<ExpectedKeyValueRow> found; |
| vector<bool> found_is_deleted; |
| ASSERT_OK(ScanAllRows(&s, &found, &found_is_deleted)); |
| |
| if (VLOG_IS_ON(1)) { |
| for (int i = 0; i < found.size(); i++) { |
| VLOG(1) << Substitute("Diff scan result: $0$1", found[i].ToString(), |
| found_is_deleted[i] ? " (deleted)" : ""); |
| } |
| } |
| |
| // Use saved_redos_ to reconstruct the expected results of the diff scan. |
| // |
| // 'selected_rows' tracks which row keys are expected in the scan results |
| // using the select criteria. |
| // |
| // 'expected_rows' tracks expected values of rows and is built up using the |
| // apply criteria. After we've processed all relevant deltas, rows not in |
| // 'selected_rows' will be pruned and the results compared with the diff scan. |
| // |
| // 'is_deleted_start' and 'is_deleted_end' track liveness for each row at |
| // the beginning and end of the time range. If a row is dead in both, it |
| // shouldn't be in the diff scan results. |
| vector<bool> selected_rows(FLAGS_keyspace_size); |
| vector<ExpectedKeyValueRow> expected_rows(FLAGS_keyspace_size); |
| vector<bool> is_deleted_start(FLAGS_keyspace_size, true); |
| vector<bool> is_deleted_end(FLAGS_keyspace_size, true); |
| |
| for (const auto& e : saved_redos_) { |
| int ts = e.first; |
| const auto& redos = e.second; |
| if (redos.empty()) { |
| continue; |
| } |
| VLOG(1) << "Processing redos for ts @" << ts; |
| |
| if (ts >= end_timestamp) { |
| // The redo is beyond the end of the diff scan as per both the select |
| // and apply criteria. We're iterating in ascending timestamp order so |
| // this also means all future redos are irrelevant. |
| if (VLOG_IS_ON(1)) { |
| for (const auto& redo : redos) { |
| VLOG(1) << "Skipping redo " << redo.ToString(); |
| } |
| continue; |
| } |
| break; |
| } |
| |
| for (const auto& redo : redos) { |
| VLOG(1) << "Processing redo " << redo.ToString(); |
| if (ts >= start_timestamp) { |
| // The redo is relevant as per the select criteria. |
| VLOG(1) << "Selected row " << redo.key; |
| |
| if (!selected_rows[redo.key]) { |
| // This is the first relevant redo for this row. |
| is_deleted_start[redo.key] = redo.rtype == RedoType::INSERT; |
| selected_rows[redo.key] = true; |
| } |
| } |
| |
| // The redo is relevant as per the apply criteria. |
| is_deleted_end[redo.key] = redo.rtype == RedoType::DELETE; |
| if (redo.rtype != RedoType::DELETE) { |
| // Deleted rows still exist in 'expected_rows'. This is OK; |
| // 'expected_is_deleted' will reflect the deletion. |
| expected_rows[redo.key] = { redo.key, redo.val }; |
| } |
| VLOG(1) << "New value for row " << redo.key << ": " |
| << expected_rows[redo.key].ToString(); |
| VLOG(1) << "New is_deleted for row " << redo.key << ": " |
| << is_deleted_end[redo.key]; |
| } |
| } |
| vector<bool> expected_is_deleted = is_deleted_end; |
| |
| // Trim the expected results as per 'selected_rows' and start/end liveness. |
| int row_key = 0; |
| expected_rows.erase(std::remove_if( |
| expected_rows.begin(), expected_rows.end(), |
| [&](const ExpectedKeyValueRow& /*row*/) { |
| bool retval = !selected_rows[row_key] || |
| (is_deleted_start[row_key] && is_deleted_end[row_key]); |
| row_key++; |
| return retval; |
| }), expected_rows.end()); |
| row_key = 0; |
| expected_is_deleted.erase(std::remove_if( |
| expected_is_deleted.begin(), expected_is_deleted.end(), |
| [&](bool /*is_deleted*/) { |
| bool retval = !selected_rows[row_key] || |
| (is_deleted_start[row_key] && is_deleted_end[row_key]); |
| row_key++; |
| return retval; |
| }), expected_is_deleted.end()); |
| |
| // Test the results. Note that for deleted rows, we can't compare column |
| // values; the server is free to pick whatever historical values it wants. |
| auto fail_diff_scan = [&]() { |
| FAIL() << "Diff scan verification failed\n" |
| << "Expected IS_DELETED: " << expected_is_deleted << "\n" |
| << "Found IS_DELETED: " << found_is_deleted << "\n" |
| << "Expected rows: " << expected_rows << "\n" |
| << "Found rows: " << found; |
| }; |
| if (expected_is_deleted != found_is_deleted) { |
| NO_FATALS(fail_diff_scan()); |
| } |
| if (expected_rows.size() != found.size()) { |
| NO_FATALS(fail_diff_scan()); |
| } |
| for (int i = 0; i < expected_rows.size(); i++) { |
| if ((expected_is_deleted[i] && expected_rows[i].key != found[i].key) || |
| (!expected_is_deleted[i] && expected_rows[i] != found[i])) { |
| NO_FATALS(fail_diff_scan()); |
| } |
| } |
| } |
| |
| protected: |
| // Validate that the given sequence is valid and would not cause any |
| // errors assuming that there are no bugs. For example, checks to make sure there |
| // aren't duplicate inserts with no intervening deletions. |
| // |
| // Useful when using the 'delta' test case reduction tool to allow |
| // it to skip invalid test cases. |
| static void ValidateFuzzCase(const vector<TestOp>& test_ops); |
| void RunFuzzCase(const vector<TestOp>& test_ops, |
| int update_multiplier); |
| |
| KuduSchema schema_; |
| unique_ptr<InternalMiniCluster> cluster_; |
| shared_ptr<KuduClient> client_; |
| shared_ptr<KuduSession> session_; |
| unordered_map<int, shared_ptr<KuduSession>> txn_sessions_; |
| shared_ptr<KuduTable> table_; |
| |
| map<int, |
| vector<optional<ExpectedKeyValueRow>>, |
| std::greater<int>> saved_values_; |
| |
| map<int, vector<Redo>> saved_redos_; |
| |
| scoped_refptr<TabletReplica> tablet_replica_; |
| }; |
| |
// The set of ops to draw from.
enum TestOpSets {
  // Pick an operation at random from all possible operations.
  ALL,
  // Pick an operation at random from the set of operations that apply only to
  // the primary key (or that are not row-specific, like flushes or
  // compactions).
  PK_ONLY
};
| |
| TestOpType PickOpAtRandom(TestOpSets sets) { |
| switch (sets) { |
| case ALL: |
| return kAllOps[rand() % kAllOps.size()]; |
| case PK_ONLY: |
| return kPkOnlyOps[rand() % kPkOnlyOps.size()]; |
| default: |
| LOG(FATAL) << "Unknown TestOpSets type: " << sets; |
| } |
| __builtin_unreachable(); |
| } |
| |
| bool IsMutation(const TestOpType& op) { |
| switch (op) { |
| case TEST_INSERT: |
| case TEST_INSERT_PK_ONLY: |
| case TEST_INSERT_IGNORE: |
| case TEST_INSERT_IGNORE_PK_ONLY: |
| case TEST_UPSERT: |
| case TEST_UPSERT_PK_ONLY: |
| case TEST_UPDATE: |
| case TEST_UPDATE_IGNORE: |
| case TEST_DELETE: |
| case TEST_DELETE_IGNORE: |
| return true; |
| default: |
| return false; |
| } |
| } |
| |
| // Generate a random valid sequence of operations for use as a fuzz test, i.e. |
| // a set of operations that, when run, will not run into any logical errors |
| // (e.g. no "key already present" or "key not found" errors). |
| // |
| // To generate this sequence, we schedule one op at a time, keeping track of |
| // what rows exist in the tablet, what row mutations are pending, what rows are |
| // being mutated by in-flight ops and transactions, etc. We only select ops |
| // that we know to be valid, using the tracked state. |
| void GenerateTestCase(vector<TestOp>* ops, int len, TestOpSets sets = ALL) { |
| // Bitset indicating whether the given row has been committed as a part of a |
| // transaction, or successfully inserted outside of a transaction. |
| vector<bool> exists(FLAGS_keyspace_size); |
| |
| // The transaction ID that is operating on each row, or kNoTxnId if being |
| // operated on outside the context of a transaction. 'none' if the row isn't |
| // being operated on. If a row is not 'none', an entry must exist for it in |
| // 'pending_existence_per_txn'. |
| // TODO(awong): This is necessary so we don't have transactions trying to |
| // write the same rows, which currently is unprotected. Once we begin locking |
| // rows for the duration of the transaction, we shouldn't need this. |
| vector<optional<int>> txn_touching_row(FLAGS_keyspace_size); |
| |
| // Represents the pending mutations to rows that are not yet visible to other |
| // actors, and the resulting existence status of the row if we were to flush |
| // a non-transactional session (keyed as kNoTxnId) or commit a transaction |
| // (keyed by any other value). |
| // |
| // This is used to update 'exists' when scheduling a non-transactional |
| // session flush or a transaction commit. It is also used to determine |
| // whether we can operate on pending values, e.g. insert a row and then |
| // delete it in the same batched op. |
| // |
| // A row key can exist in at most one IsPresentByRowKey in any form (i.e. row |
| // 1's existence cannot be to false in two transactions), ensuring only a |
| // single actor operates on a row at a time. |
| typedef std::map<int, bool> IsPresentByRowKey; |
| unordered_map<int, IsPresentByRowKey> pending_existence_per_txn; |
| EmplaceOrDie(&pending_existence_per_txn, kNoTxnId, IsPresentByRowKey{}); |
| |
| // The transactions that have client sessions that need to be flushed. |
| unordered_set<int> txns_needing_session_flush; |
| |
| // Returns whether there are any open transactions. |
| const auto no_open_txns = [&pending_existence_per_txn] { |
| // A single entry exists for kNoTxnId. |
| return pending_existence_per_txn.size() == 1; |
| }; |
| // Helper that deterministically (based on rand()) selects a transaction ID |
| // from those in flight, or, if 'maybe_none' is true, kNoTxnId to indicate a |
| // non-transactional operation. |
| const auto pick_txn_id = [&] (bool maybe_none) -> int { |
| if (no_open_txns() || (maybe_none && rand() % 2)) { |
| // Shouldn't be called when maybe_none is false if there are no pending |
| // transactions. |
| DCHECK(maybe_none); |
| return kNoTxnId; |
| } |
| const auto& num_txns = pending_existence_per_txn.size() - 1; |
| vector<int> txn_ids; |
| txn_ids.reserve(num_txns); |
| for (const auto& txn_id_and_rows : pending_existence_per_txn) { |
| if (txn_id_and_rows.first == kNoTxnId) continue; |
| txn_ids.emplace_back(txn_id_and_rows.first); |
| } |
| std::sort(txn_ids.begin(), txn_ids.end()); |
| return txn_ids[rand() % num_txns]; |
| }; |
| |
| int next_txn_id = 0; |
| int op_timestamps = 0; |
| bool data_in_mrs = false; |
| bool worth_compacting = false; |
| bool data_in_dms = false; |
| ops->clear(); |
| while (ops->size() < len) { |
| TestOpType r = PickOpAtRandom(sets); |
| int row_key = rand() % FLAGS_keyspace_size; |
| |
| // When we perform a test mutation, we also call GetRow() which does a scan |
| // and thus increases the server's timestamp. |
| if (IsMutation(r)) { |
| op_timestamps++; |
| } |
| |
| switch (r) { |
| case TEST_INSERT: |
| case TEST_INSERT_PK_ONLY: { |
| const auto& txn_id = pick_txn_id(/*maybe_none*/true); |
| const auto& txn_operating_on_row = txn_touching_row[row_key]; |
| if (txn_operating_on_row && *txn_operating_on_row != txn_id) { |
| // The row is being operated on by another txn. |
| continue; |
| } |
| if (txn_operating_on_row && *txn_operating_on_row == txn_id && |
| FindOrDie(FindOrDie(pending_existence_per_txn, txn_id), row_key)) { |
| // The row is being operated on by this txn and the row pending |
| // state exists. |
| continue; |
| } |
| if (!txn_operating_on_row && exists[row_key]) { |
| // The row is not being operated on by another txn, but the row |
| // exists. |
| continue; |
| } |
| ops->emplace_back(r, row_key, txn_id); |
| txn_touching_row[row_key] = txn_id; |
| FindOrDie(pending_existence_per_txn, txn_id)[row_key] = true; |
| EmplaceIfNotPresent(&txns_needing_session_flush, txn_id); |
| if (txn_id == kNoTxnId) { |
| data_in_mrs = true; |
| } |
| break; |
| } |
| case TEST_INSERT_IGNORE: |
| case TEST_INSERT_IGNORE_PK_ONLY: { |
| const auto& txn_id = pick_txn_id(/*maybe_none*/true); |
| const auto& txn_operating_on_row = txn_touching_row[row_key]; |
| if (txn_operating_on_row && *txn_operating_on_row != txn_id) { |
| // The row is being operated on by another txn. |
| continue; |
| } |
| ops->emplace_back(r, row_key, txn_id); |
| txn_touching_row[row_key] = txn_id; |
| FindOrDie(pending_existence_per_txn, txn_id)[row_key] = true; |
| EmplaceIfNotPresent(&txns_needing_session_flush, txn_id); |
| if (txn_id == kNoTxnId && !exists[row_key]) { |
| data_in_mrs = true; |
| } |
| break; |
| } |
| case TEST_UPSERT: |
| case TEST_UPSERT_PK_ONLY: { |
| const auto& txn_operating_on_row = txn_touching_row[row_key]; |
| if (txn_operating_on_row && *txn_operating_on_row != kNoTxnId) { |
| // The row is being operated on by a txn. |
| continue; |
| } |
| ops->emplace_back(r, row_key); |
| txn_touching_row[row_key] = kNoTxnId; |
| auto& row_exists = FindOrDie(pending_existence_per_txn, kNoTxnId)[row_key]; |
| if (!row_exists) { |
| data_in_mrs = true; |
| } else { |
| data_in_dms = true; |
| } |
| row_exists = true; |
| EmplaceIfNotPresent(&txns_needing_session_flush, kNoTxnId); |
| break; |
| } |
| case TEST_UPDATE_IGNORE: { |
| const auto& txn_operating_on_row = txn_touching_row[row_key]; |
| if (txn_operating_on_row && *txn_operating_on_row != kNoTxnId) { |
| // The row is being operated on by another txn. |
| continue; |
| } |
| if (!txn_operating_on_row) { |
| if (exists[row_key] && !data_in_mrs) { |
| // The row is not being operated on by another txn, and it exists and |
| // has been flushed, meaning this op will result in a DMS mutation. |
| data_in_dms = true; |
| } |
| // The existence status shouldn't change. |
| FindOrDie(pending_existence_per_txn, kNoTxnId)[row_key] = exists[row_key]; |
| } |
| ops->emplace_back(r, row_key); |
| txn_touching_row[row_key] = kNoTxnId; |
| EmplaceIfNotPresent(&txns_needing_session_flush, kNoTxnId); |
| break; |
| } |
| case TEST_UPDATE: { |
| const auto& txn_operating_on_row = txn_touching_row[row_key]; |
| if (txn_operating_on_row && *txn_operating_on_row != kNoTxnId) { |
| // The row is being operated on by another txn. |
| continue; |
| } |
| if (txn_operating_on_row && *txn_operating_on_row == kNoTxnId && |
| !FindOrDie(FindOrDie(pending_existence_per_txn, kNoTxnId), row_key)) { |
| // The row is being operated on by an in-flight op, but the pending |
| // row state doesn't exist. |
| continue; |
| } |
| if (!txn_operating_on_row) { |
| if (!exists[row_key]) { |
| // The row is not being operated on by another txn, but the row |
| // doesn't exist so we can't update anything. |
| continue; |
| } |
| if (!data_in_mrs) { |
| // The row is not being operated on by another txn, and it exists |
| // in a DRS, meaning this op will result in a DMS mutation. |
| data_in_dms = true; |
| } |
| // The existence status shouldn't change. |
| FindOrDie(pending_existence_per_txn, kNoTxnId)[row_key] = exists[row_key]; |
| } |
| ops->emplace_back(r, row_key); |
| txn_touching_row[row_key] = kNoTxnId; |
| EmplaceIfNotPresent(&txns_needing_session_flush, kNoTxnId); |
| break; |
| } |
| case TEST_DELETE_IGNORE: { |
| const auto& txn_operating_on_row = txn_touching_row[row_key]; |
| if (txn_operating_on_row && *txn_operating_on_row != kNoTxnId) { |
| // The row is being operated on by another txn. |
| continue; |
| } |
| if (!txn_operating_on_row && exists[row_key] && !data_in_mrs) { |
| // The row is not being operated on by another txn, and it exists in |
| // a DRS, meaning this op will result in a DMS mutation. |
| data_in_dms = true; |
| } |
| ops->emplace_back(r, row_key); |
| FindOrDie(pending_existence_per_txn, kNoTxnId)[row_key] = false; |
| txn_touching_row[row_key] = kNoTxnId; |
| EmplaceIfNotPresent(&txns_needing_session_flush, kNoTxnId); |
| break; |
| } |
| case TEST_DELETE: { |
| const auto& txn_operating_on_row = txn_touching_row[row_key]; |
| if (txn_operating_on_row && *txn_operating_on_row != kNoTxnId) { |
| // The row is being operated on by a txn. Since we don't support |
| // mutating a row while it is participating in a transaction, we must |
| // wait for transaction to complete before doing anything. |
| continue; |
| } |
| if (txn_operating_on_row && *txn_operating_on_row == kNoTxnId && |
| !FindOrDie(FindOrDie(pending_existence_per_txn, kNoTxnId), row_key)) { |
| // The row is being operated on by a non-transactional in-flight op, |
| // meaning we can only correctly delete the row if the in-flight op |
| // were to insert the row, making the row's existence pending. |
| // Otherwise, we cannot schedule a delete. |
| continue; |
| } |
| if (!txn_operating_on_row && !exists[row_key]) { |
| // The row is not being operated on by another txn, but the row |
| // doesn't exist, so we cannot schedule a delete. |
| continue; |
| } |
| ops->emplace_back(TEST_DELETE, row_key); |
| FindOrDie(pending_existence_per_txn, kNoTxnId)[row_key] = false; |
| txn_touching_row[row_key] = kNoTxnId; |
| EmplaceIfNotPresent(&txns_needing_session_flush, kNoTxnId); |
| if (!data_in_mrs) { |
| // The row exists in a DRS, so this op will result in a DMS mutation. |
| data_in_dms = true; |
| } |
| break; |
| } |
| case TEST_FLUSH_OPS: { |
| const auto& txn_id = pick_txn_id(/*maybe_none*/true); |
| // If the picked transaction doesn't have any ops buffered in its |
| // session yet, pick a new action. |
| if (!ContainsKey(txns_needing_session_flush, txn_id)) continue; |
| if (txn_id == kNoTxnId) { |
| // If flushing rows that aren't part of any transaction, apply |
| // their state immediately. |
| auto& pending_existence_per_row = FindOrDie(pending_existence_per_txn, kNoTxnId); |
| for (const auto& key_and_exists : pending_existence_per_row) { |
| const auto& row_key = key_and_exists.first; |
| exists[row_key] = key_and_exists.second; |
| DCHECK_EQ(kNoTxnId, txn_touching_row[row_key]); |
| txn_touching_row[row_key] = boost::none; |
| } |
| pending_existence_per_row.clear(); |
| } |
| op_timestamps++; |
| txns_needing_session_flush.erase(txn_id); |
| ops->emplace_back(TEST_FLUSH_OPS, txn_id); |
| break; |
| } |
| case TEST_FLUSH_TABLET: |
| if (data_in_mrs) { |
| // Non-transactions eagerly set 'data_in_mrs', expecting a session op |
| // to be scheduled alongside the tablet flush. Transactions don't do |
| // this -- they only set 'data_in_mrs' once committed. |
| if (ContainsKey(txns_needing_session_flush, kNoTxnId)) { |
| ops->emplace_back(TEST_FLUSH_OPS); |
| txns_needing_session_flush.erase(kNoTxnId); |
| } |
| ops->emplace_back(TEST_FLUSH_TABLET); |
| data_in_mrs = false; |
| worth_compacting = true; |
| } |
| break; |
| case TEST_COMPACT_TABLET: |
| if (worth_compacting) { |
| if (ContainsKey(txns_needing_session_flush, kNoTxnId)) { |
| ops->emplace_back(TEST_FLUSH_OPS); |
| txns_needing_session_flush.erase(kNoTxnId); |
| } |
| ops->emplace_back(TEST_COMPACT_TABLET); |
| worth_compacting = false; |
| } |
| break; |
| case TEST_FLUSH_DELTAS: |
| if (data_in_dms) { |
| if (ContainsKey(txns_needing_session_flush, kNoTxnId)) { |
| ops->emplace_back(TEST_FLUSH_OPS); |
| txns_needing_session_flush.erase(kNoTxnId); |
| } |
| ops->emplace_back(TEST_FLUSH_DELTAS); |
| data_in_dms = false; |
| } |
| break; |
| case TEST_MAJOR_COMPACT_DELTAS: |
| ops->emplace_back(TEST_MAJOR_COMPACT_DELTAS); |
| break; |
| case TEST_MINOR_COMPACT_DELTAS: |
| ops->emplace_back(TEST_MINOR_COMPACT_DELTAS); |
| break; |
| case TEST_RESTART_TS: |
| ops->emplace_back(TEST_RESTART_TS); |
| break; |
| case TEST_SCAN_AT_TIMESTAMP: { |
| int timestamp = 1; |
| if (op_timestamps > 0) { |
| timestamp = (rand() % op_timestamps) + 1; |
| } |
| ops->emplace_back(TEST_SCAN_AT_TIMESTAMP, timestamp); |
| break; |
| } |
| case TEST_DIFF_SCAN: { |
| int start_timestamp = 1; |
| int end_timestamp = 1; |
| if (op_timestamps > 0) { |
| start_timestamp = (rand() % op_timestamps) + 1; |
| end_timestamp = (rand() % op_timestamps) + 1; |
| if (start_timestamp > end_timestamp) { |
| std::swap(start_timestamp, end_timestamp); |
| } |
| } |
| ops->emplace_back(TEST_DIFF_SCAN, start_timestamp, end_timestamp); |
| break; |
| } |
| case TEST_BEGIN_TXN: { |
| // If we have --max_open_txns open transactions, we can't begin a new |
| // transaction. NOTE: 'pending_existence_per_txn' also includes |
| // kNoTxnId, hence the extra count. |
| if (pending_existence_per_txn.size() == 1 + FLAGS_max_open_txns) continue; |
| const auto txn_id = next_txn_id++; |
| ops->emplace_back(r, txn_id); |
| EmplaceOrDie(&pending_existence_per_txn, txn_id, IsPresentByRowKey{}); |
| op_timestamps++; |
| break; |
| } |
| case TEST_COMMIT_TXN: { |
| if (no_open_txns()) continue; |
| const auto txn_id = pick_txn_id(/*maybe_none*/false); |
| DCHECK_NE(kNoTxnId, txn_id); |
| // If there are ops pending for this transaction, we need to flush them too. |
| if (ContainsKey(txns_needing_session_flush, txn_id)) { |
| op_timestamps++; |
| txns_needing_session_flush.erase(txn_id); |
| ops->emplace_back(TEST_FLUSH_OPS, txn_id); |
| } |
| ops->emplace_back(r, txn_id); |
| auto pending_existence = EraseKeyReturnValuePtr(&pending_existence_per_txn, txn_id); |
| for (const auto& key_and_exists : pending_existence) { |
| const auto& key = key_and_exists.first; |
| const auto& key_exists = key_and_exists.second; |
| DCHECK(key_exists); // only support inserts |
| // Since we're commiting the transaction, its MRS should hold state |
| // if there are any inserted rows. |
| data_in_mrs = true; |
| exists[key] = true; |
| txn_touching_row[key] = boost::none; |
| } |
| // Commit replicates two ops (BEGIN_COMMIT and FINALIZE_COMMIT). |
| op_timestamps += 2; |
| break; |
| } |
| case TEST_ABORT_TXN: { |
| if (no_open_txns()) continue; |
| const auto txn_id = pick_txn_id(/*maybe_none*/false); |
| DCHECK_NE(kNoTxnId, txn_id); |
| ops->emplace_back(r, txn_id); |
| auto pending_existence = EraseKeyReturnValuePtr(&pending_existence_per_txn, txn_id); |
| for (const auto& key_and_exists : pending_existence) { |
| txn_touching_row[key_and_exists.first] = boost::none; |
| } |
| op_timestamps++; |
| break; |
| } |
| default: |
| LOG(FATAL) << "Invalid op type: " << r; |
| } |
| } |
| } |
| |
| string DumpTestCase(const vector<TestOp>& ops) { |
| vector<string> strs; |
| for (TestOp test_op : ops) { |
| strs.push_back(test_op.ToString()); |
| } |
| return JoinStrings(strs, ",\n"); |
| } |
| |
| void FuzzTest::ValidateFuzzCase(const vector<TestOp>& test_ops) { |
| vector<bool> exists(FLAGS_keyspace_size); |
| unordered_map<int, vector<std::pair<int, TestOpType>>> pending_rows_per_txn; |
| for (const auto& test_op : test_ops) { |
| switch (test_op.type) { |
| case TEST_INSERT: |
| case TEST_INSERT_PK_ONLY: |
| CHECK(!exists[test_op.val]) << "invalid case: inserting already-existing row"; |
| FALLTHROUGH_INTENDED; |
| case TEST_INSERT_IGNORE: |
| case TEST_INSERT_IGNORE_PK_ONLY: { |
| const auto& txn_id = test_op.val2; |
| if (txn_id == kNoTxnId) { |
| exists[test_op.val] = true; |
| } else { |
| auto& rows = FindOrDie(pending_rows_per_txn, txn_id); |
| rows.emplace_back(make_pair(test_op.val, test_op.type)); |
| } |
| break; |
| } |
| // TODO(awong): UPSERT, UPDATE, and DELETE ops should account for |
| // 'pending_rows_per_txn' once we begin supporting transactions. |
| case TEST_UPSERT: |
| case TEST_UPSERT_PK_ONLY: |
| exists[test_op.val] = true; |
| break; |
| case TEST_UPDATE: |
| CHECK(exists[test_op.val]) << "invalid case: updating non-existing row"; |
| break; |
| case TEST_UPDATE_IGNORE: |
| // No change to `exists[test_op.val]`. |
| break; |
| case TEST_DELETE: |
| CHECK(exists[test_op.val]) << "invalid case: deleting non-existing row"; |
| exists[test_op.val] = false; |
| break; |
| case TEST_DELETE_IGNORE: |
| exists[test_op.val] = false; |
| break; |
| case TEST_BEGIN_TXN: |
| EmplaceOrDie(&pending_rows_per_txn, test_op.val, vector<pair<int, TestOpType>>()); |
| break; |
| case TEST_COMMIT_TXN: { |
| auto rows_and_ops = EraseKeyReturnValuePtr(&pending_rows_per_txn, test_op.val); |
| for (const auto& row_and_op : rows_and_ops) { |
| const auto& row = row_and_op.first; |
| const auto& op = row_and_op.second; |
| switch (op) { |
| case TEST_INSERT: |
| case TEST_INSERT_PK_ONLY: |
| CHECK(!exists[row]); |
| FALLTHROUGH_INTENDED; |
| case TEST_INSERT_IGNORE: |
| case TEST_INSERT_IGNORE_PK_ONLY: |
| exists[row] = true; |
| break; |
| default: |
| LOG(DFATAL) << "transactions only support insert operations"; |
| } |
| } |
| break; |
| } |
| case TEST_ABORT_TXN: { |
| // Ensure that the rows this transaction was operating on were valid. |
| auto rows_and_ops = EraseKeyReturnValuePtr(&pending_rows_per_txn, test_op.val); |
| for (const auto& row_and_op : rows_and_ops) { |
| const auto& row = row_and_op.first; |
| const auto& op = row_and_op.second; |
| switch (op) { |
| case TEST_INSERT: |
| case TEST_INSERT_PK_ONLY: |
| CHECK(!exists[row]); |
| break; |
| case TEST_INSERT_IGNORE: |
| case TEST_INSERT_IGNORE_PK_ONLY: |
| break; |
| default: |
| LOG(DFATAL) << "transactions only support insert operations"; |
| } |
| } |
| break; |
| } |
| default: |
| break; |
| } |
| } |
| } |
| |
| void FuzzTest::RunFuzzCase(const vector<TestOp>& test_ops, |
| int update_multiplier = 1) { |
| ValidateFuzzCase(test_ops); |
| // Dump the test case, since we usually run a random one. |
| // This dump format is easy for a developer to copy-paste back |
| // into a test method in order to reproduce a failure. |
| LOG(INFO) << "test case:\n" << DumpTestCase(test_ops); |
| |
| // Represent the expected state of the table if we were to flush ops. |
| vector<optional<ExpectedKeyValueRow>> cur_val(FLAGS_keyspace_size); |
| |
| // The ops that are current pending in a session (not flushed yet). |
| typedef unordered_map<int, optional<ExpectedKeyValueRow>> ValueByRowKey; |
| unordered_map<int, ValueByRowKey> pending_vals_per_txn; |
| EmplaceOrDie(&pending_vals_per_txn, kNoTxnId, ValueByRowKey{}); |
| |
| // We keep track of the redos too so it's easier to piece together the |
| // expected results of a diff scan. |
| unordered_map<int, vector<Redo>> pending_redos_per_txn; |
| EmplaceOrDie(&pending_redos_per_txn, kNoTxnId, vector<Redo>{}); |
| |
| // Returns the latest value for the given 'row_key' that is pending for the |
| // given transaction. If no mutations are pending for the 'row_key' in the |
| // given transaction, returns the latest committed value. |
| const auto pending_row_by_key_for_txn = [&] (int row_key, int txn_id) { |
| auto* pending_row_by_key = FindOrNull(pending_vals_per_txn, txn_id); |
| if (pending_row_by_key && ContainsKey(*pending_row_by_key, row_key)) { |
| return (*pending_row_by_key)[row_key]; |
| } |
| return cur_val[row_key]; |
| }; |
| |
| int i = 0; |
| for (const TestOp& test_op : test_ops) { |
| LOG(INFO) << "Running op " << test_op.ToString(); |
| if (IsMutation(test_op.type)) { |
| EXPECT_EQ(cur_val[test_op.val], GetRow(test_op.val)); |
| } |
| |
| switch (test_op.type) { |
| case TEST_INSERT: |
| case TEST_INSERT_PK_ONLY: |
| case TEST_INSERT_IGNORE: |
| case TEST_INSERT_IGNORE_PK_ONLY: |
| case TEST_UPSERT: |
| case TEST_UPSERT_PK_ONLY: { |
| const auto& row_key = test_op.val; |
| const auto& txn_id = test_op.val2; |
| const auto& old_row = pending_row_by_key_for_txn(row_key, txn_id); |
| RedoType rtype = old_row ? RedoType::UPDATE : RedoType::INSERT; |
| auto pending_row = InsertOrUpsertRow( |
| row_key, i++, old_row, test_op.type, txn_id); |
| |
| auto& pending_row_by_key = LookupOrEmplace(&pending_vals_per_txn, txn_id, ValueByRowKey{}); |
| EmplaceOrUpdate(&pending_row_by_key, row_key, pending_row); |
| |
| // An insert ignore on a row that already exists will be dropped server-side. |
| // An "upsert PK-only" that is converted into an update will be dropped server-side. |
| // We must do the same. |
| if ((test_op.type == TEST_INSERT_IGNORE || |
| test_op.type == TEST_INSERT_IGNORE_PK_ONLY || |
| test_op.type == TEST_UPSERT_PK_ONLY) && |
| rtype == RedoType::UPDATE) { |
| break; |
| } |
| |
| // There will actually be an effect on the server-side state so keep |
| // track of the change. |
| FindOrDie(pending_redos_per_txn, txn_id).emplace_back(rtype, row_key, pending_row.val); |
| break; |
| } |
| case TEST_UPDATE: |
| case TEST_UPDATE_IGNORE: { |
| const auto& row_key = test_op.val; |
| if (test_op.type == TEST_UPDATE_IGNORE && !pending_row_by_key_for_txn(row_key, kNoTxnId)) { |
| // Still call MutateRow to apply the UPDATE_IGNORE operations to the session. |
| // However don't adjust the pending values given the operation will be ignored. |
| for (int j = 0; j < update_multiplier; j++) { |
| MutateRow(row_key, i++, test_op.type); |
| } |
| break; |
| } |
| ExpectedKeyValueRow latest_update; |
| for (int j = 0; j < update_multiplier; j++) { |
| latest_update = MutateRow(row_key, i++, test_op.type); |
| } |
| FindOrDie(pending_redos_per_txn, kNoTxnId).emplace_back( |
| RedoType::UPDATE, row_key, latest_update.val); |
| auto& pending_row_by_key = |
| LookupOrEmplace(&pending_vals_per_txn, kNoTxnId, ValueByRowKey{}); |
| EmplaceOrUpdate(&pending_row_by_key, row_key, latest_update); |
| break; |
| } |
| case TEST_DELETE: |
| case TEST_DELETE_IGNORE: { |
| const auto& row_key = test_op.val; |
| DeleteRow(test_op.val, test_op.type); |
| if (test_op.type == TEST_DELETE_IGNORE && !pending_row_by_key_for_txn(row_key, kNoTxnId)) { |
| // Still call DeleteRow to apply the DELETE_IGNORE operation to the session. |
| // However don't adjust the pending values given the operation will be ignored. |
| break; |
| } |
| FindOrDie(pending_redos_per_txn, kNoTxnId).emplace_back( |
| RedoType::DELETE, row_key, boost::none); |
| auto& pending_row_by_key = |
| LookupOrEmplace(&pending_vals_per_txn, kNoTxnId, ValueByRowKey{}); |
| EmplaceOrUpdate(&pending_row_by_key, row_key, boost::none); |
| break; |
| } |
| case TEST_FLUSH_OPS: { |
| const auto& txn_id = test_op.val; |
| auto session = txn_id == kNoTxnId ? session_ : FindOrDie(txn_sessions_, txn_id); |
| FlushSessionOrDie(session); |
| // Only update the saved and pending values if the flush is _not_ part |
| // of a transaction. Transactional mutations should only take effect |
| // once committed. |
| if (txn_id == kNoTxnId) { |
| int current_time = down_cast<kudu::clock::LogicalClock*>( |
| tablet()->clock())->GetCurrentTime(); |
| VLOG(1) << "Current time: " << current_time; |
| |
| auto& pending_vals_no_txn = FindOrDie(pending_vals_per_txn, kNoTxnId); |
| for (const auto& kv : pending_vals_no_txn) { |
| cur_val[kv.first] = kv.second; |
| } |
| pending_vals_no_txn.clear(); |
| saved_values_[current_time] = cur_val; |
| |
| auto& pending_redos_no_txn = FindOrDie(pending_redos_per_txn, kNoTxnId); |
| saved_redos_[current_time] = pending_redos_no_txn; |
| pending_redos_no_txn.clear(); |
| } |
| break; |
| } |
| case TEST_FLUSH_TABLET: |
| ASSERT_OK(tablet()->Flush()); |
| break; |
| case TEST_FLUSH_DELTAS: |
| ASSERT_OK(tablet()->FlushBiggestDMS()); |
| break; |
| case TEST_MAJOR_COMPACT_DELTAS: |
| ASSERT_OK(tablet()->CompactWorstDeltas(RowSet::MAJOR_DELTA_COMPACTION)); |
| break; |
| case TEST_MINOR_COMPACT_DELTAS: |
| ASSERT_OK(tablet()->CompactWorstDeltas(RowSet::MINOR_DELTA_COMPACTION)); |
| break; |
| case TEST_COMPACT_TABLET: |
| ASSERT_OK(tablet()->Compact(Tablet::FORCE_COMPACT_ALL)); |
| break; |
| case TEST_RESTART_TS: |
| NO_FATALS(RestartTabletServer()); |
| break; |
| case TEST_SCAN_AT_TIMESTAMP: |
| NO_FATALS(CheckScanAtTimestamp(test_op.val)); |
| break; |
| case TEST_DIFF_SCAN: |
| NO_FATALS(CheckDiffScan(test_op.val, test_op.val2)); |
| break; |
| case TEST_BEGIN_TXN: { |
| const auto& txn_id = test_op.val; |
| shared_ptr<KuduSession> s(new KuduSession(client_, TxnId(txn_id))); |
| s->data_->Init(s); |
| ASSERT_OK(s->SetFlushMode(KuduSession::MANUAL_FLUSH)); |
| s->SetTimeoutMillis(60 * 1000); |
| EmplaceOrDie(&txn_sessions_, txn_id, std::move(s)); |
| EmplaceOrDie(&pending_vals_per_txn, txn_id, ValueByRowKey{}); |
| EmplaceOrDie(&pending_redos_per_txn, txn_id, vector<Redo>{}); |
| ASSERT_OK(CallParticipantOpCheckResp(txn_id, ParticipantOpPB::BEGIN_TXN, -1)); |
| break; |
| } |
| case TEST_COMMIT_TXN: { |
| const auto& txn_id = test_op.val; |
| // Before committing, flush all the rows we have pending for this transaction. |
| FlushSessionOrDie(FindOrDie(txn_sessions_, txn_id)); |
| |
| ASSERT_OK(CallParticipantOpCheckResp(txn_id, ParticipantOpPB::BEGIN_COMMIT, -1)); |
| int current_time = down_cast<kudu::clock::LogicalClock*>( |
| tablet()->clock())->GetCurrentTime(); |
| ASSERT_OK(CallParticipantOpCheckResp(txn_id, ParticipantOpPB::FINALIZE_COMMIT, |
| current_time)); |
| VLOG(1) << "Current time: " << current_time; |
| auto txn_pending_vals = EraseKeyReturnValuePtr(&pending_vals_per_txn, txn_id); |
| for (const auto& kv : txn_pending_vals) { |
| cur_val[kv.first] = kv.second; |
| } |
| saved_values_[current_time] = cur_val; |
| |
| auto txn_pending_redos = EraseKeyReturnValuePtr(&pending_redos_per_txn, txn_id); |
| saved_redos_[current_time] = txn_pending_redos; |
| |
| txn_sessions_.erase(txn_id); |
| break; |
| } |
| case TEST_ABORT_TXN: { |
| const auto& txn_id = test_op.val; |
| ASSERT_OK(CallParticipantOpCheckResp(txn_id, ParticipantOpPB::ABORT_TXN, -1)); |
| txn_sessions_.erase(txn_id); |
| break; |
| } |
| default: |
| LOG(FATAL) << test_op.type; |
| } |
| } |
| } |
| |
| // Generates a random test sequence and runs it. |
| // The logs of this test are designed to easily be copy-pasted and create |
| // more specific test cases like TestFuzz<N> below. |
| TEST_F(FuzzTest, TestRandomFuzzPksOnly) { |
| CreateTabletAndStartClusterWithSchema(Schema({ColumnSchema("key", INT32)}, 1)); |
| SeedRandom(); |
| vector<TestOp> test_ops; |
| GenerateTestCase(&test_ops, AllowSlowTests() ? 1000 : 50, TestOpSets::PK_ONLY); |
| RunFuzzCase(test_ops); |
| } |
| |
| // Generates a random test sequence and runs it. |
| // The logs of this test are designed to easily be copy-pasted and create |
| // more specific test cases like TestFuzz<N> below. |
| TEST_F(FuzzTest, TestRandomFuzz) { |
| CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema()); |
| SeedRandom(); |
| vector<TestOp> test_ops; |
| GenerateTestCase(&test_ops, AllowSlowTests() ? 1000 : 50); |
| RunFuzzCase(test_ops); |
| } |
| |
| // Generates a random test case, but the UPDATEs are all repeated many times. |
| // This results in very large batches which are likely to span multiple delta blocks |
| // when flushed. |
| TEST_F(FuzzTest, TestRandomFuzzHugeBatches) { |
| CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema()); |
| SeedRandom(); |
| vector<TestOp> test_ops; |
| GenerateTestCase(&test_ops, AllowSlowTests() ? 500 : 50); |
| int update_multiplier; |
| #ifdef THREAD_SANITIZER |
| // TSAN builds run more slowly, so 500 can cause timeouts. |
| update_multiplier = 100; |
| #else |
| update_multiplier = 500; |
| #endif |
| RunFuzzCase(test_ops, update_multiplier); |
| } |
| |
| TEST_F(FuzzTest, TestFuzz1) { |
| CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema()); |
| vector<TestOp> test_ops = { |
| // Get an inserted row in a DRS. |
| {TEST_INSERT, 0}, |
| {TEST_FLUSH_OPS}, |
| {TEST_FLUSH_TABLET}, |
| |
| // DELETE in DMS, INSERT in MRS and flush again. |
| {TEST_DELETE, 0}, |
| {TEST_INSERT, 0}, |
| {TEST_FLUSH_OPS}, |
| {TEST_FLUSH_TABLET}, |
| |
| // State: |
| // RowSet RowSet(0): |
| // (int32 key=1, int32 val=NULL) Undos: [@1(DELETE)] Redos (in DMS): [@2 DELETE] |
| // RowSet RowSet(1): |
| // (int32 key=1, int32 val=NULL) Undos: [@2(DELETE)] Redos: [] |
| |
| {TEST_COMPACT_TABLET}, |
| }; |
| RunFuzzCase(test_ops); |
| } |
| |
| // A particular test case which previously failed TestFuzz. |
| TEST_F(FuzzTest, TestFuzz2) { |
| CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema()); |
| vector<TestOp> test_ops = { |
| {TEST_INSERT, 0}, |
| {TEST_DELETE, 0}, |
| {TEST_FLUSH_OPS}, |
| {TEST_FLUSH_TABLET}, |
| // (int32 key=1, int32 val=NULL) |
| // Undo Mutations: [@1(DELETE)] |
| // Redo Mutations: [@1(DELETE)] |
| |
| {TEST_INSERT, 0}, |
| {TEST_DELETE, 0}, |
| {TEST_INSERT, 0}, |
| {TEST_FLUSH_OPS}, |
| {TEST_FLUSH_TABLET}, |
| // (int32 key=1, int32 val=NULL) |
| // Undo Mutations: [@2(DELETE)] |
| // Redo Mutations: [] |
| |
| {TEST_COMPACT_TABLET}, |
| // Output Row: (int32 key=1, int32 val=NULL) |
| // Undo Mutations: [@1(DELETE)] |
| // Redo Mutations: [@1(DELETE)] |
| |
| {TEST_DELETE, 0}, |
| {TEST_FLUSH_OPS}, |
| {TEST_COMPACT_TABLET}, |
| }; |
| RunFuzzCase(test_ops); |
| } |
| |
| // A particular test case which previously failed TestFuzz. |
| TEST_F(FuzzTest, TestFuzz3) { |
| CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema()); |
| vector<TestOp> test_ops = { |
| {TEST_INSERT, 0}, |
| {TEST_FLUSH_OPS}, |
| {TEST_FLUSH_TABLET}, |
| // Output Row: (int32 key=1, int32 val=NULL) |
| // Undo Mutations: [@1(DELETE)] |
| // Redo Mutations: [] |
| |
| {TEST_DELETE, 0}, |
| // Adds a @2 DELETE to DMS for above row. |
| |
| {TEST_INSERT, 0}, |
| {TEST_DELETE, 0}, |
| {TEST_FLUSH_OPS}, |
| {TEST_FLUSH_TABLET}, |
| // (int32 key=1, int32 val=NULL) |
| // Undo Mutations: [@2(DELETE)] |
| // Redo Mutations: [@2(DELETE)] |
| |
| // Compaction input: |
| // Row 1: (int32 key=1, int32 val=NULL) |
| // Undo Mutations: [@2(DELETE)] |
| // Redo Mutations: [@2(DELETE)] |
| // Row 2: (int32 key=1, int32 val=NULL) |
| // Undo Mutations: [@1(DELETE)] |
| // Redo Mutations: [@2(DELETE)] |
| |
| {TEST_COMPACT_TABLET}, |
| }; |
| RunFuzzCase(test_ops); |
| } |
| |
| // A particular test case which previously failed TestFuzz. |
| TEST_F(FuzzTest, TestFuzz4) { |
| CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema()); |
| vector<TestOp> test_ops = { |
| {TEST_INSERT, 0}, |
| {TEST_FLUSH_OPS}, |
| {TEST_COMPACT_TABLET}, |
| {TEST_DELETE, 0}, |
| {TEST_FLUSH_OPS}, |
| {TEST_COMPACT_TABLET}, |
| {TEST_INSERT, 0}, |
| {TEST_UPDATE, 0}, |
| {TEST_FLUSH_OPS}, |
| {TEST_FLUSH_TABLET}, |
| {TEST_DELETE, 0}, |
| {TEST_INSERT, 0}, |
| {TEST_FLUSH_OPS}, |
| {TEST_FLUSH_TABLET}, |
| {TEST_UPDATE, 0}, |
| {TEST_FLUSH_OPS}, |
| {TEST_FLUSH_TABLET}, |
| {TEST_UPDATE, 0}, |
| {TEST_DELETE, 0}, |
| {TEST_INSERT, 0}, |
| {TEST_DELETE, 0}, |
| {TEST_FLUSH_OPS}, |
| {TEST_FLUSH_TABLET}, |
| {TEST_COMPACT_TABLET}, |
| }; |
| RunFuzzCase(test_ops); |
| } |
| |
| |
| TEST_F(FuzzTest, TestFuzz5) { |
| CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema()); |
| vector<TestOp> test_ops = { |
| {TEST_UPSERT_PK_ONLY, 1}, |
| {TEST_FLUSH_OPS}, |
| {TEST_INSERT, 0}, |
| {TEST_SCAN_AT_TIMESTAMP, 5}, |
| }; |
| RunFuzzCase(test_ops); |
| } |
| |
| // Previously caused incorrect data being read after restart. |
| // Failure: |
| // Value of: val_in_table |
| // Actual: "()" |
| // Expected: "(" + cur_val + ")" |
| TEST_F(FuzzTest, TestFuzzWithRestarts1) { |
| CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema()); |
| RunFuzzCase({ |
| {TEST_INSERT, 1}, |
| {TEST_FLUSH_OPS}, |
| {TEST_FLUSH_TABLET}, |
| {TEST_UPDATE, 1}, |
| {TEST_RESTART_TS}, |
| {TEST_FLUSH_OPS}, |
| {TEST_FLUSH_DELTAS}, |
| {TEST_INSERT, 0}, |
| {TEST_DELETE, 1}, |
| {TEST_INSERT, 1}, |
| {TEST_FLUSH_OPS}, |
| {TEST_FLUSH_TABLET}, |
| {TEST_RESTART_TS}, |
| {TEST_MINOR_COMPACT_DELTAS}, |
| {TEST_COMPACT_TABLET}, |
| {TEST_UPDATE, 1}, |
| {TEST_FLUSH_OPS} |
| }); |
| } |
| |
| // Previously caused KUDU-1341: |
| // deltafile.cc:134] Check failed: last_key_.CompareTo<UNDO>(key) <= 0 must |
| // insert undo deltas in sorted order (ascending key, then descending ts): |
| // got key (row 1@ts5965182714017464320) after (row 1@ts5965182713875046400) |
| TEST_F(FuzzTest, TestFuzzWithRestarts2) { |
| CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema()); |
| RunFuzzCase({ |
| {TEST_INSERT, 0}, |
| {TEST_FLUSH_OPS}, |
| {TEST_FLUSH_TABLET}, |
| {TEST_DELETE, 0}, |
| {TEST_FLUSH_OPS}, |
| {TEST_FLUSH_DELTAS}, |
| {TEST_RESTART_TS}, |
| {TEST_INSERT, 1}, |
| {TEST_INSERT, 0}, |
| {TEST_FLUSH_OPS}, |
| {TEST_FLUSH_TABLET}, |
| {TEST_DELETE, 0}, |
| {TEST_INSERT, 0}, |
| {TEST_UPDATE, 1}, |
| {TEST_FLUSH_OPS}, |
| {TEST_FLUSH_TABLET}, |
| {TEST_FLUSH_DELTAS}, |
| {TEST_RESTART_TS}, |
| {TEST_UPDATE, 1}, |
| {TEST_DELETE, 1}, |
| {TEST_FLUSH_OPS}, |
| {TEST_RESTART_TS}, |
| {TEST_INSERT, 1}, |
| {TEST_FLUSH_OPS}, |
| {TEST_FLUSH_TABLET}, |
| {TEST_RESTART_TS}, |
| {TEST_COMPACT_TABLET} |
| }); |
| } |
| |
| // Regression test for KUDU-1467: a sequence involving |
| // UPSERT which failed to replay properly upon bootstrap. |
| TEST_F(FuzzTest, TestUpsertSeq) { |
| CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema()); |
| RunFuzzCase({ |
| {TEST_INSERT, 1}, |
| {TEST_UPSERT, 1}, |
| {TEST_FLUSH_OPS}, |
| {TEST_FLUSH_TABLET}, |
| {TEST_UPSERT, 1}, |
| {TEST_DELETE, 1}, |
| {TEST_UPSERT, 1}, |
| {TEST_INSERT, 0}, |
| {TEST_FLUSH_OPS}, |
| {TEST_FLUSH_TABLET}, |
| {TEST_RESTART_TS}, |
| {TEST_UPDATE, 1}, |
| }); |
| } |
| |
| // Regression test for KUDU-1623: updates without primary key |
| // columns specified can cause crashes and issues at restart. |
| TEST_F(FuzzTest, TestUpsert_PKOnlyOps) { |
| CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema()); |
| RunFuzzCase({ |
| {TEST_INSERT, 1}, |
| {TEST_FLUSH_OPS}, |
| {TEST_UPSERT_PK_ONLY, 1}, |
| {TEST_FLUSH_OPS}, |
| {TEST_RESTART_TS} |
| }); |
| } |
| |
| // Regression test for KUDU-1905: reinserts to a tablet that has |
| // only primary keys end up as empty change lists. We were previously |
| // crashing when a changelist was empty. |
| TEST_F(FuzzTest, TestUpsert_PKOnlySchema) { |
| CreateTabletAndStartClusterWithSchema(Schema({ColumnSchema("key", INT32)}, 1)); |
| RunFuzzCase({ |
| {TEST_UPSERT_PK_ONLY, 1}, |
| {TEST_DELETE, 1}, |
| {TEST_UPSERT_PK_ONLY, 1}, |
| {TEST_UPSERT_PK_ONLY, 1}, |
| {TEST_FLUSH_OPS} |
| }); |
| } |
| |
| // MRS test for KUDU-2809: a row that has been inserted and deleted within the |
| // time range of a diff scan is excluded from the results. |
| TEST_F(FuzzTest, TestDiffScanRowLifespanInOneScanMRS) { |
| CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema()); |
| RunFuzzCase({ |
| {TEST_INSERT, 0}, |
| {TEST_FLUSH_OPS}, |
| {TEST_DELETE, 0}, |
| {TEST_FLUSH_OPS}, |
| {TEST_DIFF_SCAN, 4, 7} |
| }); |
| } |
| |
| // DRS test for KUDU-2809: a row that has been inserted and deleted within the |
| // time range of a diff scan is excluded from the results. |
| TEST_F(FuzzTest, TestDiffScanRowLifespanInOneScanDRS) { |
| CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema()); |
| RunFuzzCase({ |
| {TEST_INSERT, 0}, |
| {TEST_FLUSH_OPS}, |
| {TEST_FLUSH_TABLET}, |
| {TEST_DELETE, 0}, |
| {TEST_FLUSH_OPS}, |
| {TEST_DIFF_SCAN, 4, 7} |
| }); |
| } |
| TEST_F(FuzzTest, TestReplayDeletesOnTxnRowsets) { |
| CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema()); |
| RunFuzzCase({ |
| // Insert to the main MRS. |
| {TEST_INSERT_PK_ONLY, 1, -1}, |
| {TEST_FLUSH_OPS, -1}, |
| {TEST_FLUSH_TABLET}, |
| |
| // Insert into a transactional MRS. |
| {TEST_BEGIN_TXN, 2}, |
| {TEST_INSERT_IGNORE_PK_ONLY, 0, 2}, |
| {TEST_FLUSH_OPS, 2}, |
| {TEST_COMMIT_TXN, 2}, |
| |
| // Delete the rows in both MRSs. |
| {TEST_DELETE, 0}, |
| {TEST_DELETE, 1}, |
| {TEST_FLUSH_OPS, -1}, |
| |
| // We should be able to restart without issues. |
| {TEST_FLUSH_DELTAS}, |
| {TEST_RESTART_TS}, |
| }); |
| } |
| |
| TEST_F(FuzzTest, TestFlushMRSsWithInvisibleRows) { |
| CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema()); |
| RunFuzzCase({ |
| // Insert into a transactional MRS. |
| {TEST_BEGIN_TXN, 0}, |
| {TEST_INSERT_IGNORE, 1, 0}, |
| {TEST_FLUSH_OPS, 0}, |
| {TEST_COMMIT_TXN, 0}, |
| |
| // Insert into the main MRS and it in the same batch. |
| {TEST_INSERT_PK_ONLY, 0, -1}, |
| {TEST_INSERT_IGNORE_PK_ONLY, 0, -1}, |
| {TEST_DELETE, 0}, |
| {TEST_FLUSH_OPS, -1}, |
| |
| {TEST_RESTART_TS}, |
| {TEST_MAJOR_COMPACT_DELTAS}, |
| |
| // Delete the row in the transactional MRS. |
| {TEST_DELETE, 1}, |
| {TEST_FLUSH_OPS, -1}, |
| |
| // Flush the tablet, merging the MRSs, one of which has an invisible row. |
| {TEST_FLUSH_TABLET}, |
| }); |
| } |
| |
| // Test that when the newer row's redo head and the older row's redo tail have
| // the same timestamp, we transfer the newest non-deletes onto the newer row,
| // since if a row is alive, it must be alive as the newer row.
| TEST_F(FuzzTest, TestOlderRowsetGetsReinsertWhenNewerGetsDeleteInSameOp) {
| CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
| RunFuzzCase({
| // Insert and delete a row from the main MRS, in two separate batches.
| {TEST_INSERT, 1},
| {TEST_FLUSH_OPS, -1},
| {TEST_DELETE, 1},
| {TEST_FLUSH_OPS, -1},
|
| // Insert the same key into a transactional MRS and commit.
| {TEST_BEGIN_TXN, 25},
| {TEST_INSERT_IGNORE, 1, 25},
| {TEST_FLUSH_OPS, 25},
| {TEST_COMMIT_TXN, 25},
|
| // Delete from the transactional MRS and insert back to the main MRS.
| {TEST_DELETE, 1},
| {TEST_FLUSH_OPS, -1},
| {TEST_INSERT, 1},
| {TEST_FLUSH_OPS, -1},
|
| // Merging the history of this row, the inputs are:
| // tablet MRS: UNDO(del@t1) <- BASE -> REDO(del@t2) -> REDO(reins@t4)
| // txn MRS:    UNDO(del@t3) <- BASE -> REDO(del@t4)
| //
| // A contiguous history should be generated by transferring the last REDO
| // in the tablet MRS onto the txn MRS.
| {TEST_FLUSH_TABLET},
| });
| }
| |
| // Regression test for KUDU-3108, which previously caused the 'hot' and
| // 'hotmaxes' containers in the merge iterator to diverge, so that we read
| // invalid state and crashed.
| TEST_F(FuzzTest, Kudu3108) {
| CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
| RunFuzzCase({
| {TEST_INSERT, 1},
| {TEST_FLUSH_OPS},
| {TEST_FLUSH_TABLET},
| {TEST_INSERT, 3},
| {TEST_DELETE, 1},
| {TEST_FLUSH_OPS},
| {TEST_FLUSH_TABLET},
| {TEST_INSERT, 1},
| {TEST_INSERT, 0},
| {TEST_FLUSH_OPS},
| {TEST_DIFF_SCAN, 5, 12},  // NOTE(review): 5 and 12 presumably bound the scan's time range -- confirm.
| });
| }
| |
| // Test the case where a strictly older rowset whose row has been deleted gets
| // reinserted to after the row was inserted to and deleted from a newer rowset.
| TEST_F(FuzzTest, TestOlderRowsetGetsNewerInsertAndDelete) {
| CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
| RunFuzzCase({
| // Insert keys 1 and 0 into the main MRS in one batch, then update key 0.
| {TEST_INSERT, 1},
| {TEST_INSERT, 0},
| {TEST_FLUSH_OPS, -1},
|
| {TEST_UPDATE, 0},
| {TEST_FLUSH_OPS, -1},
|
| // Delete key 0 from the main MRS. NOTE(review): key 1 is never deleted -- confirm intended.
| {TEST_DELETE, 0},
| {TEST_FLUSH_OPS, -1},
|
| // Insert key 0 into a transactional MRS and commit.
| {TEST_BEGIN_TXN, 25},
| {TEST_INSERT_IGNORE_PK_ONLY, 0, 25},
| {TEST_FLUSH_OPS, 25},
| {TEST_COMMIT_TXN, 25},
|
| // Delete from the txn MRS, then reinsert/update/delete in the main MRS in the same batch.
| {TEST_DELETE, 0},
| {TEST_INSERT, 0},
| {TEST_UPDATE, 0},
| {TEST_DELETE, 0},
| {TEST_FLUSH_OPS, -1},
|
| // Merging the history of this row (key 0), the inputs are:
| // tablet MRS: UNDO(del@t1) <- BASE -> REDO(upd@t2) -> REDO(del@t3) -> REDO(reins@t5)
| // -> REDO(upd@t5) -> REDO(del@t5)
| // txn MRS: UNDO(del@t4) <- BASE -> REDO(del@t5)
| // where: t1=5, t2=7, t3=9, t4=13, t5=19
| {TEST_FLUSH_TABLET},
|
| // Verify correctness by scanning at, and just after, each mutation timestamp.
| {TEST_SCAN_AT_TIMESTAMP, 5},
| {TEST_SCAN_AT_TIMESTAMP, 6},
| {TEST_SCAN_AT_TIMESTAMP, 7},
| {TEST_SCAN_AT_TIMESTAMP, 8},
| {TEST_SCAN_AT_TIMESTAMP, 9},
| {TEST_SCAN_AT_TIMESTAMP, 10},
| {TEST_SCAN_AT_TIMESTAMP, 13},
| {TEST_SCAN_AT_TIMESTAMP, 14},
| {TEST_SCAN_AT_TIMESTAMP, 19},
| {TEST_SCAN_AT_TIMESTAMP, 20},
| });
| }
| |
| // Test merging when the REDO histories of the two inputs are tangential to
| // one another (they meet at the same timestamp) but don't overlap.
| TEST_F(FuzzTest, TestMergeTangentialNonOverlappingRedoHistory) {
| CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
| RunFuzzCase({
| {TEST_BEGIN_TXN, 25},
| {TEST_INSERT, 1, 25},
| {TEST_FLUSH_OPS, 25},
| {TEST_COMMIT_TXN, 25},
|
| // Have some REDOs land in the transactional MRS.
| {TEST_UPDATE, 1},
| {TEST_FLUSH_OPS, -1},
|
| {TEST_UPDATE, 1},
| {TEST_DELETE, 1},  // Not flushed here: batched with the ops below.
|
| // Have another insert land in the main MRS, then delete and reinsert it in
| // the same batch, i.e. at the same timestamp.
| {TEST_INSERT, 1},
| {TEST_DELETE, 1},
| {TEST_INSERT, 1},
| {TEST_FLUSH_OPS, -1},
|
| // Finally, delete the row at another timestamp.
| {TEST_DELETE, 1},
| {TEST_FLUSH_OPS, -1},
|
| // Merging the history of this row, the inputs are:
| // txn MRS: UNDO(del@t1) <- BASE -> REDO(upd@t2) -> REDO(upd@t3) -> REDO(del@t3)
| // tablet MRS: UNDO(del@t3) <- BASE -> REDO(del@t3) -> REDO(reins@t3) -> REDO(del@t4)
| {TEST_FLUSH_TABLET},
| });
| }
| |
| // Test that when we delete from a transactional MRS and insert to the main MRS
| // in the same batch (i.e. at the same timestamp), the generated row history is correct.
| TEST_F(FuzzTest, TestRedoHistoryInterleavingRowsets) {
| CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
| RunFuzzCase({
| {TEST_INSERT, 0},
| {TEST_BEGIN_TXN, 25},
| {TEST_INSERT_IGNORE, 1, 25},
| {TEST_FLUSH_OPS, 25},
| {TEST_COMMIT_TXN, 25},
|
| // Have some REDOs land in the transactional MRS.
| {TEST_UPDATE, 1},
| {TEST_DELETE, 1},  // Not flushed here: batched with the ops below.
|
| // Have another insert land in the main MRS, and give it an update at the
| // same timestamp.
| {TEST_INSERT, 1},
| {TEST_UPDATE, 1},
| {TEST_FLUSH_OPS, -1},
|
| // Finally, delete the row at another timestamp.
| {TEST_DELETE, 1},
| {TEST_FLUSH_OPS, -1},
|
| // We should end up with two compaction inputs:
| // txn MRS: UNDO(del@t1) <- BASE -> REDO(upd@t2) -> REDO(del@t2)
| // tablet MRS: UNDO(del@t2) <- BASE -> REDO(upd@t2) -> REDO(del@t3)
| // where t1 < t2 < t3
| //
| // These should flush to:
| // UNDO(del@t1) <- UNDO(upd@t2) <- UNDO(reins@t2) \
| // <- UNDO(del@t2) <- UNDO(upd@t2) <- BASE -> REDO(del@t3)
| //
| // Where: t1 = 6, t2 = 12, t3 = 14
| {TEST_FLUSH_TABLET},
|
| // Run scans at, and just after, t1, t2 and t3 to validate the values in each range.
| {TEST_SCAN_AT_TIMESTAMP, 6},
| {TEST_SCAN_AT_TIMESTAMP, 7},
| {TEST_SCAN_AT_TIMESTAMP, 12},
| {TEST_SCAN_AT_TIMESTAMP, 13},
| {TEST_SCAN_AT_TIMESTAMP, 14},
| {TEST_SCAN_AT_TIMESTAMP, 15},
| });
| }
| |
| TEST_F(FuzzTest, TestDontTransferUpdateWithSameTimestampAsDelete) {  // See the flush comment at the end.
| CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
| RunFuzzCase({
| {TEST_INSERT, 0},
| {TEST_BEGIN_TXN, 25},
| {TEST_INSERT, 1, 25},
| {TEST_FLUSH_OPS, 25},
| {TEST_COMMIT_TXN, 25},
|
| // Have a REDO land in the txn MRS to disambiguate which rowset input
| // is older.
| {TEST_UPDATE, 1},
| {TEST_FLUSH_OPS, -1},
|
| // Now delete the row and insert to a different rowset (the main MRS). Give
| // it an update to make it somewhat ambiguous whether there's overlap,
| // since both input rows end at the same timestamp.
| {TEST_DELETE, 1},
| {TEST_INSERT, 1},
| {TEST_UPDATE, 1},
| {TEST_FLUSH_OPS, -1},
|
| // We should end up with two compaction input rows:
| // txn MRS: UNDO(del@t1) <- BASE -> REDO(upd@t2) -> REDO(del@t3)
| // tablet MRS: UNDO(del@t3) <- BASE -> REDO(upd@t3)
| // where t1 < t2 < t3
| //
| // A flush should correctly determine that we don't need to transfer the
| // REDO(upd@t3) to the txn MRS input row, which would be catastrophic since
| // we'd be left with a delete followed by an update with no reinsert in the
| // txn MRS's input row.
| {TEST_FLUSH_TABLET},
| });
| }
| |
| } // namespace tablet |
| } // namespace kudu |
| |
| // Has to be invoked at global scope, i.e. outside of any namespace.
| MAKE_ENUM_LIMITS(kudu::tablet::TestOpType,  // The enum type the limits are declared for.
| kudu::tablet::TEST_INSERT,  // NOTE(review): presumably the lowest enumerator -- confirm.
| kudu::tablet::TEST_NUM_OP_TYPES);  // NOTE(review): presumably the upper bound/count sentinel -- confirm.