| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <cstdint> |
| #include <cstdlib> |
| #include <memory> |
| #include <optional> |
| #include <ostream> |
| #include <string> |
| #include <thread> |
| #include <type_traits> |
| #include <vector> |
| |
| #include <gflags/gflags.h> |
| #include <glog/logging.h> |
| #include <gtest/gtest.h> |
| |
| #include "kudu/common/column_predicate.h" |
| #include "kudu/common/common.pb.h" |
| #include "kudu/common/iterator.h" |
| #include "kudu/common/partial_row.h" |
| #include "kudu/common/row_operations.pb.h" |
| #include "kudu/common/rowblock.h" |
| #include "kudu/common/rowblock_memory.h" |
| #include "kudu/common/scan_spec.h" |
| #include "kudu/common/schema.h" |
| #include "kudu/gutil/port.h" |
| #include "kudu/tablet/key_value_test_schema.h" |
| #include "kudu/tablet/local_tablet_writer.h" |
| #include "kudu/tablet/rowset.h" |
| #include "kudu/tablet/tablet-test-util.h" |
| #include "kudu/tablet/tablet.h" |
| #include "kudu/util/countdown_latch.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/scoped_cleanup.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/stopwatch.h" |
| #include "kudu/util/test_macros.h" |
| #include "kudu/util/test_util.h" |
| |
| DEFINE_int32(keyspace_size, 3000, "number of unique row keys to insert/mutate"); |
| DEFINE_int32(runtime_seconds, 1, "number of seconds to run the test"); |
| DEFINE_int32(sleep_between_background_ops_ms, 100, |
| "number of milliseconds to sleep between flushing or compacting"); |
| DEFINE_int32(update_delete_ratio, 4, "ratio of update:delete when mutating existing rows"); |
| |
| DECLARE_int32(deltafile_default_block_size); |
| |
| using std::nullopt; |
| using std::optional; |
| using std::string; |
| using std::thread; |
| using std::unique_ptr; |
| using std::vector; |
| |
| namespace kudu { |
| namespace tablet { |
| |
| // Test which does only random operations against a tablet, including update and random |
| // get (ie scans with equal lower and upper bounds). |
| // |
| // The test maintains an in-memory copy of the expected state of the tablet, and uses only |
| // a single thread, so that it's easy to verify that the tablet always matches the expected |
| // state. |
| class TestRandomAccess : public KuduTabletTest { |
| public: |
| TestRandomAccess() |
| : KuduTabletTest(CreateKeyValueTestSchema()), |
| done_(1) { |
| OverrideFlagForSlowTests("keyspace_size", "30000"); |
| OverrideFlagForSlowTests("runtime_seconds", "10"); |
| OverrideFlagForSlowTests("sleep_between_background_ops_ms", "1000"); |
| |
| // Set a small block size to increase chances that a single update will span |
| // multiple delta blocks. |
| FLAGS_deltafile_default_block_size = 1024; |
| expected_tablet_state_.resize(FLAGS_keyspace_size); |
| } |
| |
| virtual void SetUp() OVERRIDE { |
| KuduTabletTest::SetUp(); |
| writer_.reset(new LocalTabletWriter(tablet().get(), &client_schema_)); |
| SeedRandom(); |
| } |
| |
| // Pick a random row of the table, verify its current state, and then |
| // modify it in some way. The modifications may include multiple mutations |
| // to the same row in a single batch (eg insert/update/delete). |
| // |
| // The mutations are always valid. For example: |
| // - inserting if it doesn't exist yet |
| // - perform an update or delete the row if it does exist. |
| // |
| // TODO: should add a version of this test which also tries invalid operations |
| // and validates the correct errors. |
| void DoRandomBatch() { |
| int key = rand() % expected_tablet_state_.size(); |
| optional<ExpectedKeyValueRow>& cur_val = expected_tablet_state_[key]; |
| |
| // Check that a read yields what we expect. |
| optional<ExpectedKeyValueRow> val_in_table = GetRow(key); |
| ASSERT_EQ(cur_val, val_in_table); |
| |
| vector<LocalTabletWriter::RowOp> pending; |
| for (int i = 0; i < 3; i++) { |
| int new_val = rand(); |
| if (!cur_val) { |
| // If there is no row, then randomly insert, insert ignore, |
| // update ignore, delete ignore, or upsert. |
| switch (rand() % 6) { |
| case 1: |
| cur_val = InsertRow(key, new_val, &pending); |
| break; |
| case 2: |
| cur_val = InsertIgnoreRow(key, new_val, &pending); |
| break; |
| case 3: |
| UpdateIgnoreRow(key, new_val, cur_val, &pending); // won't change current value |
| break; |
| case 4: |
| DeleteIgnoreRow(key, &pending); // won't change current value |
| break; |
| case 5: |
| cur_val = UpsertIgnoreRow(key, new_val, cur_val, &pending); |
| break; |
| default: |
| cur_val = UpsertRow(key, new_val, cur_val, &pending); |
| } |
| } else { |
| if (new_val % (FLAGS_update_delete_ratio + 1) == 0) { |
| // Randomly choose between delete and delete ignore. |
| if (rand() % 2 == 0) { |
| cur_val = DeleteRow(key, &pending); |
| } else { |
| cur_val = DeleteIgnoreRow(key, &pending); |
| } |
| } else { |
| // If row already exists, randomly choose between an update, |
| // update ignore, insert ignore, and upsert. |
| switch (rand() % 5) { |
| case 1: |
| cur_val = UpdateRow(key, new_val, cur_val, &pending); |
| break; |
| case 2: |
| cur_val = UpdateIgnoreRow(key, new_val, cur_val, &pending); |
| break; |
| case 3: |
| InsertIgnoreRow(key, new_val, &pending); // won't change current value |
| break; |
| case 4: |
| cur_val = UpsertIgnoreRow(key, new_val, cur_val, &pending); |
| break; |
| default: |
| cur_val = UpsertRow(key, new_val, cur_val, &pending); |
| } |
| } |
| } |
| } |
| |
| VLOG(1) << "Performing batch:"; |
| for (const auto& op : pending) { |
| VLOG(1) << RowOperationsPB::Type_Name(op.type) << " " << op.row->ToString(); |
| } |
| |
| CHECK_OK(writer_->WriteBatch(pending)); |
| for (LocalTabletWriter::RowOp op : pending) { |
| delete op.row; |
| } |
| } |
| |
| void DoRandomBatches() { |
| int op_count = 0; |
| Stopwatch s; |
| s.start(); |
| while (s.elapsed().wall_seconds() < FLAGS_runtime_seconds) { |
| for (int i = 0; i < 100; i++) { |
| NO_FATALS(DoRandomBatch()); |
| op_count++; |
| } |
| } |
| LOG(INFO) << "Ran " << op_count << " ops " |
| << "(" << (op_count / s.elapsed().wall_seconds()) << " ops/sec)"; |
| } |
| |
| // Wakes up periodically to perform a flush or compaction. |
| void BackgroundOpThread() { |
| int n_flushes = 0; |
| while (!done_.WaitFor(MonoDelta::FromMilliseconds(FLAGS_sleep_between_background_ops_ms))) { |
| CHECK_OK(tablet()->Flush()); |
| ++n_flushes; |
| switch (n_flushes % 3) { |
| case 0: |
| CHECK_OK(tablet()->Compact(Tablet::FORCE_COMPACT_ALL)); |
| break; |
| case 1: |
| CHECK_OK(tablet()->CompactWorstDeltas(RowSet::MAJOR_DELTA_COMPACTION)); |
| break; |
| case 2: |
| CHECK_OK(tablet()->CompactWorstDeltas(RowSet::MINOR_DELTA_COMPACTION)); |
| break; |
| } |
| } |
| } |
| |
| // Adds an insert for the given key/value pair to 'ops', returning the expected value |
| optional<ExpectedKeyValueRow> InsertRow(int key, int val, vector<LocalTabletWriter::RowOp>* ops) { |
| return DoRowOp(RowOperationsPB::INSERT, key, val, nullopt, ops); |
| } |
| |
| optional<ExpectedKeyValueRow> InsertIgnoreRow(int key, int val, |
| vector<LocalTabletWriter::RowOp>* ops) { |
| return DoRowOp(RowOperationsPB::INSERT_IGNORE, key, val, nullopt, ops); |
| } |
| |
| optional<ExpectedKeyValueRow> UpsertRow(int key, |
| int val, |
| const optional<ExpectedKeyValueRow>& old_row, |
| vector<LocalTabletWriter::RowOp>* ops) { |
| return DoRowOp(RowOperationsPB::UPSERT, key, val, old_row, ops); |
| } |
| |
| optional<ExpectedKeyValueRow> UpsertIgnoreRow(int key, |
| int val, |
| const optional<ExpectedKeyValueRow>& old_row, |
| vector<LocalTabletWriter::RowOp>* ops) { |
| return DoRowOp(RowOperationsPB::UPSERT_IGNORE, key, val, old_row, ops); |
| } |
| |
| // Adds an update of the given key/value pair to 'ops', returning the expected value |
| optional<ExpectedKeyValueRow> UpdateRow(int key, |
| uint32_t new_val, |
| const optional<ExpectedKeyValueRow>& old_row, |
| vector<LocalTabletWriter::RowOp>* ops) { |
| return DoRowOp(RowOperationsPB::UPDATE, key, new_val, old_row, ops); |
| } |
| |
| // Adds an update of the given key/value pair to 'ops', returning the expected value |
| optional<ExpectedKeyValueRow> UpdateIgnoreRow(int key, |
| uint32_t new_val, |
| const optional<ExpectedKeyValueRow>& old_row, |
| vector<LocalTabletWriter::RowOp>* ops) { |
| return DoRowOp(RowOperationsPB::UPDATE_IGNORE, key, new_val, old_row, ops); |
| } |
| |
| optional<ExpectedKeyValueRow> DoRowOp(RowOperationsPB::Type type, |
| int key, |
| int val, |
| const optional<ExpectedKeyValueRow>& old_row, |
| vector<LocalTabletWriter::RowOp>* ops) { |
| |
| unique_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_)); |
| CHECK_OK(row->SetInt32(0, key)); |
| optional<ExpectedKeyValueRow> ret = ExpectedKeyValueRow(); |
| ret->key = key; |
| |
| switch (type) { |
| case RowOperationsPB::UPSERT: |
| case RowOperationsPB::UPSERT_IGNORE: |
| case RowOperationsPB::UPDATE: |
| case RowOperationsPB::UPDATE_IGNORE: |
| case RowOperationsPB::INSERT: |
| case RowOperationsPB::INSERT_IGNORE: |
| switch (val % 2) { |
| case 0: |
| CHECK_OK(row->SetNull(1)); |
| ret->val.reset(); |
| break; |
| case 1: |
| CHECK_OK(row->SetInt32(1, val)); |
| ret->val = val; |
| break; |
| } |
| |
| if ((type != RowOperationsPB::UPDATE && |
| type != RowOperationsPB::UPDATE_IGNORE) && (val % 3 == 1)) { |
| // Don't set the value. In the case of an INSERT or an UPSERT with no pre-existing |
| // row, this should default to NULL. Otherwise it should remain set to whatever it |
| // was previously set to. |
| CHECK_OK(row->Unset(1)); |
| |
| if (type == RowOperationsPB::INSERT || !old_row) { |
| ret->val.reset(); |
| } else { |
| ret->val = old_row->val; |
| } |
| } |
| break; |
| case RowOperationsPB::DELETE: |
| case RowOperationsPB::DELETE_IGNORE: |
| ret.reset(); |
| break; |
| default: |
| LOG(FATAL) << "Unknown type: " << type; |
| } |
| ops->push_back(LocalTabletWriter::RowOp(type, row.release())); |
| return ret; |
| } |
| |
| // Adds a delete of the given row to 'ops', returning an empty string (indicating that |
| // the row no longer exists). |
| optional<ExpectedKeyValueRow> DeleteRow(int key, vector<LocalTabletWriter::RowOp>* ops) { |
| unique_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_)); |
| CHECK_OK(row->SetInt32(0, key)); |
| ops->push_back(LocalTabletWriter::RowOp(RowOperationsPB::DELETE, row.release())); |
| return nullopt; |
| } |
| |
| // Adds a delete ignore of the given row to 'ops', returning an empty string (indicating that |
| // the row no longer exists). |
| optional<ExpectedKeyValueRow> DeleteIgnoreRow(int key, vector<LocalTabletWriter::RowOp>* ops) { |
| unique_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_)); |
| CHECK_OK(row->SetInt32(0, key)); |
| ops->push_back(LocalTabletWriter::RowOp(RowOperationsPB::DELETE_IGNORE, row.release())); |
| return nullopt; |
| } |
| |
| // Random-read the given row, returning its current value. |
| // If the row doesn't exist, returns std::nullopt. |
| optional<ExpectedKeyValueRow> GetRow(int key) { |
| ScanSpec spec; |
| const Schema& schema = this->client_schema_; |
| unique_ptr<RowwiseIterator> iter; |
| CHECK_OK(this->tablet()->NewRowIterator(schema, &iter)); |
| auto pred_one = ColumnPredicate::Equality(schema.column(0), &key); |
| spec.AddPredicate(pred_one); |
| CHECK_OK(iter->Init(&spec)); |
| |
| optional<ExpectedKeyValueRow> ret; |
| int n_results = 0; |
| |
| RowBlockMemory mem(1024); |
| RowBlock block(&schema, 100, &mem); |
| while (iter->HasNext()) { |
| mem.Reset(); |
| CHECK_OK(iter->NextBlock(&block)); |
| for (int i = 0; i < block.nrows(); i++) { |
| if (!block.selection_vector()->IsRowSelected(i)) { |
| continue; |
| } |
| // We expect to only get exactly one result per read. |
| CHECK_EQ(n_results, 0) |
| << "Already got result when looking up row " |
| << key << ": " << *ret |
| << " and now have new matching row: " |
| << schema.DebugRow(block.row(i)) |
| << " iterator: " << iter->ToString(); |
| ret = ExpectedKeyValueRow(); |
| ret->key = *schema.ExtractColumnFromRow<INT32>(block.row(i), 0); |
| if (!block.row(i).is_null(1)) { |
| ret->val = *schema.ExtractColumnFromRow<INT32>(block.row(i), 1); |
| } |
| n_results++; |
| } |
| } |
| return ret; |
| } |
| |
| protected: |
| // The current expected state of the tablet. |
| vector<optional<ExpectedKeyValueRow>> expected_tablet_state_; |
| |
| // Latch triggered when the main thread is finished performing |
| // operations. This stops the compact/flush thread. |
| CountDownLatch done_; |
| |
| unique_ptr<LocalTabletWriter> writer_; |
| }; |
| |
| TEST_F(TestRandomAccess, Test) { |
| thread flush_thread([this]() { this->BackgroundOpThread(); }); |
| SCOPED_CLEANUP({ |
| done_.CountDown(); |
| flush_thread.join(); |
| }); |
| NO_FATALS(DoRandomBatches()); |
| } |
| |
| |
| |
| } // namespace tablet |
| } // namespace kudu |