| // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
| // This source code is licensed under both the GPLv2 (found in the |
| // COPYING file in the root directory) and Apache 2.0 License |
| // (found in the LICENSE.Apache file in the root directory). |
| |
| #ifndef ROCKSDB_LITE |
| |
| #include <functional> |
| #include <string> |
| #include <thread> |
| |
| #include "rocksdb/db.h" |
| #include "rocksdb/utilities/optimistic_transaction_db.h" |
| #include "rocksdb/utilities/transaction.h" |
| #include "util/crc32c.h" |
| #include "util/logging.h" |
| #include "util/random.h" |
| #include "util/testharness.h" |
| #include "util/transaction_test_util.h" |
| #include "port/port.h" |
| |
| using std::string; |
| |
| namespace rocksdb { |
| |
| class OptimisticTransactionTest : public testing::Test { |
| public: |
| OptimisticTransactionDB* txn_db; |
| DB* db; |
| string dbname; |
| Options options; |
| |
| OptimisticTransactionTest() { |
| options.create_if_missing = true; |
| options.max_write_buffer_number = 2; |
| dbname = test::TmpDir() + "/optimistic_transaction_testdb"; |
| |
| DestroyDB(dbname, options); |
| Open(); |
| } |
| ~OptimisticTransactionTest() { |
| delete txn_db; |
| DestroyDB(dbname, options); |
| } |
| |
| void Reopen() { |
| delete txn_db; |
| txn_db = nullptr; |
| Open(); |
| } |
| |
| private: |
| void Open() { |
| Status s = OptimisticTransactionDB::Open(options, dbname, &txn_db); |
| assert(s.ok()); |
| assert(txn_db != nullptr); |
| db = txn_db->GetBaseDB(); |
| } |
| }; |
| |
| TEST_F(OptimisticTransactionTest, SuccessTest) { |
| WriteOptions write_options; |
| ReadOptions read_options; |
| string value; |
| Status s; |
| |
| db->Put(write_options, Slice("foo"), Slice("bar")); |
| db->Put(write_options, Slice("foo2"), Slice("bar")); |
| |
| Transaction* txn = txn_db->BeginTransaction(write_options); |
| ASSERT_TRUE(txn); |
| |
| txn->GetForUpdate(read_options, "foo", &value); |
| ASSERT_EQ(value, "bar"); |
| |
| txn->Put(Slice("foo"), Slice("bar2")); |
| |
| txn->GetForUpdate(read_options, "foo", &value); |
| ASSERT_EQ(value, "bar2"); |
| |
| s = txn->Commit(); |
| ASSERT_OK(s); |
| |
| db->Get(read_options, "foo", &value); |
| ASSERT_EQ(value, "bar2"); |
| |
| delete txn; |
| } |
| |
| TEST_F(OptimisticTransactionTest, WriteConflictTest) { |
| WriteOptions write_options; |
| ReadOptions read_options; |
| string value; |
| Status s; |
| |
| db->Put(write_options, "foo", "bar"); |
| db->Put(write_options, "foo2", "bar"); |
| |
| Transaction* txn = txn_db->BeginTransaction(write_options); |
| ASSERT_TRUE(txn); |
| |
| txn->Put("foo", "bar2"); |
| |
| // This Put outside of a transaction will conflict with the previous write |
| s = db->Put(write_options, "foo", "barz"); |
| ASSERT_OK(s); |
| |
| s = db->Get(read_options, "foo", &value); |
| ASSERT_EQ(value, "barz"); |
| ASSERT_EQ(1, txn->GetNumKeys()); |
| |
| s = txn->Commit(); |
| ASSERT_TRUE(s.IsBusy()); // Txn should not commit |
| |
| // Verify that transaction did not write anything |
| db->Get(read_options, "foo", &value); |
| ASSERT_EQ(value, "barz"); |
| db->Get(read_options, "foo2", &value); |
| ASSERT_EQ(value, "bar"); |
| |
| delete txn; |
| } |
| |
| TEST_F(OptimisticTransactionTest, WriteConflictTest2) { |
| WriteOptions write_options; |
| ReadOptions read_options; |
| OptimisticTransactionOptions txn_options; |
| string value; |
| Status s; |
| |
| db->Put(write_options, "foo", "bar"); |
| db->Put(write_options, "foo2", "bar"); |
| |
| txn_options.set_snapshot = true; |
| Transaction* txn = txn_db->BeginTransaction(write_options, txn_options); |
| ASSERT_TRUE(txn); |
| |
| // This Put outside of a transaction will conflict with a later write |
| s = db->Put(write_options, "foo", "barz"); |
| ASSERT_OK(s); |
| |
| txn->Put("foo", "bar2"); // Conflicts with write done after snapshot taken |
| |
| s = db->Get(read_options, "foo", &value); |
| ASSERT_EQ(value, "barz"); |
| |
| s = txn->Commit(); |
| ASSERT_TRUE(s.IsBusy()); // Txn should not commit |
| |
| // Verify that transaction did not write anything |
| db->Get(read_options, "foo", &value); |
| ASSERT_EQ(value, "barz"); |
| db->Get(read_options, "foo2", &value); |
| ASSERT_EQ(value, "bar"); |
| |
| delete txn; |
| } |
| |
| TEST_F(OptimisticTransactionTest, ReadConflictTest) { |
| WriteOptions write_options; |
| ReadOptions read_options, snapshot_read_options; |
| OptimisticTransactionOptions txn_options; |
| string value; |
| Status s; |
| |
| db->Put(write_options, "foo", "bar"); |
| db->Put(write_options, "foo2", "bar"); |
| |
| txn_options.set_snapshot = true; |
| Transaction* txn = txn_db->BeginTransaction(write_options, txn_options); |
| ASSERT_TRUE(txn); |
| |
| txn->SetSnapshot(); |
| snapshot_read_options.snapshot = txn->GetSnapshot(); |
| |
| txn->GetForUpdate(snapshot_read_options, "foo", &value); |
| ASSERT_EQ(value, "bar"); |
| |
| // This Put outside of a transaction will conflict with the previous read |
| s = db->Put(write_options, "foo", "barz"); |
| ASSERT_OK(s); |
| |
| s = db->Get(read_options, "foo", &value); |
| ASSERT_EQ(value, "barz"); |
| |
| s = txn->Commit(); |
| ASSERT_TRUE(s.IsBusy()); // Txn should not commit |
| |
| // Verify that transaction did not write anything |
| txn->GetForUpdate(read_options, "foo", &value); |
| ASSERT_EQ(value, "barz"); |
| txn->GetForUpdate(read_options, "foo2", &value); |
| ASSERT_EQ(value, "bar"); |
| |
| delete txn; |
| } |
| |
| TEST_F(OptimisticTransactionTest, TxnOnlyTest) { |
| // Test to make sure transactions work when there are no other writes in an |
| // empty db. |
| |
| WriteOptions write_options; |
| ReadOptions read_options; |
| string value; |
| Status s; |
| |
| Transaction* txn = txn_db->BeginTransaction(write_options); |
| ASSERT_TRUE(txn); |
| |
| txn->Put("x", "y"); |
| |
| s = txn->Commit(); |
| ASSERT_OK(s); |
| |
| delete txn; |
| } |
| |
| TEST_F(OptimisticTransactionTest, FlushTest) { |
| WriteOptions write_options; |
| ReadOptions read_options, snapshot_read_options; |
| string value; |
| Status s; |
| |
| db->Put(write_options, Slice("foo"), Slice("bar")); |
| db->Put(write_options, Slice("foo2"), Slice("bar")); |
| |
| Transaction* txn = txn_db->BeginTransaction(write_options); |
| ASSERT_TRUE(txn); |
| |
| snapshot_read_options.snapshot = txn->GetSnapshot(); |
| |
| txn->GetForUpdate(snapshot_read_options, "foo", &value); |
| ASSERT_EQ(value, "bar"); |
| |
| txn->Put(Slice("foo"), Slice("bar2")); |
| |
| txn->GetForUpdate(snapshot_read_options, "foo", &value); |
| ASSERT_EQ(value, "bar2"); |
| |
| // Put a random key so we have a memtable to flush |
| s = db->Put(write_options, "dummy", "dummy"); |
| ASSERT_OK(s); |
| |
| // force a memtable flush |
| FlushOptions flush_ops; |
| db->Flush(flush_ops); |
| |
| s = txn->Commit(); |
| // txn should commit since the flushed table is still in MemtableList History |
| ASSERT_OK(s); |
| |
| db->Get(read_options, "foo", &value); |
| ASSERT_EQ(value, "bar2"); |
| |
| delete txn; |
| } |
| |
| TEST_F(OptimisticTransactionTest, FlushTest2) { |
| WriteOptions write_options; |
| ReadOptions read_options, snapshot_read_options; |
| string value; |
| Status s; |
| |
| db->Put(write_options, Slice("foo"), Slice("bar")); |
| db->Put(write_options, Slice("foo2"), Slice("bar")); |
| |
| Transaction* txn = txn_db->BeginTransaction(write_options); |
| ASSERT_TRUE(txn); |
| |
| snapshot_read_options.snapshot = txn->GetSnapshot(); |
| |
| txn->GetForUpdate(snapshot_read_options, "foo", &value); |
| ASSERT_EQ(value, "bar"); |
| |
| txn->Put(Slice("foo"), Slice("bar2")); |
| |
| txn->GetForUpdate(snapshot_read_options, "foo", &value); |
| ASSERT_EQ(value, "bar2"); |
| |
| // Put a random key so we have a MemTable to flush |
| s = db->Put(write_options, "dummy", "dummy"); |
| ASSERT_OK(s); |
| |
| // force a memtable flush |
| FlushOptions flush_ops; |
| db->Flush(flush_ops); |
| |
| // Put a random key so we have a MemTable to flush |
| s = db->Put(write_options, "dummy", "dummy2"); |
| ASSERT_OK(s); |
| |
| // force a memtable flush |
| db->Flush(flush_ops); |
| |
| s = db->Put(write_options, "dummy", "dummy3"); |
| ASSERT_OK(s); |
| |
| // force a memtable flush |
| // Since our test db has max_write_buffer_number=2, this flush will cause |
| // the first memtable to get purged from the MemtableList history. |
| db->Flush(flush_ops); |
| |
| s = txn->Commit(); |
| // txn should not commit since MemTableList History is not large enough |
| ASSERT_TRUE(s.IsTryAgain()); |
| |
| db->Get(read_options, "foo", &value); |
| ASSERT_EQ(value, "bar"); |
| |
| delete txn; |
| } |
| |
| TEST_F(OptimisticTransactionTest, NoSnapshotTest) { |
| WriteOptions write_options; |
| ReadOptions read_options; |
| string value; |
| Status s; |
| |
| db->Put(write_options, "AAA", "bar"); |
| |
| Transaction* txn = txn_db->BeginTransaction(write_options); |
| ASSERT_TRUE(txn); |
| |
| // Modify key after transaction start |
| db->Put(write_options, "AAA", "bar1"); |
| |
| // Read and write without a snapshot |
| txn->GetForUpdate(read_options, "AAA", &value); |
| ASSERT_EQ(value, "bar1"); |
| txn->Put("AAA", "bar2"); |
| |
| // Should commit since read/write was done after data changed |
| s = txn->Commit(); |
| ASSERT_OK(s); |
| |
| txn->GetForUpdate(read_options, "AAA", &value); |
| ASSERT_EQ(value, "bar2"); |
| |
| delete txn; |
| } |
| |
| TEST_F(OptimisticTransactionTest, MultipleSnapshotTest) { |
| WriteOptions write_options; |
| ReadOptions read_options, snapshot_read_options; |
| string value; |
| Status s; |
| |
| db->Put(write_options, "AAA", "bar"); |
| db->Put(write_options, "BBB", "bar"); |
| db->Put(write_options, "CCC", "bar"); |
| |
| Transaction* txn = txn_db->BeginTransaction(write_options); |
| ASSERT_TRUE(txn); |
| |
| db->Put(write_options, "AAA", "bar1"); |
| |
| // Read and write without a snapshot |
| txn->GetForUpdate(read_options, "AAA", &value); |
| ASSERT_EQ(value, "bar1"); |
| txn->Put("AAA", "bar2"); |
| |
| // Modify BBB before snapshot is taken |
| db->Put(write_options, "BBB", "bar1"); |
| |
| txn->SetSnapshot(); |
| snapshot_read_options.snapshot = txn->GetSnapshot(); |
| |
| // Read and write with snapshot |
| txn->GetForUpdate(snapshot_read_options, "BBB", &value); |
| ASSERT_EQ(value, "bar1"); |
| txn->Put("BBB", "bar2"); |
| |
| db->Put(write_options, "CCC", "bar1"); |
| |
| // Set a new snapshot |
| txn->SetSnapshot(); |
| snapshot_read_options.snapshot = txn->GetSnapshot(); |
| |
| // Read and write with snapshot |
| txn->GetForUpdate(snapshot_read_options, "CCC", &value); |
| ASSERT_EQ(value, "bar1"); |
| txn->Put("CCC", "bar2"); |
| |
| s = txn->GetForUpdate(read_options, "AAA", &value); |
| ASSERT_OK(s); |
| ASSERT_EQ(value, "bar2"); |
| s = txn->GetForUpdate(read_options, "BBB", &value); |
| ASSERT_OK(s); |
| ASSERT_EQ(value, "bar2"); |
| s = txn->GetForUpdate(read_options, "CCC", &value); |
| ASSERT_OK(s); |
| ASSERT_EQ(value, "bar2"); |
| |
| s = db->Get(read_options, "AAA", &value); |
| ASSERT_OK(s); |
| ASSERT_EQ(value, "bar1"); |
| s = db->Get(read_options, "BBB", &value); |
| ASSERT_OK(s); |
| ASSERT_EQ(value, "bar1"); |
| s = db->Get(read_options, "CCC", &value); |
| ASSERT_OK(s); |
| ASSERT_EQ(value, "bar1"); |
| |
| s = txn->Commit(); |
| ASSERT_OK(s); |
| |
| s = db->Get(read_options, "AAA", &value); |
| ASSERT_OK(s); |
| ASSERT_EQ(value, "bar2"); |
| s = db->Get(read_options, "BBB", &value); |
| ASSERT_OK(s); |
| ASSERT_EQ(value, "bar2"); |
| s = db->Get(read_options, "CCC", &value); |
| ASSERT_OK(s); |
| ASSERT_EQ(value, "bar2"); |
| |
| // verify that we track multiple writes to the same key at different snapshots |
| delete txn; |
| txn = txn_db->BeginTransaction(write_options); |
| |
| // Potentially conflicting writes |
| db->Put(write_options, "ZZZ", "zzz"); |
| db->Put(write_options, "XXX", "xxx"); |
| |
| txn->SetSnapshot(); |
| |
| OptimisticTransactionOptions txn_options; |
| txn_options.set_snapshot = true; |
| Transaction* txn2 = txn_db->BeginTransaction(write_options, txn_options); |
| txn2->SetSnapshot(); |
| |
| // This should not conflict in txn since the snapshot is later than the |
| // previous write (spoiler alert: it will later conflict with txn2). |
| txn->Put("ZZZ", "zzzz"); |
| s = txn->Commit(); |
| ASSERT_OK(s); |
| |
| delete txn; |
| |
| // This will conflict since the snapshot is earlier than another write to ZZZ |
| txn2->Put("ZZZ", "xxxxx"); |
| |
| s = txn2->Commit(); |
| ASSERT_TRUE(s.IsBusy()); |
| |
| delete txn2; |
| } |
| |
| TEST_F(OptimisticTransactionTest, ColumnFamiliesTest) { |
| WriteOptions write_options; |
| ReadOptions read_options, snapshot_read_options; |
| OptimisticTransactionOptions txn_options; |
| string value; |
| Status s; |
| |
| ColumnFamilyHandle *cfa, *cfb; |
| ColumnFamilyOptions cf_options; |
| |
| // Create 2 new column families |
| s = db->CreateColumnFamily(cf_options, "CFA", &cfa); |
| ASSERT_OK(s); |
| s = db->CreateColumnFamily(cf_options, "CFB", &cfb); |
| ASSERT_OK(s); |
| |
| delete cfa; |
| delete cfb; |
| delete txn_db; |
| txn_db = nullptr; |
| |
| // open DB with three column families |
| std::vector<ColumnFamilyDescriptor> column_families; |
| // have to open default column family |
| column_families.push_back( |
| ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions())); |
| // open the new column families |
| column_families.push_back( |
| ColumnFamilyDescriptor("CFA", ColumnFamilyOptions())); |
| column_families.push_back( |
| ColumnFamilyDescriptor("CFB", ColumnFamilyOptions())); |
| std::vector<ColumnFamilyHandle*> handles; |
| s = OptimisticTransactionDB::Open(options, dbname, column_families, &handles, |
| &txn_db); |
| ASSERT_OK(s); |
| assert(txn_db != nullptr); |
| db = txn_db->GetBaseDB(); |
| |
| Transaction* txn = txn_db->BeginTransaction(write_options); |
| ASSERT_TRUE(txn); |
| |
| txn->SetSnapshot(); |
| snapshot_read_options.snapshot = txn->GetSnapshot(); |
| |
| txn_options.set_snapshot = true; |
| Transaction* txn2 = txn_db->BeginTransaction(write_options, txn_options); |
| ASSERT_TRUE(txn2); |
| |
| // Write some data to the db |
| WriteBatch batch; |
| batch.Put("foo", "foo"); |
| batch.Put(handles[1], "AAA", "bar"); |
| batch.Put(handles[1], "AAAZZZ", "bar"); |
| s = db->Write(write_options, &batch); |
| ASSERT_OK(s); |
| db->Delete(write_options, handles[1], "AAAZZZ"); |
| |
| // These keys do no conflict with existing writes since they're in |
| // different column families |
| txn->Delete("AAA"); |
| txn->GetForUpdate(snapshot_read_options, handles[1], "foo", &value); |
| Slice key_slice("AAAZZZ"); |
| Slice value_slices[2] = {Slice("bar"), Slice("bar")}; |
| txn->Put(handles[2], SliceParts(&key_slice, 1), SliceParts(value_slices, 2)); |
| |
| ASSERT_EQ(3, txn->GetNumKeys()); |
| |
| // Txn should commit |
| s = txn->Commit(); |
| ASSERT_OK(s); |
| s = db->Get(read_options, "AAA", &value); |
| ASSERT_TRUE(s.IsNotFound()); |
| s = db->Get(read_options, handles[2], "AAAZZZ", &value); |
| ASSERT_EQ(value, "barbar"); |
| |
| Slice key_slices[3] = {Slice("AAA"), Slice("ZZ"), Slice("Z")}; |
| Slice value_slice("barbarbar"); |
| // This write will cause a conflict with the earlier batch write |
| txn2->Put(handles[1], SliceParts(key_slices, 3), SliceParts(&value_slice, 1)); |
| |
| txn2->Delete(handles[2], "XXX"); |
| txn2->Delete(handles[1], "XXX"); |
| s = txn2->GetForUpdate(snapshot_read_options, handles[1], "AAA", &value); |
| ASSERT_TRUE(s.IsNotFound()); |
| |
| // Verify txn did not commit |
| s = txn2->Commit(); |
| ASSERT_TRUE(s.IsBusy()); |
| s = db->Get(read_options, handles[1], "AAAZZZ", &value); |
| ASSERT_EQ(value, "barbar"); |
| |
| delete txn; |
| delete txn2; |
| |
| txn = txn_db->BeginTransaction(write_options, txn_options); |
| snapshot_read_options.snapshot = txn->GetSnapshot(); |
| |
| txn2 = txn_db->BeginTransaction(write_options, txn_options); |
| ASSERT_TRUE(txn); |
| |
| std::vector<ColumnFamilyHandle*> multiget_cfh = {handles[1], handles[2], |
| handles[0], handles[2]}; |
| std::vector<Slice> multiget_keys = {"AAA", "AAAZZZ", "foo", "foo"}; |
| std::vector<std::string> values(4); |
| |
| std::vector<Status> results = txn->MultiGetForUpdate( |
| snapshot_read_options, multiget_cfh, multiget_keys, &values); |
| ASSERT_OK(results[0]); |
| ASSERT_OK(results[1]); |
| ASSERT_OK(results[2]); |
| ASSERT_TRUE(results[3].IsNotFound()); |
| ASSERT_EQ(values[0], "bar"); |
| ASSERT_EQ(values[1], "barbar"); |
| ASSERT_EQ(values[2], "foo"); |
| |
| txn->Delete(handles[2], "ZZZ"); |
| txn->Put(handles[2], "ZZZ", "YYY"); |
| txn->Put(handles[2], "ZZZ", "YYYY"); |
| txn->Delete(handles[2], "ZZZ"); |
| txn->Put(handles[2], "AAAZZZ", "barbarbar"); |
| |
| ASSERT_EQ(5, txn->GetNumKeys()); |
| |
| // Txn should commit |
| s = txn->Commit(); |
| ASSERT_OK(s); |
| s = db->Get(read_options, handles[2], "ZZZ", &value); |
| ASSERT_TRUE(s.IsNotFound()); |
| |
| // Put a key which will conflict with the next txn using the previous snapshot |
| db->Put(write_options, handles[2], "foo", "000"); |
| |
| results = txn2->MultiGetForUpdate(snapshot_read_options, multiget_cfh, |
| multiget_keys, &values); |
| ASSERT_OK(results[0]); |
| ASSERT_OK(results[1]); |
| ASSERT_OK(results[2]); |
| ASSERT_TRUE(results[3].IsNotFound()); |
| ASSERT_EQ(values[0], "bar"); |
| ASSERT_EQ(values[1], "barbar"); |
| ASSERT_EQ(values[2], "foo"); |
| |
| // Verify Txn Did not Commit |
| s = txn2->Commit(); |
| ASSERT_TRUE(s.IsBusy()); |
| |
| s = db->DropColumnFamily(handles[1]); |
| ASSERT_OK(s); |
| s = db->DropColumnFamily(handles[2]); |
| ASSERT_OK(s); |
| |
| delete txn; |
| delete txn2; |
| |
| for (auto handle : handles) { |
| delete handle; |
| } |
| } |
| |
| TEST_F(OptimisticTransactionTest, EmptyTest) { |
| WriteOptions write_options; |
| ReadOptions read_options; |
| string value; |
| Status s; |
| |
| s = db->Put(write_options, "aaa", "aaa"); |
| ASSERT_OK(s); |
| |
| Transaction* txn = txn_db->BeginTransaction(write_options); |
| s = txn->Commit(); |
| ASSERT_OK(s); |
| delete txn; |
| |
| txn = txn_db->BeginTransaction(write_options); |
| txn->Rollback(); |
| delete txn; |
| |
| txn = txn_db->BeginTransaction(write_options); |
| s = txn->GetForUpdate(read_options, "aaa", &value); |
| ASSERT_EQ(value, "aaa"); |
| |
| s = txn->Commit(); |
| ASSERT_OK(s); |
| delete txn; |
| |
| txn = txn_db->BeginTransaction(write_options); |
| txn->SetSnapshot(); |
| s = txn->GetForUpdate(read_options, "aaa", &value); |
| ASSERT_EQ(value, "aaa"); |
| |
| s = db->Put(write_options, "aaa", "xxx"); |
| s = txn->Commit(); |
| ASSERT_TRUE(s.IsBusy()); |
| delete txn; |
| } |
| |
| TEST_F(OptimisticTransactionTest, PredicateManyPreceders) { |
| WriteOptions write_options; |
| ReadOptions read_options1, read_options2; |
| OptimisticTransactionOptions txn_options; |
| string value; |
| Status s; |
| |
| txn_options.set_snapshot = true; |
| Transaction* txn1 = txn_db->BeginTransaction(write_options, txn_options); |
| read_options1.snapshot = txn1->GetSnapshot(); |
| |
| Transaction* txn2 = txn_db->BeginTransaction(write_options); |
| txn2->SetSnapshot(); |
| read_options2.snapshot = txn2->GetSnapshot(); |
| |
| std::vector<Slice> multiget_keys = {"1", "2", "3"}; |
| std::vector<std::string> multiget_values; |
| |
| std::vector<Status> results = |
| txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values); |
| ASSERT_TRUE(results[1].IsNotFound()); |
| |
| txn2->Put("2", "x"); |
| |
| s = txn2->Commit(); |
| ASSERT_OK(s); |
| |
| multiget_values.clear(); |
| results = |
| txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values); |
| ASSERT_TRUE(results[1].IsNotFound()); |
| |
| // should not commit since txn2 wrote a key txn has read |
| s = txn1->Commit(); |
| ASSERT_TRUE(s.IsBusy()); |
| |
| delete txn1; |
| delete txn2; |
| |
| txn1 = txn_db->BeginTransaction(write_options, txn_options); |
| read_options1.snapshot = txn1->GetSnapshot(); |
| |
| txn2 = txn_db->BeginTransaction(write_options, txn_options); |
| read_options2.snapshot = txn2->GetSnapshot(); |
| |
| txn1->Put("4", "x"); |
| |
| txn2->Delete("4"); |
| |
| // txn1 can commit since txn2's delete hasn't happened yet (it's just batched) |
| s = txn1->Commit(); |
| ASSERT_OK(s); |
| |
| s = txn2->GetForUpdate(read_options2, "4", &value); |
| ASSERT_TRUE(s.IsNotFound()); |
| |
| // txn2 cannot commit since txn1 changed "4" |
| s = txn2->Commit(); |
| ASSERT_TRUE(s.IsBusy()); |
| |
| delete txn1; |
| delete txn2; |
| } |
| |
| TEST_F(OptimisticTransactionTest, LostUpdate) { |
| WriteOptions write_options; |
| ReadOptions read_options, read_options1, read_options2; |
| OptimisticTransactionOptions txn_options; |
| string value; |
| Status s; |
| |
| // Test 2 transactions writing to the same key in multiple orders and |
| // with/without snapshots |
| |
| Transaction* txn1 = txn_db->BeginTransaction(write_options); |
| Transaction* txn2 = txn_db->BeginTransaction(write_options); |
| |
| txn1->Put("1", "1"); |
| txn2->Put("1", "2"); |
| |
| s = txn1->Commit(); |
| ASSERT_OK(s); |
| |
| s = txn2->Commit(); |
| ASSERT_TRUE(s.IsBusy()); |
| |
| delete txn1; |
| delete txn2; |
| |
| txn_options.set_snapshot = true; |
| txn1 = txn_db->BeginTransaction(write_options, txn_options); |
| read_options1.snapshot = txn1->GetSnapshot(); |
| |
| txn2 = txn_db->BeginTransaction(write_options, txn_options); |
| read_options2.snapshot = txn2->GetSnapshot(); |
| |
| txn1->Put("1", "3"); |
| txn2->Put("1", "4"); |
| |
| s = txn1->Commit(); |
| ASSERT_OK(s); |
| |
| s = txn2->Commit(); |
| ASSERT_TRUE(s.IsBusy()); |
| |
| delete txn1; |
| delete txn2; |
| |
| txn1 = txn_db->BeginTransaction(write_options, txn_options); |
| read_options1.snapshot = txn1->GetSnapshot(); |
| |
| txn2 = txn_db->BeginTransaction(write_options, txn_options); |
| read_options2.snapshot = txn2->GetSnapshot(); |
| |
| txn1->Put("1", "5"); |
| s = txn1->Commit(); |
| ASSERT_OK(s); |
| |
| txn2->Put("1", "6"); |
| s = txn2->Commit(); |
| ASSERT_TRUE(s.IsBusy()); |
| |
| delete txn1; |
| delete txn2; |
| |
| txn1 = txn_db->BeginTransaction(write_options, txn_options); |
| read_options1.snapshot = txn1->GetSnapshot(); |
| |
| txn2 = txn_db->BeginTransaction(write_options, txn_options); |
| read_options2.snapshot = txn2->GetSnapshot(); |
| |
| txn1->Put("1", "5"); |
| s = txn1->Commit(); |
| ASSERT_OK(s); |
| |
| txn2->SetSnapshot(); |
| txn2->Put("1", "6"); |
| s = txn2->Commit(); |
| ASSERT_OK(s); |
| |
| delete txn1; |
| delete txn2; |
| |
| txn1 = txn_db->BeginTransaction(write_options); |
| txn2 = txn_db->BeginTransaction(write_options); |
| |
| txn1->Put("1", "7"); |
| s = txn1->Commit(); |
| ASSERT_OK(s); |
| |
| txn2->Put("1", "8"); |
| s = txn2->Commit(); |
| ASSERT_OK(s); |
| |
| delete txn1; |
| delete txn2; |
| |
| s = db->Get(read_options, "1", &value); |
| ASSERT_OK(s); |
| ASSERT_EQ(value, "8"); |
| } |
| |
| TEST_F(OptimisticTransactionTest, UntrackedWrites) { |
| WriteOptions write_options; |
| ReadOptions read_options; |
| string value; |
| Status s; |
| |
| // Verify transaction rollback works for untracked keys. |
| Transaction* txn = txn_db->BeginTransaction(write_options); |
| txn->PutUntracked("untracked", "0"); |
| txn->Rollback(); |
| s = db->Get(read_options, "untracked", &value); |
| ASSERT_TRUE(s.IsNotFound()); |
| |
| delete txn; |
| txn = txn_db->BeginTransaction(write_options); |
| |
| txn->Put("tracked", "1"); |
| txn->PutUntracked("untracked", "1"); |
| txn->MergeUntracked("untracked", "2"); |
| txn->DeleteUntracked("untracked"); |
| |
| // Write to the untracked key outside of the transaction and verify |
| // it doesn't prevent the transaction from committing. |
| s = db->Put(write_options, "untracked", "x"); |
| ASSERT_OK(s); |
| |
| s = txn->Commit(); |
| ASSERT_OK(s); |
| |
| s = db->Get(read_options, "untracked", &value); |
| ASSERT_TRUE(s.IsNotFound()); |
| |
| delete txn; |
| txn = txn_db->BeginTransaction(write_options); |
| |
| txn->Put("tracked", "10"); |
| txn->PutUntracked("untracked", "A"); |
| |
| // Write to tracked key outside of the transaction and verify that the |
| // untracked keys are not written when the commit fails. |
| s = db->Delete(write_options, "tracked"); |
| |
| s = txn->Commit(); |
| ASSERT_TRUE(s.IsBusy()); |
| |
| s = db->Get(read_options, "untracked", &value); |
| ASSERT_TRUE(s.IsNotFound()); |
| |
| delete txn; |
| } |
| |
| TEST_F(OptimisticTransactionTest, IteratorTest) { |
| WriteOptions write_options; |
| ReadOptions read_options, snapshot_read_options; |
| OptimisticTransactionOptions txn_options; |
| string value; |
| Status s; |
| |
| // Write some keys to the db |
| s = db->Put(write_options, "A", "a"); |
| ASSERT_OK(s); |
| |
| s = db->Put(write_options, "G", "g"); |
| ASSERT_OK(s); |
| |
| s = db->Put(write_options, "F", "f"); |
| ASSERT_OK(s); |
| |
| s = db->Put(write_options, "C", "c"); |
| ASSERT_OK(s); |
| |
| s = db->Put(write_options, "D", "d"); |
| ASSERT_OK(s); |
| |
| Transaction* txn = txn_db->BeginTransaction(write_options); |
| ASSERT_TRUE(txn); |
| |
| // Write some keys in a txn |
| s = txn->Put("B", "b"); |
| ASSERT_OK(s); |
| |
| s = txn->Put("H", "h"); |
| ASSERT_OK(s); |
| |
| s = txn->Delete("D"); |
| ASSERT_OK(s); |
| |
| s = txn->Put("E", "e"); |
| ASSERT_OK(s); |
| |
| txn->SetSnapshot(); |
| const Snapshot* snapshot = txn->GetSnapshot(); |
| |
| // Write some keys to the db after the snapshot |
| s = db->Put(write_options, "BB", "xx"); |
| ASSERT_OK(s); |
| |
| s = db->Put(write_options, "C", "xx"); |
| ASSERT_OK(s); |
| |
| read_options.snapshot = snapshot; |
| Iterator* iter = txn->GetIterator(read_options); |
| ASSERT_OK(iter->status()); |
| iter->SeekToFirst(); |
| |
| // Read all keys via iter and lock them all |
| std::string results[] = {"a", "b", "c", "e", "f", "g", "h"}; |
| for (int i = 0; i < 7; i++) { |
| ASSERT_OK(iter->status()); |
| ASSERT_TRUE(iter->Valid()); |
| ASSERT_EQ(results[i], iter->value().ToString()); |
| |
| s = txn->GetForUpdate(read_options, iter->key(), nullptr); |
| ASSERT_OK(s); |
| |
| iter->Next(); |
| } |
| ASSERT_FALSE(iter->Valid()); |
| |
| iter->Seek("G"); |
| ASSERT_OK(iter->status()); |
| ASSERT_TRUE(iter->Valid()); |
| ASSERT_EQ("g", iter->value().ToString()); |
| |
| iter->Prev(); |
| ASSERT_OK(iter->status()); |
| ASSERT_TRUE(iter->Valid()); |
| ASSERT_EQ("f", iter->value().ToString()); |
| |
| iter->Seek("D"); |
| ASSERT_OK(iter->status()); |
| ASSERT_TRUE(iter->Valid()); |
| ASSERT_EQ("e", iter->value().ToString()); |
| |
| iter->Seek("C"); |
| ASSERT_OK(iter->status()); |
| ASSERT_TRUE(iter->Valid()); |
| ASSERT_EQ("c", iter->value().ToString()); |
| |
| iter->Next(); |
| ASSERT_OK(iter->status()); |
| ASSERT_TRUE(iter->Valid()); |
| ASSERT_EQ("e", iter->value().ToString()); |
| |
| iter->Seek(""); |
| ASSERT_OK(iter->status()); |
| ASSERT_TRUE(iter->Valid()); |
| ASSERT_EQ("a", iter->value().ToString()); |
| |
| iter->Seek("X"); |
| ASSERT_OK(iter->status()); |
| ASSERT_FALSE(iter->Valid()); |
| |
| iter->SeekToLast(); |
| ASSERT_OK(iter->status()); |
| ASSERT_TRUE(iter->Valid()); |
| ASSERT_EQ("h", iter->value().ToString()); |
| |
| // key "C" was modified in the db after txn's snapshot. txn will not commit. |
| s = txn->Commit(); |
| ASSERT_TRUE(s.IsBusy()); |
| |
| delete iter; |
| delete txn; |
| } |
| |
| TEST_F(OptimisticTransactionTest, SavepointTest) { |
| WriteOptions write_options; |
| ReadOptions read_options, snapshot_read_options; |
| OptimisticTransactionOptions txn_options; |
| string value; |
| Status s; |
| |
| Transaction* txn = txn_db->BeginTransaction(write_options); |
| ASSERT_TRUE(txn); |
| |
| s = txn->RollbackToSavePoint(); |
| ASSERT_TRUE(s.IsNotFound()); |
| |
| txn->SetSavePoint(); // 1 |
| |
| ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to beginning of txn |
| s = txn->RollbackToSavePoint(); |
| ASSERT_TRUE(s.IsNotFound()); |
| |
| s = txn->Put("B", "b"); |
| ASSERT_OK(s); |
| |
| s = txn->Commit(); |
| ASSERT_OK(s); |
| |
| s = db->Get(read_options, "B", &value); |
| ASSERT_OK(s); |
| ASSERT_EQ("b", value); |
| |
| delete txn; |
| txn = txn_db->BeginTransaction(write_options); |
| ASSERT_TRUE(txn); |
| |
| s = txn->Put("A", "a"); |
| ASSERT_OK(s); |
| |
| s = txn->Put("B", "bb"); |
| ASSERT_OK(s); |
| |
| s = txn->Put("C", "c"); |
| ASSERT_OK(s); |
| |
| txn->SetSavePoint(); // 2 |
| |
| s = txn->Delete("B"); |
| ASSERT_OK(s); |
| |
| s = txn->Put("C", "cc"); |
| ASSERT_OK(s); |
| |
| s = txn->Put("D", "d"); |
| ASSERT_OK(s); |
| |
| ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to 2 |
| |
| s = txn->Get(read_options, "A", &value); |
| ASSERT_OK(s); |
| ASSERT_EQ("a", value); |
| |
| s = txn->Get(read_options, "B", &value); |
| ASSERT_OK(s); |
| ASSERT_EQ("bb", value); |
| |
| s = txn->Get(read_options, "C", &value); |
| ASSERT_OK(s); |
| ASSERT_EQ("c", value); |
| |
| s = txn->Get(read_options, "D", &value); |
| ASSERT_TRUE(s.IsNotFound()); |
| |
| s = txn->Put("A", "a"); |
| ASSERT_OK(s); |
| |
| s = txn->Put("E", "e"); |
| ASSERT_OK(s); |
| |
| // Rollback to beginning of txn |
| s = txn->RollbackToSavePoint(); |
| ASSERT_TRUE(s.IsNotFound()); |
| txn->Rollback(); |
| |
| s = txn->Get(read_options, "A", &value); |
| ASSERT_TRUE(s.IsNotFound()); |
| |
| s = txn->Get(read_options, "B", &value); |
| ASSERT_OK(s); |
| ASSERT_EQ("b", value); |
| |
| s = txn->Get(read_options, "D", &value); |
| ASSERT_TRUE(s.IsNotFound()); |
| |
| s = txn->Get(read_options, "D", &value); |
| ASSERT_TRUE(s.IsNotFound()); |
| |
| s = txn->Get(read_options, "E", &value); |
| ASSERT_TRUE(s.IsNotFound()); |
| |
| s = txn->Put("A", "aa"); |
| ASSERT_OK(s); |
| |
| s = txn->Put("F", "f"); |
| ASSERT_OK(s); |
| |
| txn->SetSavePoint(); // 3 |
| txn->SetSavePoint(); // 4 |
| |
| s = txn->Put("G", "g"); |
| ASSERT_OK(s); |
| |
| s = txn->Delete("F"); |
| ASSERT_OK(s); |
| |
| s = txn->Delete("B"); |
| ASSERT_OK(s); |
| |
| s = txn->Get(read_options, "A", &value); |
| ASSERT_OK(s); |
| ASSERT_EQ("aa", value); |
| |
| s = txn->Get(read_options, "F", &value); |
| ASSERT_TRUE(s.IsNotFound()); |
| |
| s = txn->Get(read_options, "B", &value); |
| ASSERT_TRUE(s.IsNotFound()); |
| |
| ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to 3 |
| |
| s = txn->Get(read_options, "F", &value); |
| ASSERT_OK(s); |
| ASSERT_EQ("f", value); |
| |
| s = txn->Get(read_options, "G", &value); |
| ASSERT_TRUE(s.IsNotFound()); |
| |
| s = txn->Commit(); |
| ASSERT_OK(s); |
| |
| s = db->Get(read_options, "F", &value); |
| ASSERT_OK(s); |
| ASSERT_EQ("f", value); |
| |
| s = db->Get(read_options, "G", &value); |
| ASSERT_TRUE(s.IsNotFound()); |
| |
| s = db->Get(read_options, "A", &value); |
| ASSERT_OK(s); |
| ASSERT_EQ("aa", value); |
| |
| s = db->Get(read_options, "B", &value); |
| ASSERT_OK(s); |
| ASSERT_EQ("b", value); |
| |
| s = db->Get(read_options, "C", &value); |
| ASSERT_TRUE(s.IsNotFound()); |
| |
| s = db->Get(read_options, "D", &value); |
| ASSERT_TRUE(s.IsNotFound()); |
| |
| s = db->Get(read_options, "E", &value); |
| ASSERT_TRUE(s.IsNotFound()); |
| |
| delete txn; |
| } |
| |
| TEST_F(OptimisticTransactionTest, UndoGetForUpdateTest) { |
| WriteOptions write_options; |
| ReadOptions read_options, snapshot_read_options; |
| OptimisticTransactionOptions txn_options; |
| string value; |
| Status s; |
| |
| db->Put(write_options, "A", ""); |
| |
| Transaction* txn1 = txn_db->BeginTransaction(write_options); |
| ASSERT_TRUE(txn1); |
| |
| s = txn1->GetForUpdate(read_options, "A", &value); |
| ASSERT_OK(s); |
| |
| txn1->UndoGetForUpdate("A"); |
| |
| Transaction* txn2 = txn_db->BeginTransaction(write_options); |
| txn2->Put("A", "x"); |
| s = txn2->Commit(); |
| ASSERT_OK(s); |
| delete txn2; |
| |
| // Verify that txn1 can commit since A isn't conflict checked |
| s = txn1->Commit(); |
| ASSERT_OK(s); |
| delete txn1; |
| |
| txn1 = txn_db->BeginTransaction(write_options); |
| txn1->Put("A", "a"); |
| |
| s = txn1->GetForUpdate(read_options, "A", &value); |
| ASSERT_OK(s); |
| |
| txn1->UndoGetForUpdate("A"); |
| |
| txn2 = txn_db->BeginTransaction(write_options); |
| txn2->Put("A", "x"); |
| s = txn2->Commit(); |
| ASSERT_OK(s); |
| delete txn2; |
| |
| // Verify that txn1 cannot commit since A will still be conflict checked |
| s = txn1->Commit(); |
| ASSERT_TRUE(s.IsBusy()); |
| delete txn1; |
| |
| txn1 = txn_db->BeginTransaction(write_options); |
| |
| s = txn1->GetForUpdate(read_options, "A", &value); |
| ASSERT_OK(s); |
| s = txn1->GetForUpdate(read_options, "A", &value); |
| ASSERT_OK(s); |
| |
| txn1->UndoGetForUpdate("A"); |
| |
| txn2 = txn_db->BeginTransaction(write_options); |
| txn2->Put("A", "x"); |
| s = txn2->Commit(); |
| ASSERT_OK(s); |
| delete txn2; |
| |
| // Verify that txn1 cannot commit since A will still be conflict checked |
| s = txn1->Commit(); |
| ASSERT_TRUE(s.IsBusy()); |
| delete txn1; |
| |
| txn1 = txn_db->BeginTransaction(write_options); |
| |
| s = txn1->GetForUpdate(read_options, "A", &value); |
| ASSERT_OK(s); |
| s = txn1->GetForUpdate(read_options, "A", &value); |
| ASSERT_OK(s); |
| |
| txn1->UndoGetForUpdate("A"); |
| txn1->UndoGetForUpdate("A"); |
| |
| txn2 = txn_db->BeginTransaction(write_options); |
| txn2->Put("A", "x"); |
| s = txn2->Commit(); |
| ASSERT_OK(s); |
| delete txn2; |
| |
| // Verify that txn1 can commit since A isn't conflict checked |
| s = txn1->Commit(); |
| ASSERT_OK(s); |
| delete txn1; |
| |
| txn1 = txn_db->BeginTransaction(write_options); |
| |
| s = txn1->GetForUpdate(read_options, "A", &value); |
| ASSERT_OK(s); |
| |
| txn1->SetSavePoint(); |
| txn1->UndoGetForUpdate("A"); |
| |
| txn2 = txn_db->BeginTransaction(write_options); |
| txn2->Put("A", "x"); |
| s = txn2->Commit(); |
| ASSERT_OK(s); |
| delete txn2; |
| |
| // Verify that txn1 cannot commit since A will still be conflict checked |
| s = txn1->Commit(); |
| ASSERT_TRUE(s.IsBusy()); |
| delete txn1; |
| |
| txn1 = txn_db->BeginTransaction(write_options); |
| |
| s = txn1->GetForUpdate(read_options, "A", &value); |
| ASSERT_OK(s); |
| |
| txn1->SetSavePoint(); |
| s = txn1->GetForUpdate(read_options, "A", &value); |
| ASSERT_OK(s); |
| txn1->UndoGetForUpdate("A"); |
| |
| txn2 = txn_db->BeginTransaction(write_options); |
| txn2->Put("A", "x"); |
| s = txn2->Commit(); |
| ASSERT_OK(s); |
| delete txn2; |
| |
| // Verify that txn1 cannot commit since A will still be conflict checked |
| s = txn1->Commit(); |
| ASSERT_TRUE(s.IsBusy()); |
| delete txn1; |
| |
| txn1 = txn_db->BeginTransaction(write_options); |
| |
| s = txn1->GetForUpdate(read_options, "A", &value); |
| ASSERT_OK(s); |
| |
| txn1->SetSavePoint(); |
| s = txn1->GetForUpdate(read_options, "A", &value); |
| ASSERT_OK(s); |
| txn1->UndoGetForUpdate("A"); |
| |
| txn1->RollbackToSavePoint(); |
| txn1->UndoGetForUpdate("A"); |
| |
| txn2 = txn_db->BeginTransaction(write_options); |
| txn2->Put("A", "x"); |
| s = txn2->Commit(); |
| ASSERT_OK(s); |
| delete txn2; |
| |
| // Verify that txn1 can commit since A isn't conflict checked |
| s = txn1->Commit(); |
| ASSERT_OK(s); |
| delete txn1; |
| } |
| |
| namespace { |
| Status OptimisticTransactionStressTestInserter(OptimisticTransactionDB* db, |
| const size_t num_transactions, |
| const size_t num_sets, |
| const size_t num_keys_per_set) { |
| size_t seed = std::hash<std::thread::id>()(std::this_thread::get_id()); |
| Random64 _rand(seed); |
| WriteOptions write_options; |
| ReadOptions read_options; |
| OptimisticTransactionOptions txn_options; |
| txn_options.set_snapshot = true; |
| |
| RandomTransactionInserter inserter(&_rand, write_options, read_options, |
| num_keys_per_set, |
| static_cast<uint16_t>(num_sets)); |
| |
| for (size_t t = 0; t < num_transactions; t++) { |
| bool success = inserter.OptimisticTransactionDBInsert(db, txn_options); |
| if (!success) { |
| // unexpected failure |
| return inserter.GetLastStatus(); |
| } |
| } |
| |
| // Make sure at least some of the transactions succeeded. It's ok if |
| // some failed due to write-conflicts. |
| if (inserter.GetFailureCount() > num_transactions / 2) { |
| return Status::TryAgain("Too many transactions failed! " + |
| std::to_string(inserter.GetFailureCount()) + " / " + |
| std::to_string(num_transactions)); |
| } |
| |
| return Status::OK(); |
| } |
| } // namespace |
| |
| TEST_F(OptimisticTransactionTest, OptimisticTransactionStressTest) { |
| const size_t num_threads = 4; |
| const size_t num_transactions_per_thread = 10000; |
| const size_t num_sets = 3; |
| const size_t num_keys_per_set = 100; |
| // Setting the key-space to be 100 keys should cause enough write-conflicts |
| // to make this test interesting. |
| |
| std::vector<port::Thread> threads; |
| |
| std::function<void()> call_inserter = [&] { |
| ASSERT_OK(OptimisticTransactionStressTestInserter( |
| txn_db, num_transactions_per_thread, num_sets, num_keys_per_set)); |
| }; |
| |
| // Create N threads that use RandomTransactionInserter to write |
| // many transactions. |
| for (uint32_t i = 0; i < num_threads; i++) { |
| threads.emplace_back(call_inserter); |
| } |
| |
| // Wait for all threads to run |
| for (auto& t : threads) { |
| t.join(); |
| } |
| |
| // Verify that data is consistent |
| Status s = RandomTransactionInserter::Verify(db, num_sets); |
| ASSERT_OK(s); |
| } |
| |
| TEST_F(OptimisticTransactionTest, SequenceNumberAfterRecoverTest) { |
| WriteOptions write_options; |
| OptimisticTransactionOptions transaction_options; |
| |
| Transaction* transaction(txn_db->BeginTransaction(write_options, transaction_options)); |
| Status s = transaction->Put("foo", "val"); |
| ASSERT_OK(s); |
| s = transaction->Put("foo2", "val"); |
| ASSERT_OK(s); |
| s = transaction->Put("foo3", "val"); |
| ASSERT_OK(s); |
| s = transaction->Commit(); |
| ASSERT_OK(s); |
| delete transaction; |
| |
| Reopen(); |
| transaction = txn_db->BeginTransaction(write_options, transaction_options); |
| s = transaction->Put("bar", "val"); |
| ASSERT_OK(s); |
| s = transaction->Put("bar2", "val"); |
| ASSERT_OK(s); |
| s = transaction->Commit(); |
| ASSERT_OK(s); |
| |
| delete transaction; |
| } |
| |
| } // namespace rocksdb |
| |
| int main(int argc, char** argv) { |
| ::testing::InitGoogleTest(&argc, argv); |
| return RUN_ALL_TESTS(); |
| } |
| |
| #else |
| #include <stdio.h> |
| |
| int main(int argc, char** argv) { |
| fprintf( |
| stderr, |
| "SKIPPED as optimistic_transaction is not supported in ROCKSDB_LITE\n"); |
| return 0; |
| } |
| |
| #endif // !ROCKSDB_LITE |