| /* |
| * Copyright 2012 Google Inc. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| // Author: jmarantz@google.com (Joshua Marantz) |
| |
| // Unit-test CacheBatcher, using LRUCache, AsyncCache, and DelayCache. |
| |
| #include "pagespeed/kernel/cache/cache_batcher.h" |
| |
| #include <cstddef> |
| |
| #include "pagespeed/kernel/base/gtest.h" |
| #include "pagespeed/kernel/base/scoped_ptr.h" |
| #include "pagespeed/kernel/base/statistics.h" |
| #include "pagespeed/kernel/base/thread_system.h" |
| #include "pagespeed/kernel/base/timer.h" |
| #include "pagespeed/kernel/cache/async_cache.h" |
| #include "pagespeed/kernel/cache/cache_test_base.h" |
| #include "pagespeed/kernel/cache/delay_cache.h" |
| #include "pagespeed/kernel/cache/lru_cache.h" |
| #include "pagespeed/kernel/cache/threadsafe_cache.h" |
| #include "pagespeed/kernel/thread/queued_worker_pool.h" |
| #include "pagespeed/kernel/thread/worker_test_base.h" |
| #include "pagespeed/kernel/util/platform.h" |
| #include "pagespeed/kernel/util/simple_stats.h" |
| |
| namespace { |
| const size_t kMaxSize = 100; |
| const int kMaxWorkers = 2; |
| } |
| |
| namespace net_instaweb { |
| |
| class CacheBatcherTest : public CacheTestBase { |
| protected: |
| CacheBatcherTest() : expected_pending_(0) { |
| thread_system_.reset(Platform::CreateThreadSystem()); |
| statistics_.reset(new SimpleStats(thread_system_.get())); |
| CacheBatcher::InitStats(statistics_.get()); |
| lru_cache_.reset(new LRUCache(kMaxSize)); |
| timer_.reset(thread_system_->NewTimer()); |
| pool_.reset( |
| new QueuedWorkerPool(kMaxWorkers, "cache", thread_system_.get())); |
| threadsafe_cache_.reset(new ThreadsafeCache( |
| lru_cache_.get(), thread_system_->NewMutex())); |
| async_cache_.reset(new AsyncCache(threadsafe_cache_.get(), pool_.get())); |
| delay_cache_.reset(new DelayCache(async_cache_.get(), |
| thread_system_.get())); |
| batcher_.reset(new CacheBatcher(delay_cache_.get(), |
| thread_system_->NewMutex(), |
| statistics_.get())); |
| set_mutex(thread_system_->NewMutex()); |
| } |
| |
| // Make sure we shut down the worker pool prior to destructing AsyncCache. |
| virtual void TearDown() { |
| pool_->ShutDown(); |
| } |
| |
| class SyncPointCallback : public CacheTestBase::Callback { |
| public: |
| explicit SyncPointCallback(CacheBatcherTest* test) |
| : Callback(test), |
| sync_point_(test->thread_system_.get()) { |
| } |
| |
| virtual void Done(CacheInterface::KeyState state) { |
| Callback::Done(state); |
| sync_point_.Notify(); |
| } |
| |
| virtual void Wait() { sync_point_.Wait(); } |
| |
| private: |
| WorkerTestBase::SyncPoint sync_point_; |
| }; |
| |
| virtual CacheInterface* Cache() { return batcher_.get(); } |
| virtual Callback* NewCallback() { return new SyncPointCallback(this); } |
| |
| // After the Done() callback is be called, there is a slight delay |
| // in the worker thread before the CacheBatcher knows it can |
| // schedule another lookup. To test the sequences we want, wait |
| // till the batcher catches up with our expectations. |
| virtual void PostOpCleanup() { |
| while ((batcher_->Pending() != expected_pending_) || |
| (async_cache_->outstanding_operations() != 0)) { |
| timer_->SleepMs(1); |
| } |
| } |
| |
| void DelayKey(const GoogleString& key) { |
| delay_cache_->DelayKey(key); |
| ++expected_pending_; |
| } |
| |
| void ReleaseKey(const GoogleString& key) { |
| delay_cache_->ReleaseKey(key); |
| --expected_pending_; |
| } |
| |
| scoped_ptr<LRUCache> lru_cache_; |
| scoped_ptr<ThreadSystem> thread_system_; |
| scoped_ptr<ThreadsafeCache> threadsafe_cache_; |
| scoped_ptr<Timer> timer_; |
| scoped_ptr<QueuedWorkerPool> pool_; |
| scoped_ptr<AsyncCache> async_cache_; |
| scoped_ptr<DelayCache> delay_cache_; |
| scoped_ptr<SimpleStats> statistics_; |
| scoped_ptr<CacheBatcher> batcher_; |
| int expected_pending_; |
| }; |
| |
| // In this version, no keys are delayed, so the batcher has no opportunity |
| // to batch. This test is copied from lru_cache_test.cc. Note that we are |
| // going through the CacheBatcher/Delay/AsyncCache/ThreadsafeCache but the |
| // LRUCache should be quiescent every time we look directly at it. |
| TEST_F(CacheBatcherTest, PutGetDelete) { |
| EXPECT_EQ(static_cast<size_t>(0), lru_cache_->size_bytes()); |
| EXPECT_EQ(static_cast<size_t>(0), lru_cache_->num_elements()); |
| CheckPut("Name", "Value"); |
| CheckGet("Name", "Value"); |
| EXPECT_EQ(static_cast<size_t>(9), lru_cache_->size_bytes()); |
| EXPECT_EQ(static_cast<size_t>(1), lru_cache_->num_elements()); |
| CheckNotFound("Another Name"); |
| |
| CheckPut("Name", "NewValue"); |
| CheckGet("Name", "NewValue"); |
| EXPECT_EQ(static_cast<size_t>(12), lru_cache_->size_bytes()); |
| EXPECT_EQ(static_cast<size_t>(1), lru_cache_->num_elements()); |
| |
| CheckDelete("Name"); |
| lru_cache_->SanityCheck(); |
| CheckNotFound("Name"); |
| EXPECT_EQ(static_cast<size_t>(0), lru_cache_->size_bytes()); |
| EXPECT_EQ(static_cast<size_t>(0), lru_cache_->num_elements()); |
| lru_cache_->SanityCheck(); |
| } |
| |
| TEST_F(CacheBatcherTest, DelayN0NoParallelism) { |
| batcher_->set_max_parallel_lookups(1); |
| |
| PopulateCache(4); |
| |
| // Delaying "n0" causes the fetches for "n1" and "n2" to be batched in |
| // CacheBatcher. They can be executed once "n0" is released. |
| DelayKey("n0"); |
| Callback* n0 = InitiateGet("n0"); |
| EXPECT_EQ(1, outstanding_fetches()); |
| Callback* n1 = InitiateGet("n1"); |
| Callback* not_found = InitiateGet("not found"); |
| EXPECT_EQ(3, outstanding_fetches()); |
| Callback* n2 = InitiateGet("n2"); |
| EXPECT_EQ(4, outstanding_fetches()); |
| |
| ReleaseKey("n0"); |
| WaitAndCheck(n0, "v0"); |
| WaitAndCheck(n1, "v1"); |
| WaitAndCheck(n2, "v2"); |
| WaitAndCheckNotFound(not_found); |
| |
| // outstanding_fetches() won't be stable to look at until all 3 callback Waits |
| // are called. |
| EXPECT_EQ(0, outstanding_fetches()); |
| EXPECT_EQ(3, batcher_->last_batch_size()); |
| |
| // Further fetches will execute immediately again. |
| CheckGet("n3", "v3"); |
| } |
| |
| TEST_F(CacheBatcherTest, DelayN0TwoWayParallelism) { |
| batcher_->set_max_parallel_lookups(2); |
| |
| PopulateCache(8); |
| |
| DelayKey("n0"); |
| Callback* n0 = InitiateGet("n0"); |
| EXPECT_EQ(1, outstanding_fetches()); |
| |
| // We still have some parallelism available to us, so "n1" and "n2" will |
| // complete even while "n0" is outstanding. |
| CheckGet("n1", "v1"); |
| CheckGet("n2", "v2"); |
| ASSERT_EQ(1, batcher_->Pending()); |
| |
| // Now block "n3" and look it up. n4 and n5 will now be delayed and batched. |
| DelayKey("n3"); |
| Callback* n3 = InitiateGet("n3"); |
| Callback* not_found = InitiateGet("not found"); |
| Callback* n4 = InitiateGet("n4"); |
| EXPECT_EQ(4, outstanding_fetches()); // n0, n3, "not found", n4 |
| Callback* n5 = InitiateGet("n5"); |
| EXPECT_EQ(5, outstanding_fetches()); |
| |
| // Releasing n0 frees a thread and now n4 and n5 can be completed. |
| ReleaseKey("n0"); |
| WaitAndCheck(n0, "v0"); |
| WaitAndCheckNotFound(not_found); |
| WaitAndCheck(n4, "v4"); |
| WaitAndCheck(n5, "v5"); |
| EXPECT_EQ(1, outstanding_fetches()); |
| EXPECT_EQ(3, batcher_->last_batch_size()); |
| |
| // Finally, release n3 and we are all clean. |
| ReleaseKey("n3"); |
| WaitAndCheck(n3, "v3"); |
| } |
| |
| TEST_F(CacheBatcherTest, ExceedMaxQueueAndDrop) { |
| batcher_->set_max_parallel_lookups(1); |
| batcher_->set_max_queue_size(3); |
| |
| PopulateCache(5); |
| |
| // Delaying "n0" causes the fetches for "n1" and "n2" to be batched in |
| // CacheBatcher. They can be executed once "n0" is released. |
| DelayKey("n0"); |
| Callback* n0 = InitiateGet("n0"); |
| EXPECT_EQ(1, outstanding_fetches()); |
| Callback* n1 = InitiateGet("n1"); |
| Callback* not_found = InitiateGet("not found"); |
| EXPECT_EQ(3, outstanding_fetches()); |
| Callback* n2 = InitiateGet("n2"); |
| EXPECT_EQ(4, outstanding_fetches()); |
| Callback* n3 = InitiateGet("n3"); // This will be dropped immediately and |
| WaitAndCheckNotFound(n3); // reported as not found. |
| EXPECT_EQ(1, statistics_->GetVariable("cache_batcher_dropped_gets")->Get()); |
| |
| ReleaseKey("n0"); |
| WaitAndCheck(n0, "v0"); |
| WaitAndCheck(n1, "v1"); |
| WaitAndCheckNotFound(not_found); |
| WaitAndCheck(n2, "v2"); |
| |
| // outstanding_fetches() won't be stable to look at until all 3 callback Waits |
| // are called. |
| EXPECT_EQ(0, outstanding_fetches()); |
| EXPECT_EQ(3, batcher_->last_batch_size()); |
| |
| // Further fetches will execute immediately again. |
| CheckGet("n4", "v4"); |
| } |
| |
| } // namespace net_instaweb |