| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/util/ttl_cache.h" |
| |
| #include <atomic> |
| #include <cstddef> |
| #include <cstdint> |
| #include <cstdlib> |
| #include <functional> |
| #include <memory> |
| #include <random> |
| #include <set> |
| #include <string> |
| #include <thread> |
| #include <utility> |
| #include <vector> |
| |
| #include <gflags/gflags_declare.h> |
| #include <glog/logging.h> |
| #include <gtest/gtest.h> |
| |
| #include "kudu/gutil/integral_types.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/ref_counted.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/util/cache.h" |
| #include "kudu/util/metrics.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/scoped_cleanup.h" |
| #include "kudu/util/slice.h" |
| #include "kudu/util/test_macros.h" |
| #include "kudu/util/test_util.h" |
| #include "kudu/util/ttl_cache_metrics.h" |
| |
| DECLARE_bool(cache_force_single_shard); |
| DECLARE_double(cache_memtracker_approximation_ratio); |
| |
| using std::atomic; |
| using std::set; |
| using std::string; |
| using std::thread; |
| using std::unique_ptr; |
| using std::vector; |
| using strings::Substitute; |
| |
| namespace kudu { |
| |
| // Test-only flavor of TTLCacheMetrics. Its constructor (defined further down |
| // in this file) binds the metric members to the 'test_ttl_cache_*' metrics. |
| struct TTLCacheTestMetrics : TTLCacheMetrics { |
|   explicit TTLCacheTestMetrics(const scoped_refptr<MetricEntity>& entity); |
| }; |
| |
| // Metric prototypes used by the tests in this file. All of them are defined |
| // for the 'server' entity type: seven counters measured in entries and one |
| // uint64 gauge measured in bytes. |
| METRIC_DEFINE_counter(server, test_ttl_cache_inserts, |
| "TTL Cache Inserts", |
| kudu::MetricUnit::kEntries, |
| "Number of entries inserted in the cache", |
| kudu::MetricLevel::kInfo); |
| METRIC_DEFINE_counter(server, test_ttl_cache_lookups, |
| "TTL Cache Lookups", |
| kudu::MetricUnit::kEntries, |
| "Number of entries looked up from the cache", |
| kudu::MetricLevel::kInfo); |
| METRIC_DEFINE_counter(server, test_ttl_cache_evictions, |
| "TTL Cache Evictions", |
| kudu::MetricUnit::kEntries, |
| "Number of entries evicted from the cache", |
| kudu::MetricLevel::kInfo); |
| METRIC_DEFINE_counter(server, test_ttl_cache_evictions_expired, |
| "TTL Cache Evictions of Expired Entries", |
| kudu::MetricUnit::kEntries, |
| "Number of entries that had already expired upon " |
| "eviction from the cache", |
| kudu::MetricLevel::kInfo); |
| METRIC_DEFINE_counter(server, test_ttl_cache_misses, |
| "TTL Cache Misses", |
| kudu::MetricUnit::kEntries, |
| "Number of lookups that didn't find a cached entry", |
| kudu::MetricLevel::kInfo); |
| METRIC_DEFINE_counter(server, test_ttl_cache_hits, |
| "TTL Cache Hits", |
| kudu::MetricUnit::kEntries, |
| "Number of lookups that found a cached entry", |
| kudu::MetricLevel::kInfo); |
| METRIC_DEFINE_counter(server, test_ttl_cache_hits_expired, |
| "TTL Cache Hits of Expired Entries", |
| kudu::MetricUnit::kEntries, |
| "Number of lookups that found an entry, but the entry " |
| "had already expired at the time of lookup", |
| kudu::MetricLevel::kInfo); |
| METRIC_DEFINE_gauge_uint64(server, test_ttl_cache_memory_usage, |
| "TTL Cache Memory Usage", |
| kudu::MetricUnit::kBytes, |
| "Memory consumed by the cache", |
| kudu::MetricLevel::kInfo); |
| |
| // Bind every metric member to an instance of the corresponding test metric |
| // instantiated for the given entity. |
| TTLCacheTestMetrics::TTLCacheTestMetrics( |
|     const scoped_refptr<MetricEntity>& entity) { |
|   // Counters. |
|   inserts = METRIC_test_ttl_cache_inserts.Instantiate(entity); |
|   lookups = METRIC_test_ttl_cache_lookups.Instantiate(entity); |
|   evictions = METRIC_test_ttl_cache_evictions.Instantiate(entity); |
|   evictions_expired = |
|       METRIC_test_ttl_cache_evictions_expired.Instantiate(entity); |
|   cache_hits_caching = METRIC_test_ttl_cache_hits.Instantiate(entity); |
|   cache_hits_expired = METRIC_test_ttl_cache_hits_expired.Instantiate(entity); |
|   cache_misses_caching = METRIC_test_ttl_cache_misses.Instantiate(entity); |
| |
|   // The gauge is instantiated with an extra argument of 0 (presumably its |
|   // initial reading). |
|   cache_usage = METRIC_test_ttl_cache_memory_usage.Instantiate(entity, 0); |
| } |
| |
| // A test value type wrapping an immutable integer. The tests in this file |
| // insert every instance with a charge of 1, so its footprint in the cache |
| // is constant. |
| class TestValue { |
|  public: |
|   // The wrapped integer; it cannot be changed once the object is constructed. |
|   const int value; |
| |
|   explicit TestValue(int value = 0) : value(value) {} |
| |
|   // Implicit conversion to int: allows for comparing a TestValue with plain |
|   // integers in ASSERT_EQ()-like comparisons. |
|   // NOLINTNEXTLINE(google-explicit-constructor) |
|   operator int() const { return value; } |
| }; |
| |
| // Shorthands for the cache type under test and for handles to its entries. |
| using TTLTestCache = TTLCache<string, TestValue>; |
| using Handle = TTLTestCache::EntryHandle; |
| |
| // Test fixture: sets the cache-related flags, instantiates a metric entity |
| // for the cache under test, and provides accessors for the metric readings. |
| class TTLCacheTest : public KuduTest { |
|  public: |
|   void SetUp() override { |
|     KuduTest::SetUp(); |
| |
|     // Keep the capacity/eviction arithmetic in the tests straightforward: |
|     //   * the cache consists of a single shard |
|     //   * memory consumption is tracked without any approximation |
|     FLAGS_cache_force_single_shard = true; |
|     FLAGS_cache_memtracker_approximation_ratio = 0; |
| |
|     metric_entity_ = METRIC_ENTITY_server.Instantiate(&metric_registry_, |
|                                                       "ttl_cache-test"); |
|   } |
| |
|   // Assert that 'handle' refers to an entry (i.e. it is non-null) and that |
|   // the entry's value equals 'expected_value'. |
|   void VerifyEntryValue(const Handle& handle, int expected_value) { |
|     ASSERT_TRUE(handle); |
|     ASSERT_EQ(expected_value, handle.value()); |
|   } |
| |
|   // Generate an accessor named 'getter' that returns the current reading of |
|   // the counter METRIC_test_ttl_<metric_suffix>. |
| #define DEFINE_COUNTER_GETTER(getter, metric_suffix) \ |
|   int64_t getter() { \ |
|     scoped_refptr<Counter> counter(metric_entity_->FindOrCreateCounter( \ |
|         &METRIC_test_ttl_##metric_suffix)); \ |
|     CHECK(counter); \ |
|     return counter->value(); \ |
|   } |
|   DEFINE_COUNTER_GETTER(GetCacheInserts, cache_inserts) |
|   DEFINE_COUNTER_GETTER(GetCacheLookups, cache_lookups) |
|   DEFINE_COUNTER_GETTER(GetCacheHits, cache_hits) |
|   DEFINE_COUNTER_GETTER(GetCacheHitsExpired, cache_hits_expired) |
|   DEFINE_COUNTER_GETTER(GetCacheMisses, cache_misses) |
|   DEFINE_COUNTER_GETTER(GetCacheEvictions, cache_evictions) |
|   DEFINE_COUNTER_GETTER(GetCacheEvictionsExpired, cache_evictions_expired) |
| #undef DEFINE_COUNTER_GETTER |
| |
|   // Return the current reading of the cache's memory usage gauge. |
|   int64_t GetCacheUsage() { |
|     scoped_refptr<AtomicGauge<uint64>> usage(metric_entity_->FindOrCreateGauge( |
|         &METRIC_test_ttl_cache_memory_usage, static_cast<uint64>(0))); |
|     CHECK(usage); |
|     return usage->value(); |
|   } |
| |
|  protected: |
|   MetricRegistry metric_registry_; |
|   scoped_refptr<MetricEntity> metric_entity_; |
| }; |
| |
| // Basic verification on the cache behavior for entries which don't expire |
| // during the test. The cache is created with the capacity of 1 and every |
| // entry is inserted with the charge of 1, so each new insertion displaces |
| // the previously cached entry. |
| TEST_F(TTLCacheTest, BasicNoExpiration) { |
|   const auto entry_ttl = MonoDelta::FromSeconds(300); |
| |
|   TTLTestCache cache(1, entry_ttl); |
|   { |
|     unique_ptr<TTLCacheTestMetrics> metrics( |
|         new TTLCacheTestMetrics(metric_entity_)); |
|     cache.SetMetrics(std::move(metrics)); |
|   } |
| |
|   // Try to fetch an entry from an empty cache. |
|   ASSERT_FALSE(cache.Get("key0")); |
|   ASSERT_EQ(0, GetCacheHits()); |
|   ASSERT_EQ(1, GetCacheLookups()); |
|   ASSERT_EQ(1, GetCacheMisses()); |
|   ASSERT_EQ(0, GetCacheUsage()); |
| |
|   { |
|     // Verify how Put() works. |
|     auto put_h(cache.Put("key0", unique_ptr<TestValue>(new TestValue), 1)); |
|     NO_FATALS(VerifyEntryValue(put_h, 0)); |
| |
|     { |
|       // Once put in the cache and not yet evicted, the entry must be available |
|       // for retrieval from the cache. |
|       auto h(cache.Get("key0")); |
|       NO_FATALS(VerifyEntryValue(h, 0)); |
|       ASSERT_EQ(2, GetCacheLookups()); |
|       ASSERT_EQ(1, GetCacheHits()); |
|       // That's the same entry. |
|       ASSERT_EQ(&put_h.value(), &h.value()); |
|     } |
| |
|     // One more entry is added. The cache was already at capacity, so the |
|     // entry for 'key0' is displaced from the cache by the new one. |
|     auto h1 = cache.Put("key1", unique_ptr<TestValue>(new TestValue(1)), 1); |
|     // However, since the 'put_h' handle still references the displaced entry, |
|     // it should not be freed yet: no evictions are registered so far. |
|     ASSERT_EQ(0, GetCacheEvictions()); |
|     NO_FATALS(VerifyEntryValue(h1, 1)); |
|     // The formerly cached entry should no longer be available from the cache. |
|     ASSERT_FALSE(cache.Get("key0")); |
|     // One extra miss should be registered. |
|     ASSERT_EQ(2, GetCacheMisses()); |
|     // No new hits. |
|     ASSERT_EQ(1, GetCacheHits()); |
| |
|     // However, since the reference to the entry is still present in the scope, |
|     // it's still available via that reference (but not from the cache as is). |
|     NO_FATALS(VerifyEntryValue(put_h, 0)); |
|     ASSERT_EQ(2, GetCacheUsage()); |
|   } |
|   // Once 'put_h' handle is out of scope, the earlier removed entry should be |
|   // freed. |
|   ASSERT_EQ(1, GetCacheEvictions()); |
|   ASSERT_EQ(1, GetCacheUsage()); |
| |
|   // Have multiple handles for the same entry in the cache in one scope |
|   // and make sure the reference counting in the underlying cache works |
|   // as expected. |
|   { |
|     auto put_h = cache.Put("k", unique_ptr<TestValue>(new TestValue(5)), 1); |
|     ASSERT_EQ(3, GetCacheInserts()); |
|     // The entry keyed with 'key1' should be evicted and freed after new |
|     // insertion. |
|     ASSERT_EQ(2, GetCacheEvictions()); |
|     // Nothing except for this newly inserted entry is referenced elsewhere. |
|     ASSERT_EQ(1, GetCacheUsage()); |
| |
|     NO_FATALS(VerifyEntryValue(put_h, 5)); |
| |
|     auto get_h0 = cache.Get("k"); |
|     // That's the same entry. |
|     ASSERT_EQ(&put_h.value(), &get_h0.value()); |
|     NO_FATALS(VerifyEntryValue(get_h0, 5)); |
| |
|     auto get_h1 = cache.Get("k"); |
|     // That's the same entry. |
|     ASSERT_EQ(&get_h0.value(), &get_h1.value()); |
|     NO_FATALS(VerifyEntryValue(get_h1, 5)); |
|   } |
| |
|   // No hits of expired entries. |
|   ASSERT_EQ(0, GetCacheHitsExpired()); |
|   // No evictions of expired entries: all the evicted entries were valid. |
|   ASSERT_EQ(0, GetCacheEvictionsExpired()); |
|   // The cache evicts entries in a lazy manner: the entry for key 'k' stays in |
|   // the cache even after all the handles referencing it are gone. |
|   ASSERT_EQ(2, GetCacheEvictions()); |
|   ASSERT_EQ(1, GetCacheUsage()); |
| } |
| |
| // Verify the cache's eviction policy. After their expiration time, entries |
| // should not be available from the cache. Also, the earliest added, even if |
| // not-yet-expired, entries are evicted to accommodate new ones if the cache is |
| // at capacity. |
| TEST_F(TTLCacheTest, Basic) { |
| constexpr size_t cache_capacity = 5; |
| const auto entry_ttl = MonoDelta::FromMilliseconds(100); |
| |
| TTLTestCache cache(cache_capacity, entry_ttl); |
| { |
| unique_ptr<TTLCacheTestMetrics> metrics( |
| new TTLCacheTestMetrics(metric_entity_)); |
| cache.SetMetrics(std::move(metrics)); |
| } |
| |
| // For an empty cache, all the metrics should read zero. |
| ASSERT_EQ(0, GetCacheEvictions()); |
| ASSERT_EQ(0, GetCacheEvictionsExpired()); |
| ASSERT_EQ(0, GetCacheHits()); |
| ASSERT_EQ(0, GetCacheHitsExpired()); |
| ASSERT_EQ(0, GetCacheInserts()); |
| ASSERT_EQ(0, GetCacheLookups()); |
| ASSERT_EQ(0, GetCacheMisses()); |
| ASSERT_EQ(0, GetCacheUsage()); |
| |
| auto put_h = cache.Put("key0", unique_ptr<TestValue>(new TestValue(100)), 1); |
| ASSERT_EQ(1, GetCacheInserts()); |
| ASSERT_EQ(1, GetCacheUsage()); |
| NO_FATALS(VerifyEntryValue(put_h, 100)); |
| // Using the handle from the Put() operation, no lookups so far. |
| ASSERT_EQ(0, GetCacheLookups()); |
| ASSERT_EQ(0, GetCacheMisses()); |
| |
| SleepFor(entry_ttl); |
| { |
| // The entry is expired and must not be available from the cache, but |
| // it should not yet be freed since there is a handle that refers to it. |
| ASSERT_FALSE(cache.Get("key0")); |
| ASSERT_EQ(1, GetCacheLookups()); |
| // The expired entry was in the cache, even if it was not returned as |
| // the result. |
| ASSERT_EQ(0, GetCacheMisses()); |
| ASSERT_EQ(1, GetCacheHits()); |
| // The cache_hits_expired counter reflects the fact that the expired |
| // entry was not returned from the cache as the result of the query. |
| ASSERT_EQ(1, GetCacheHitsExpired()); |
| ASSERT_EQ(0, GetCacheEvictionsExpired()); |
| |
| // However, the entry should be still available via the reference |
| // that is kept in the scope. |
| NO_FATALS(VerifyEntryValue(put_h, 100)); |
| } |
| |
| { |
| // Re-insert the entry with the same key, but different value. |
| auto h = cache.Put("key0", unique_ptr<TestValue>(new TestValue(200)), 1); |
| NO_FATALS(VerifyEntryValue(h, 200)); |
| ASSERT_EQ(2, GetCacheInserts()); |
| ASSERT_EQ(1, GetCacheLookups()); |
| ASSERT_EQ(2, GetCacheUsage()); |
| |
| // That operation should not affect the reference to the entry inserted |
| // previously. |
| NO_FATALS(VerifyEntryValue(put_h, 100)); |
| } |
| |
| for (auto i = 0; i < cache_capacity; ++i) { |
| const auto key = Substitute("1key$0", i); |
| unique_ptr<TestValue> val(new TestValue(i)); |
| cache.Put(key, std::move(val), 1); |
| |
| auto h = cache.Get(key); |
| NO_FATALS(VerifyEntryValue(h, i)); |
| } |
| ASSERT_EQ(cache_capacity + 2, GetCacheInserts()); |
| ASSERT_EQ(cache_capacity + 1, GetCacheLookups()); |
| // All recent lookups should result in cache hits. |
| ASSERT_EQ(cache_capacity + 1, GetCacheHits()); |
| ASSERT_EQ(0, GetCacheMisses()); |
| // The re-inserted entry for 'key0' should be gone: nothing is referencing it. |
| ASSERT_EQ(1, GetCacheEvictions()); |
| ASSERT_EQ(0, GetCacheEvictionsExpired()); |
| // The cache is at capacity: all fresh inserted entries plus the entry |
| // removed from the cache, but still referenced by 'put_h' handle. |
| ASSERT_EQ(cache_capacity + 1, GetCacheUsage()); |
| |
| // The earliest added entry should be removed from the cache at this point. |
| ASSERT_FALSE(cache.Get("key0")); |
| ASSERT_EQ(cache_capacity + 2, GetCacheLookups()); |
| ASSERT_EQ(1, GetCacheMisses()); |
| |
| // Lately inserted entries should be still around ... |
| for (auto i = 0; i < cache_capacity; ++i) { |
| const auto key = Substitute("1key$0", i); |
| auto h = cache.Get(key); |
| NO_FATALS(VerifyEntryValue(h, i)); |
| } |
| ASSERT_EQ(cache_capacity * 2 + 2, GetCacheLookups()); |
| // No new misses so far. |
| ASSERT_EQ(1, GetCacheMisses()); |
| |
| SleepFor(entry_ttl); |
| // ... and after expiration they should be gone. |
| for (auto i = 0; i < cache_capacity + 1; ++i) { |
| const auto key = Substitute("1key$0", i); |
| ASSERT_FALSE(cache.Get(key)); |
| } |
| ASSERT_EQ(cache_capacity + 1, GetCacheHitsExpired()); |
| // All the entries are expired but should not be freed yet: the very first one |
| // is still referenced by the 'put_h' handle, and the rest haven't spilled |
| // over the cache's capacity. |
| ASSERT_EQ(0, GetCacheEvictionsExpired()); |
| // One new miss for the absent key Substitute("1key$0", capacity) |
| ASSERT_EQ(2, GetCacheMisses()); |
| |
| // Insert twice as many entries as the cache's capacity. |
| for (auto i = 0; i < 2 * cache_capacity; ++i) { |
| const auto key = Substitute("2key$0", i); |
| unique_ptr<TestValue> val(new TestValue(i)); |
| cache.Put(key, std::move(val), 1); |
| } |
| // (capacity * 2) new evictions should have happened. |
| ASSERT_EQ(cache_capacity * 2 + 1, GetCacheEvictions()); |
| // Only one extra entry is still referenced. |
| ASSERT_EQ(cache_capacity + 1, GetCacheUsage()); |
| |
| // The first half of the freshly inserted entries should not be available. |
| for (auto i = 0; i < cache_capacity; ++i) { |
| const auto key = Substitute("2key$0", i); |
| ASSERT_FALSE(cache.Get(key)); |
| } |
| // All recent lookups should result in misses. |
| ASSERT_EQ(cache_capacity + 2, GetCacheMisses()); |
| ASSERT_EQ(cache_capacity + 1, GetCacheHitsExpired()); |
| // All the expired entries except for the very first inserted must be freed: |
| // the very first is still referenced by the 'put_h' handle, |
| // but others don't have any references. |
| ASSERT_EQ(cache_capacity, GetCacheEvictionsExpired()); |
| |
| // The second half of the entries should be still present. |
| for (auto i = cache_capacity; i < 2 * cache_capacity; ++i) { |
| const auto key = Substitute("2key$0", i); |
| auto h = cache.Get(key); |
| NO_FATALS(VerifyEntryValue(h, i)); |
| } |
| // Correspondingly, there should be no new misses. |
| ASSERT_EQ(cache_capacity + 2, GetCacheMisses()); |
| |
| // After all the shenanigans, an entry should be still available |
| // via the reference kept in the scope. |
| NO_FATALS(VerifyEntryValue(put_h, 100)); |
| ASSERT_EQ(cache_capacity + 1, GetCacheUsage()); |
| } |
| |
| // Test the invalidation of expired entries in the underlying cache: calling |
| // Invalidate() on the underlying cache directly is expected to get rid of |
| // the entries whose expiration time has already passed, and only of those. |
| TEST_F(TTLCacheTest, InvalidationOfExpiredEntries) { |
|   constexpr size_t kCapacity = 512; |
|   constexpr size_t kHalfCapacity = kCapacity / 2; |
|   const auto ttl = MonoDelta::FromMilliseconds(250); |
| |
|   TTLTestCache cache(kCapacity, ttl); |
|   cache.SetMetrics(unique_ptr<TTLCacheTestMetrics>( |
|       new TTLCacheTestMetrics(metric_entity_))); |
| |
|   // The first functor returns true for an entry whose expiration time is |
|   // still in the future. The second one returns true only while the count |
|   // of valid entries is zero (presumably, that tells the cache whether to |
|   // continue iterating over its entries). |
|   const Cache::InvalidationControl ctl = { |
|     [&](Slice /* key */, Slice value) { |
|       CHECK_EQ(sizeof(TTLTestCache::Entry), value.size()); |
|       const auto* cached = reinterpret_cast<const TTLTestCache::Entry*>( |
|           value.data()); |
|       return (cached->exp_time > MonoTime::Now()); |
|     }, |
|     [&](size_t num_valid, size_t /* num_invalid */) { |
|       return num_valid == 0; |
|     } |
|   }; |
| |
|   // There is nothing to invalidate in an empty cache. |
|   ASSERT_EQ(0, cache.cache_->Invalidate(ctl)); |
| |
|   // Freshly inserted entries are not subject to invalidation. |
|   for (auto idx = 0; idx < kHalfCapacity; ++idx) { |
|     cache.Put(Substitute("0$0", idx), unique_ptr<TestValue>(new TestValue), 1); |
|   } |
|   ASSERT_EQ(0, cache.cache_->Invalidate(ctl)); |
|   ASSERT_EQ(0, GetCacheEvictionsExpired()); |
| |
|   // Once the TTL interval has passed, all of those entries are invalidated. |
|   SleepFor(ttl); |
| |
|   ASSERT_EQ(kHalfCapacity, cache.cache_->Invalidate(ctl)); |
|   ASSERT_EQ(kHalfCapacity, GetCacheEvictionsExpired()); |
|   ASSERT_EQ(0, GetCacheUsage()); |
| |
|   // Two batches of entries: the first batch is expired by the time of the |
|   // invalidation, while the second one has just been inserted. |
|   for (auto idx = 0; idx < kHalfCapacity; ++idx) { |
|     cache.Put(Substitute("1$0", idx), unique_ptr<TestValue>(new TestValue), 1); |
|   } |
|   SleepFor(ttl); |
|   for (auto idx = 0; idx < kHalfCapacity; ++idx) { |
|     cache.Put(Substitute("2$0", idx), unique_ptr<TestValue>(new TestValue), 1); |
|   } |
|   ASSERT_EQ(kHalfCapacity, cache.cache_->Invalidate(ctl)); |
|   ASSERT_EQ(kCapacity, GetCacheEvictionsExpired()); |
|   ASSERT_EQ(kHalfCapacity, GetCacheUsage()); |
| } |
| |
| // Verify that the auto-invalidation of expired entries in TTLCache works as |
| // expected: expired entries are removed with the expected timing, while |
| // valid entries stay in the cache. |
| TEST_F(TTLCacheTest, AutoInvalidationOfEntries) { |
|   constexpr size_t kCacheCapacity = 128; |
|   constexpr size_t kHalfCacheCapacity = kCacheCapacity / 2; |
|   const auto kEntryTtl = MonoDelta::FromMilliseconds(256); |
|   const auto ttl_ms = kEntryTtl.ToMilliseconds(); |
|   // The scrubbing interval is 1/8 of the TTL interval. |
|   const auto kScrubInterval = MonoDelta::FromMilliseconds(ttl_ms / 8); |
|   const auto kEntryTtlThreeQuarters = |
|       MonoDelta::FromMilliseconds(ttl_ms * 3 / 4); |
| |
|   TTLTestCache cache(kCacheCapacity, kEntryTtl, kScrubInterval); |
|   cache.SetMetrics(unique_ptr<TTLCacheTestMetrics>( |
|       new TTLCacheTestMetrics(metric_entity_))); |
| |
|   // Insert as many entries as a half of the cache's capacity, producing |
|   // the keys for the entries from the specified format string. |
|   const auto put_half = [&](const char* key_format) { |
|     for (auto i = 0; i < kHalfCacheCapacity; ++i) { |
|       cache.Put(Substitute(key_format, i), |
|                 unique_ptr<TestValue>(new TestValue(i)), 1); |
|     } |
|   }; |
| |
|   // Make sure the scrubber doesn't conflate non-expired entries with expired |
|   // ones while scrubbing the cache: the second batch of entries is inserted |
|   // 3/4 of the TTL interval later than the first one. |
|   put_half("1$0"); |
|   SleepFor(kEntryTtlThreeQuarters); |
|   put_half("2$0"); |
|   ASSERT_EQ(kCacheCapacity, GetCacheUsage()); |
|   SleepFor(kEntryTtlThreeQuarters); |
| |
|   // Apart from the OS scheduler's anomalies, the scrubbing thread should run |
|   // at least once at this point. Given the timing of adding the entries into |
|   // the cache, the first batch should have been invalidated by now, while |
|   // the second batch should stay. |
|   ASSERT_EQ(kHalfCacheCapacity, GetCacheEvictionsExpired()); |
|   ASSERT_EQ(kHalfCacheCapacity, GetCacheUsage()); |
| |
|   // Eventually, nothing should be in the cache. |
|   ASSERT_EVENTUALLY([&]{ |
|     ASSERT_EQ(0, GetCacheUsage()); |
|     ASSERT_EQ(kCacheCapacity, GetCacheEvictionsExpired()); |
|   }); |
| } |
| |
| // Verify that the auto-invalidation of expired entries in TTLCache works as |
| // expected in the presence of outstanding handles to the entries which |
| // are subject to invalidation by the scrubbing thread. |
| TEST_F(TTLCacheTest, AutoInvalidationOfEntriesWithOutstandingReferences) { |
|   constexpr size_t kCacheCapacity = 512; |
|   constexpr size_t kHalfCacheCapacity = kCacheCapacity / 2; |
|   const auto kEntryTtl = MonoDelta::FromMilliseconds(50); |
|   const auto kScrubInterval = MonoDelta::FromMilliseconds( |
|       kEntryTtl.ToMilliseconds() / 2); |
| |
|   // A TTL cache with an auto-scrubbing thread: the scrubbing interval is |
|   // a half of the TTL interval. |
|   TTLTestCache cache(kCacheCapacity, kEntryTtl, kScrubInterval); |
|   cache.SetMetrics(unique_ptr<TTLCacheTestMetrics>( |
|       new TTLCacheTestMetrics(metric_entity_))); |
| |
|   { |
|     // Handles for the entries with these keys are kept around while |
|     // the scrubbing thread does its job. |
|     const set<string> pinned_keys = { |
|       "0", "2", "9", "99", "127", "128", "252", "255", |
|     }; |
|     const auto num_pinned = pinned_keys.size(); |
|     vector<Handle> pinned_handles; |
|     pinned_handles.reserve(num_pinned); |
|     for (auto i = 0; i < kHalfCacheCapacity; ++i) { |
|       const auto key = Substitute("$0", i); |
|       auto h = cache.Put(key, unique_ptr<TestValue>(new TestValue), 1); |
|       if (!ContainsKey(pinned_keys, key)) { |
|         continue; |
|       } |
|       pinned_handles.emplace_back(std::move(h)); |
|     } |
|     ASSERT_EQ(num_pinned, pinned_handles.size()); |
| |
|     // The background thread should get rid of all the expired entries, so |
|     // all the entries except for the pinned ones should be gone. The pinned |
|     // entries are still referenced by 'pinned_handles', so those should not |
|     // be deallocated yet. |
|     ASSERT_EVENTUALLY([&]{ |
|       ASSERT_EQ(kHalfCacheCapacity - num_pinned, GetCacheEvictionsExpired()); |
|       ASSERT_EQ(num_pinned, GetCacheUsage()); |
|     }); |
| |
|     // Even if the pinned entries are not yet deallocated, they must have |
|     // been removed from the cache's lookup table once the scrubbing thread |
|     // invalidated them. |
|     for (const auto& key : pinned_keys) { |
|       ASSERT_FALSE(cache.Get(key)); |
|     } |
|   } |
|   // Now, once the handles for the pinned entries have gone out of scope, |
|   // the cache should be empty. |
|   ASSERT_EQ(kHalfCacheCapacity, GetCacheEvictionsExpired()); |
|   ASSERT_EQ(0, GetCacheUsage()); |
| } |
| |
| // Verify how the auto-invalidation of expired entries works in the presence |
| // of concurrent read access to the entries. Also, verify that the limit on |
| // the maximum number of entries processed per one run of the scrubbing |
| // thread applies as expected. |
| TEST_F(TTLCacheTest, AutoInvalidationOfEntriesLimitPerPass) { |
|   constexpr size_t kCacheCapacity = 1024; |
|   constexpr size_t kScrubMaxEntriesPerPass = 64; |
|   const auto kEntryTtl = MonoDelta::FromMilliseconds(100); |
|   const auto kScrubInterval = kEntryTtl; |
|   const auto kNumRunnerThreads = 64; |
| |
|   // A TTL cache with auto-scrubbing thread that gets rid of expired entries. |
|   // The amount of scrubbed entries per pass is limited. |
|   TTLTestCache cache(kCacheCapacity, kEntryTtl, |
|                      kScrubInterval, kScrubMaxEntriesPerPass); |
|   { |
|     unique_ptr<TTLCacheTestMetrics> metrics( |
|         new TTLCacheTestMetrics(metric_entity_)); |
|     cache.SetMetrics(std::move(metrics)); |
|   } |
| |
|   // Whatever the way out of this scope is (including a failed assertion), |
|   // signal the reader threads to stop and wait for them to finish. |
|   atomic<bool> stop(false); |
|   vector<thread> threads; |
|   SCOPED_CLEANUP({ |
|     stop = true; |
|     for (auto& thread : threads) { |
|       thread.join(); |
|     } |
|   }); |
| |
|   // Start the reader threads: each one keeps looking up random keys until |
|   // told to stop. |
|   for (auto i = 0; i < kNumRunnerThreads; ++i) { |
|     threads.emplace_back([&cache, &stop, i] () { |
|       // Whether rand() is thread-safe is implementation-defined, so every |
|       // thread uses its own generator instead of the shared global state. |
|       // The generator is seeded with the thread's index. |
|       std::mt19937 rng(static_cast<std::mt19937::result_type>(i)); |
|       std::uniform_int_distribution<size_t> key_dist(0, kCacheCapacity - 1); |
|       std::uniform_int_distribution<int64_t> hold_time_ns_dist(0, 4); |
|       while (!stop) { |
|         const auto key = std::to_string(key_dist(rng)); |
|         auto h = cache.Get(key); |
|         // Keep the handle around for some time. |
|         SleepFor(MonoDelta::FromNanoseconds(hold_time_ns_dist(rng))); |
|       } |
|     }); |
|   } |
| |
|   const auto start_time = MonoTime::Now(); |
|   for (auto i = 0; i < kCacheCapacity; ++i) { |
|     cache.Put(std::to_string(i), unique_ptr<TestValue>(new TestValue(i)), 1); |
|   } |
| |
|   const auto scrub_interval_ms = kScrubInterval.ToMilliseconds(); |
|   for (auto i = 0; i < kCacheCapacity / kScrubMaxEntriesPerPass; ++i) { |
|     const auto evictions_expired_num = GetCacheEvictionsExpired(); |
|     const auto duration_ms = (MonoTime::Now() - start_time).ToMilliseconds(); |
|     // The upper bound on the number of scrubbing passes so far: the elapsed |
|     // time divided by the scrubbing interval (rounded up), plus one. |
|     const auto runs_num = 1 + |
|         (duration_ms + scrub_interval_ms - 1) / scrub_interval_ms; |
|     // No more than the specified number of entries should be evicted per one |
|     // pass of the scrubbing thread. |
|     ASSERT_LE(evictions_expired_num, runs_num * kScrubMaxEntriesPerPass); |
|     SleepFor(kScrubInterval); |
|   } |
|   stop = true; |
| |
|   // Eventually, all the entries should be gone from the cache. |
|   ASSERT_EVENTUALLY([&]{ |
|     ASSERT_EQ(kCacheCapacity, GetCacheEvictionsExpired()); |
|     ASSERT_EQ(0, GetCacheUsage()); |
|   }); |
| } |
| |
| } // namespace kudu |