| // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
| // This source code is licensed under both the GPLv2 (found in the |
| // COPYING file in the root directory) and Apache 2.0 License |
| // (found in the LICENSE.Apache file in the root directory). |
| // |
| // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. See the AUTHORS file for names of contributors. |
| |
| #include "memtable/inlineskiplist.h" |
| #include <set> |
| #include <unordered_set> |
| #include "rocksdb/env.h" |
| #include "util/concurrent_arena.h" |
| #include "util/hash.h" |
| #include "util/random.h" |
| #include "util/testharness.h" |
| |
| namespace rocksdb { |
| |
| // Our test skip list stores 8-byte unsigned integers |
| typedef uint64_t Key; |
| |
| static const char* Encode(const uint64_t* key) { |
| return reinterpret_cast<const char*>(key); |
| } |
| |
| static Key Decode(const char* key) { |
| Key rv; |
| memcpy(&rv, key, sizeof(Key)); |
| return rv; |
| } |
| |
| struct TestComparator { |
| int operator()(const char* a, const char* b) const { |
| if (Decode(a) < Decode(b)) { |
| return -1; |
| } else if (Decode(a) > Decode(b)) { |
| return +1; |
| } else { |
| return 0; |
| } |
| } |
| }; |
| |
| typedef InlineSkipList<TestComparator> TestInlineSkipList; |
| |
| class InlineSkipTest : public testing::Test { |
| public: |
| void Insert(TestInlineSkipList* list, Key key) { |
| char* buf = list->AllocateKey(sizeof(Key)); |
| memcpy(buf, &key, sizeof(Key)); |
| list->Insert(buf); |
| keys_.insert(key); |
| } |
| |
| void InsertWithHint(TestInlineSkipList* list, Key key, void** hint) { |
| char* buf = list->AllocateKey(sizeof(Key)); |
| memcpy(buf, &key, sizeof(Key)); |
| list->InsertWithHint(buf, hint); |
| keys_.insert(key); |
| } |
| |
| void Validate(TestInlineSkipList* list) { |
| // Check keys exist. |
| for (Key key : keys_) { |
| ASSERT_TRUE(list->Contains(Encode(&key))); |
| } |
| // Iterate over the list, make sure keys appears in order and no extra |
| // keys exist. |
| TestInlineSkipList::Iterator iter(list); |
| ASSERT_FALSE(iter.Valid()); |
| Key zero = 0; |
| iter.Seek(Encode(&zero)); |
| for (Key key : keys_) { |
| ASSERT_TRUE(iter.Valid()); |
| ASSERT_EQ(key, Decode(iter.key())); |
| iter.Next(); |
| } |
| ASSERT_FALSE(iter.Valid()); |
| // Validate the list is well-formed. |
| list->TEST_Validate(); |
| } |
| |
| private: |
| std::set<Key> keys_; |
| }; |
| |
| TEST_F(InlineSkipTest, Empty) { |
| Arena arena; |
| TestComparator cmp; |
| InlineSkipList<TestComparator> list(cmp, &arena); |
| Key key = 10; |
| ASSERT_TRUE(!list.Contains(Encode(&key))); |
| |
| InlineSkipList<TestComparator>::Iterator iter(&list); |
| ASSERT_TRUE(!iter.Valid()); |
| iter.SeekToFirst(); |
| ASSERT_TRUE(!iter.Valid()); |
| key = 100; |
| iter.Seek(Encode(&key)); |
| ASSERT_TRUE(!iter.Valid()); |
| iter.SeekForPrev(Encode(&key)); |
| ASSERT_TRUE(!iter.Valid()); |
| iter.SeekToLast(); |
| ASSERT_TRUE(!iter.Valid()); |
| } |
| |
| TEST_F(InlineSkipTest, InsertAndLookup) { |
| const int N = 2000; |
| const int R = 5000; |
| Random rnd(1000); |
| std::set<Key> keys; |
| ConcurrentArena arena; |
| TestComparator cmp; |
| InlineSkipList<TestComparator> list(cmp, &arena); |
| for (int i = 0; i < N; i++) { |
| Key key = rnd.Next() % R; |
| if (keys.insert(key).second) { |
| char* buf = list.AllocateKey(sizeof(Key)); |
| memcpy(buf, &key, sizeof(Key)); |
| list.Insert(buf); |
| } |
| } |
| |
| for (Key i = 0; i < R; i++) { |
| if (list.Contains(Encode(&i))) { |
| ASSERT_EQ(keys.count(i), 1U); |
| } else { |
| ASSERT_EQ(keys.count(i), 0U); |
| } |
| } |
| |
| // Simple iterator tests |
| { |
| InlineSkipList<TestComparator>::Iterator iter(&list); |
| ASSERT_TRUE(!iter.Valid()); |
| |
| uint64_t zero = 0; |
| iter.Seek(Encode(&zero)); |
| ASSERT_TRUE(iter.Valid()); |
| ASSERT_EQ(*(keys.begin()), Decode(iter.key())); |
| |
| uint64_t max_key = R - 1; |
| iter.SeekForPrev(Encode(&max_key)); |
| ASSERT_TRUE(iter.Valid()); |
| ASSERT_EQ(*(keys.rbegin()), Decode(iter.key())); |
| |
| iter.SeekToFirst(); |
| ASSERT_TRUE(iter.Valid()); |
| ASSERT_EQ(*(keys.begin()), Decode(iter.key())); |
| |
| iter.SeekToLast(); |
| ASSERT_TRUE(iter.Valid()); |
| ASSERT_EQ(*(keys.rbegin()), Decode(iter.key())); |
| } |
| |
| // Forward iteration test |
| for (Key i = 0; i < R; i++) { |
| InlineSkipList<TestComparator>::Iterator iter(&list); |
| iter.Seek(Encode(&i)); |
| |
| // Compare against model iterator |
| std::set<Key>::iterator model_iter = keys.lower_bound(i); |
| for (int j = 0; j < 3; j++) { |
| if (model_iter == keys.end()) { |
| ASSERT_TRUE(!iter.Valid()); |
| break; |
| } else { |
| ASSERT_TRUE(iter.Valid()); |
| ASSERT_EQ(*model_iter, Decode(iter.key())); |
| ++model_iter; |
| iter.Next(); |
| } |
| } |
| } |
| |
| // Backward iteration test |
| for (Key i = 0; i < R; i++) { |
| InlineSkipList<TestComparator>::Iterator iter(&list); |
| iter.SeekForPrev(Encode(&i)); |
| |
| // Compare against model iterator |
| std::set<Key>::iterator model_iter = keys.upper_bound(i); |
| for (int j = 0; j < 3; j++) { |
| if (model_iter == keys.begin()) { |
| ASSERT_TRUE(!iter.Valid()); |
| break; |
| } else { |
| ASSERT_TRUE(iter.Valid()); |
| ASSERT_EQ(*--model_iter, Decode(iter.key())); |
| iter.Prev(); |
| } |
| } |
| } |
| } |
| |
| TEST_F(InlineSkipTest, InsertWithHint_Sequential) { |
| const int N = 100000; |
| Arena arena; |
| TestComparator cmp; |
| TestInlineSkipList list(cmp, &arena); |
| void* hint = nullptr; |
| for (int i = 0; i < N; i++) { |
| Key key = i; |
| InsertWithHint(&list, key, &hint); |
| } |
| Validate(&list); |
| } |
| |
| TEST_F(InlineSkipTest, InsertWithHint_MultipleHints) { |
| const int N = 100000; |
| const int S = 100; |
| Random rnd(534); |
| Arena arena; |
| TestComparator cmp; |
| TestInlineSkipList list(cmp, &arena); |
| void* hints[S]; |
| Key last_key[S]; |
| for (int i = 0; i < S; i++) { |
| hints[i] = nullptr; |
| last_key[i] = 0; |
| } |
| for (int i = 0; i < N; i++) { |
| Key s = rnd.Uniform(S); |
| Key key = (s << 32) + (++last_key[s]); |
| InsertWithHint(&list, key, &hints[s]); |
| } |
| Validate(&list); |
| } |
| |
| TEST_F(InlineSkipTest, InsertWithHint_MultipleHintsRandom) { |
| const int N = 100000; |
| const int S = 100; |
| Random rnd(534); |
| Arena arena; |
| TestComparator cmp; |
| TestInlineSkipList list(cmp, &arena); |
| void* hints[S]; |
| for (int i = 0; i < S; i++) { |
| hints[i] = nullptr; |
| } |
| for (int i = 0; i < N; i++) { |
| Key s = rnd.Uniform(S); |
| Key key = (s << 32) + rnd.Next(); |
| InsertWithHint(&list, key, &hints[s]); |
| } |
| Validate(&list); |
| } |
| |
| TEST_F(InlineSkipTest, InsertWithHint_CompatibleWithInsertWithoutHint) { |
| const int N = 100000; |
| const int S1 = 100; |
| const int S2 = 100; |
| Random rnd(534); |
| Arena arena; |
| TestComparator cmp; |
| TestInlineSkipList list(cmp, &arena); |
| std::unordered_set<Key> used; |
| Key with_hint[S1]; |
| Key without_hint[S2]; |
| void* hints[S1]; |
| for (int i = 0; i < S1; i++) { |
| hints[i] = nullptr; |
| while (true) { |
| Key s = rnd.Next(); |
| if (used.insert(s).second) { |
| with_hint[i] = s; |
| break; |
| } |
| } |
| } |
| for (int i = 0; i < S2; i++) { |
| while (true) { |
| Key s = rnd.Next(); |
| if (used.insert(s).second) { |
| without_hint[i] = s; |
| break; |
| } |
| } |
| } |
| for (int i = 0; i < N; i++) { |
| Key s = rnd.Uniform(S1 + S2); |
| if (s < S1) { |
| Key key = (with_hint[s] << 32) + rnd.Next(); |
| InsertWithHint(&list, key, &hints[s]); |
| } else { |
| Key key = (without_hint[s - S1] << 32) + rnd.Next(); |
| Insert(&list, key); |
| } |
| } |
| Validate(&list); |
| } |
| |
| // We want to make sure that with a single writer and multiple |
| // concurrent readers (with no synchronization other than when a |
| // reader's iterator is created), the reader always observes all the |
| // data that was present in the skip list when the iterator was |
| // constructor. Because insertions are happening concurrently, we may |
| // also observe new values that were inserted since the iterator was |
| // constructed, but we should never miss any values that were present |
| // at iterator construction time. |
| // |
| // We generate multi-part keys: |
| // <key,gen,hash> |
| // where: |
| // key is in range [0..K-1] |
| // gen is a generation number for key |
| // hash is hash(key,gen) |
| // |
| // The insertion code picks a random key, sets gen to be 1 + the last |
| // generation number inserted for that key, and sets hash to Hash(key,gen). |
| // |
| // At the beginning of a read, we snapshot the last inserted |
| // generation number for each key. We then iterate, including random |
| // calls to Next() and Seek(). For every key we encounter, we |
| // check that it is either expected given the initial snapshot or has |
| // been concurrently added since the iterator started. |
| class ConcurrentTest { |
| public: |
| static const uint32_t K = 8; |
| |
| private: |
| static uint64_t key(Key key) { return (key >> 40); } |
| static uint64_t gen(Key key) { return (key >> 8) & 0xffffffffu; } |
| static uint64_t hash(Key key) { return key & 0xff; } |
| |
| static uint64_t HashNumbers(uint64_t k, uint64_t g) { |
| uint64_t data[2] = {k, g}; |
| return Hash(reinterpret_cast<char*>(data), sizeof(data), 0); |
| } |
| |
| static Key MakeKey(uint64_t k, uint64_t g) { |
| assert(sizeof(Key) == sizeof(uint64_t)); |
| assert(k <= K); // We sometimes pass K to seek to the end of the skiplist |
| assert(g <= 0xffffffffu); |
| return ((k << 40) | (g << 8) | (HashNumbers(k, g) & 0xff)); |
| } |
| |
| static bool IsValidKey(Key k) { |
| return hash(k) == (HashNumbers(key(k), gen(k)) & 0xff); |
| } |
| |
| static Key RandomTarget(Random* rnd) { |
| switch (rnd->Next() % 10) { |
| case 0: |
| // Seek to beginning |
| return MakeKey(0, 0); |
| case 1: |
| // Seek to end |
| return MakeKey(K, 0); |
| default: |
| // Seek to middle |
| return MakeKey(rnd->Next() % K, 0); |
| } |
| } |
| |
| // Per-key generation |
| struct State { |
| std::atomic<int> generation[K]; |
| void Set(int k, int v) { |
| generation[k].store(v, std::memory_order_release); |
| } |
| int Get(int k) { return generation[k].load(std::memory_order_acquire); } |
| |
| State() { |
| for (unsigned int k = 0; k < K; k++) { |
| Set(k, 0); |
| } |
| } |
| }; |
| |
| // Current state of the test |
| State current_; |
| |
| ConcurrentArena arena_; |
| |
| // InlineSkipList is not protected by mu_. We just use a single writer |
| // thread to modify it. |
| InlineSkipList<TestComparator> list_; |
| |
| public: |
| ConcurrentTest() : list_(TestComparator(), &arena_) {} |
| |
| // REQUIRES: No concurrent calls to WriteStep or ConcurrentWriteStep |
| void WriteStep(Random* rnd) { |
| const uint32_t k = rnd->Next() % K; |
| const int g = current_.Get(k) + 1; |
| const Key new_key = MakeKey(k, g); |
| char* buf = list_.AllocateKey(sizeof(Key)); |
| memcpy(buf, &new_key, sizeof(Key)); |
| list_.Insert(buf); |
| current_.Set(k, g); |
| } |
| |
| // REQUIRES: No concurrent calls for the same k |
| void ConcurrentWriteStep(uint32_t k) { |
| const int g = current_.Get(k) + 1; |
| const Key new_key = MakeKey(k, g); |
| char* buf = list_.AllocateKey(sizeof(Key)); |
| memcpy(buf, &new_key, sizeof(Key)); |
| list_.InsertConcurrently(buf); |
| ASSERT_EQ(g, current_.Get(k) + 1); |
| current_.Set(k, g); |
| } |
| |
| void ReadStep(Random* rnd) { |
| // Remember the initial committed state of the skiplist. |
| State initial_state; |
| for (unsigned int k = 0; k < K; k++) { |
| initial_state.Set(k, current_.Get(k)); |
| } |
| |
| Key pos = RandomTarget(rnd); |
| InlineSkipList<TestComparator>::Iterator iter(&list_); |
| iter.Seek(Encode(&pos)); |
| while (true) { |
| Key current; |
| if (!iter.Valid()) { |
| current = MakeKey(K, 0); |
| } else { |
| current = Decode(iter.key()); |
| ASSERT_TRUE(IsValidKey(current)) << current; |
| } |
| ASSERT_LE(pos, current) << "should not go backwards"; |
| |
| // Verify that everything in [pos,current) was not present in |
| // initial_state. |
| while (pos < current) { |
| ASSERT_LT(key(pos), K) << pos; |
| |
| // Note that generation 0 is never inserted, so it is ok if |
| // <*,0,*> is missing. |
| ASSERT_TRUE((gen(pos) == 0U) || |
| (gen(pos) > static_cast<uint64_t>(initial_state.Get( |
| static_cast<int>(key(pos)))))) |
| << "key: " << key(pos) << "; gen: " << gen(pos) |
| << "; initgen: " << initial_state.Get(static_cast<int>(key(pos))); |
| |
| // Advance to next key in the valid key space |
| if (key(pos) < key(current)) { |
| pos = MakeKey(key(pos) + 1, 0); |
| } else { |
| pos = MakeKey(key(pos), gen(pos) + 1); |
| } |
| } |
| |
| if (!iter.Valid()) { |
| break; |
| } |
| |
| if (rnd->Next() % 2) { |
| iter.Next(); |
| pos = MakeKey(key(pos), gen(pos) + 1); |
| } else { |
| Key new_target = RandomTarget(rnd); |
| if (new_target > pos) { |
| pos = new_target; |
| iter.Seek(Encode(&new_target)); |
| } |
| } |
| } |
| } |
| }; |
| const uint32_t ConcurrentTest::K; |
| |
| // Simple test that does single-threaded testing of the ConcurrentTest |
| // scaffolding. |
| TEST_F(InlineSkipTest, ConcurrentReadWithoutThreads) { |
| ConcurrentTest test; |
| Random rnd(test::RandomSeed()); |
| for (int i = 0; i < 10000; i++) { |
| test.ReadStep(&rnd); |
| test.WriteStep(&rnd); |
| } |
| } |
| |
| TEST_F(InlineSkipTest, ConcurrentInsertWithoutThreads) { |
| ConcurrentTest test; |
| Random rnd(test::RandomSeed()); |
| for (int i = 0; i < 10000; i++) { |
| test.ReadStep(&rnd); |
| uint32_t base = rnd.Next(); |
| for (int j = 0; j < 4; ++j) { |
| test.ConcurrentWriteStep((base + j) % ConcurrentTest::K); |
| } |
| } |
| } |
| |
| class TestState { |
| public: |
| ConcurrentTest t_; |
| int seed_; |
| std::atomic<bool> quit_flag_; |
| std::atomic<uint32_t> next_writer_; |
| |
| enum ReaderState { STARTING, RUNNING, DONE }; |
| |
| explicit TestState(int s) |
| : seed_(s), |
| quit_flag_(false), |
| state_(STARTING), |
| pending_writers_(0), |
| state_cv_(&mu_) {} |
| |
| void Wait(ReaderState s) { |
| mu_.Lock(); |
| while (state_ != s) { |
| state_cv_.Wait(); |
| } |
| mu_.Unlock(); |
| } |
| |
| void Change(ReaderState s) { |
| mu_.Lock(); |
| state_ = s; |
| state_cv_.Signal(); |
| mu_.Unlock(); |
| } |
| |
| void AdjustPendingWriters(int delta) { |
| mu_.Lock(); |
| pending_writers_ += delta; |
| if (pending_writers_ == 0) { |
| state_cv_.Signal(); |
| } |
| mu_.Unlock(); |
| } |
| |
| void WaitForPendingWriters() { |
| mu_.Lock(); |
| while (pending_writers_ != 0) { |
| state_cv_.Wait(); |
| } |
| mu_.Unlock(); |
| } |
| |
| private: |
| port::Mutex mu_; |
| ReaderState state_; |
| int pending_writers_; |
| port::CondVar state_cv_; |
| }; |
| |
| static void ConcurrentReader(void* arg) { |
| TestState* state = reinterpret_cast<TestState*>(arg); |
| Random rnd(state->seed_); |
| int64_t reads = 0; |
| state->Change(TestState::RUNNING); |
| while (!state->quit_flag_.load(std::memory_order_acquire)) { |
| state->t_.ReadStep(&rnd); |
| ++reads; |
| } |
| state->Change(TestState::DONE); |
| } |
| |
| static void ConcurrentWriter(void* arg) { |
| TestState* state = reinterpret_cast<TestState*>(arg); |
| uint32_t k = state->next_writer_++ % ConcurrentTest::K; |
| state->t_.ConcurrentWriteStep(k); |
| state->AdjustPendingWriters(-1); |
| } |
| |
| static void RunConcurrentRead(int run) { |
| const int seed = test::RandomSeed() + (run * 100); |
| Random rnd(seed); |
| const int N = 1000; |
| const int kSize = 1000; |
| for (int i = 0; i < N; i++) { |
| if ((i % 100) == 0) { |
| fprintf(stderr, "Run %d of %d\n", i, N); |
| } |
| TestState state(seed + 1); |
| Env::Default()->SetBackgroundThreads(1); |
| Env::Default()->Schedule(ConcurrentReader, &state); |
| state.Wait(TestState::RUNNING); |
| for (int k = 0; k < kSize; ++k) { |
| state.t_.WriteStep(&rnd); |
| } |
| state.quit_flag_.store(true, std::memory_order_release); |
| state.Wait(TestState::DONE); |
| } |
| } |
| |
| static void RunConcurrentInsert(int run, int write_parallelism = 4) { |
| Env::Default()->SetBackgroundThreads(1 + write_parallelism, |
| Env::Priority::LOW); |
| const int seed = test::RandomSeed() + (run * 100); |
| Random rnd(seed); |
| const int N = 1000; |
| const int kSize = 1000; |
| for (int i = 0; i < N; i++) { |
| if ((i % 100) == 0) { |
| fprintf(stderr, "Run %d of %d\n", i, N); |
| } |
| TestState state(seed + 1); |
| Env::Default()->Schedule(ConcurrentReader, &state); |
| state.Wait(TestState::RUNNING); |
| for (int k = 0; k < kSize; k += write_parallelism) { |
| state.next_writer_ = rnd.Next(); |
| state.AdjustPendingWriters(write_parallelism); |
| for (int p = 0; p < write_parallelism; ++p) { |
| Env::Default()->Schedule(ConcurrentWriter, &state); |
| } |
| state.WaitForPendingWriters(); |
| } |
| state.quit_flag_.store(true, std::memory_order_release); |
| state.Wait(TestState::DONE); |
| } |
| } |
| |
| TEST_F(InlineSkipTest, ConcurrentRead1) { RunConcurrentRead(1); } |
| TEST_F(InlineSkipTest, ConcurrentRead2) { RunConcurrentRead(2); } |
| TEST_F(InlineSkipTest, ConcurrentRead3) { RunConcurrentRead(3); } |
| TEST_F(InlineSkipTest, ConcurrentRead4) { RunConcurrentRead(4); } |
| TEST_F(InlineSkipTest, ConcurrentRead5) { RunConcurrentRead(5); } |
| TEST_F(InlineSkipTest, ConcurrentInsert1) { RunConcurrentInsert(1); } |
| TEST_F(InlineSkipTest, ConcurrentInsert2) { RunConcurrentInsert(2); } |
| TEST_F(InlineSkipTest, ConcurrentInsert3) { RunConcurrentInsert(3); } |
| |
| } // namespace rocksdb |
| |
| int main(int argc, char** argv) { |
| ::testing::InitGoogleTest(&argc, argv); |
| return RUN_ALL_TESTS(); |
| } |