| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/util/blocking_queue.h" |
| |
| #include <algorithm> |
| #include <cstddef> |
| #include <cstdint> |
| #include <deque> |
| #include <map> |
| #include <memory> |
| #include <numeric> |
| #include <ostream> |
| #include <string> |
| #include <thread> |
| #include <type_traits> |
| #include <vector> |
| |
| #include <gflags/gflags.h> |
| #include <glog/logging.h> |
| #include <gtest/gtest.h> |
| |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/util/countdown_latch.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/mutex.h" |
| #include "kudu/util/scoped_cleanup.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/test_macros.h" |
| #include "kudu/util/test_util.h" |
| |
| DEFINE_uint32(num_blocking_writers, 3, |
| "number of threads calling BlockingQueue::BlockingPut()"); |
| DEFINE_uint32(num_non_blocking_writers, 2, |
| "number of threads calling BlockingQueue::Put()"); |
| DEFINE_uint32(num_blocking_readers, 5, |
| "number of threads calling BlockingQueue::BlockingGet()"); |
| DEFINE_uint32(runtime_sec, 5, "duration of the test (seconds)"); |
| DEFINE_uint32(queue_capacity, 64, "capacity of the queue (number of elements)"); |
| |
| using std::accumulate; |
| using std::string; |
| using std::thread; |
| using std::unique_ptr; |
| using std::vector; |
| using strings::Substitute; |
| |
| namespace kudu { |
| |
| BlockingQueue<int32_t> test1_queue(5); |
| |
| void InsertSomeThings() { |
| ASSERT_EQ(test1_queue.Put(1), QUEUE_SUCCESS); |
| ASSERT_EQ(test1_queue.Put(2), QUEUE_SUCCESS); |
| ASSERT_EQ(test1_queue.Put(3), QUEUE_SUCCESS); |
| } |
| |
| TEST(BlockingQueueTest, Test1) { |
| thread inserter_thread(InsertSomeThings); |
| int32_t i; |
| ASSERT_OK(test1_queue.BlockingGet(&i)); |
| ASSERT_EQ(1, i); |
| ASSERT_OK(test1_queue.BlockingGet(&i)); |
| ASSERT_EQ(2, i); |
| ASSERT_OK(test1_queue.BlockingGet(&i)); |
| ASSERT_EQ(3, i); |
| inserter_thread.join(); |
| } |
| |
| TEST(BlockingQueueTest, TestBlockingDrainTo) { |
| BlockingQueue<int32_t> test_queue(3); |
| ASSERT_EQ(test_queue.Put(1), QUEUE_SUCCESS); |
| ASSERT_EQ(test_queue.Put(2), QUEUE_SUCCESS); |
| ASSERT_EQ(test_queue.Put(3), QUEUE_SUCCESS); |
| vector<int32_t> out; |
| ASSERT_OK(test_queue.BlockingDrainTo(&out, MonoTime::Now() + MonoDelta::FromSeconds(30))); |
| ASSERT_EQ(1, out[0]); |
| ASSERT_EQ(2, out[1]); |
| ASSERT_EQ(3, out[2]); |
| |
| // Set a deadline in the past and ensure we time out. |
| Status s = test_queue.BlockingDrainTo(&out, MonoTime::Now() - MonoDelta::FromSeconds(1)); |
| ASSERT_TRUE(s.IsTimedOut()); |
| |
| // Ensure that if the queue is shut down, we get Aborted status. |
| test_queue.Shutdown(); |
| s = test_queue.BlockingDrainTo(&out, MonoTime::Now() - MonoDelta::FromSeconds(1)); |
| ASSERT_TRUE(s.IsAborted()); |
| } |
| |
| TEST(BlockingQueueTest, TestBlockingPut) { |
| const MonoDelta kShortTimeout = MonoDelta::FromMilliseconds(200); |
| const MonoDelta kEvenShorterTimeout = MonoDelta::FromMilliseconds(100); |
| BlockingQueue<int32_t> test_queue(2); |
| |
| // First, a trivial check that we don't do anything if our deadline has |
| // already passed. |
| Status s = test_queue.BlockingPut(1, MonoTime::Now() - kShortTimeout); |
| ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); |
| |
| // Now put a couple elements onto the queue. |
| ASSERT_OK(test_queue.BlockingPut(1)); |
| ASSERT_OK(test_queue.BlockingPut(2)); |
| |
| // We're at capacity, so further puts should time out... |
| s = test_queue.BlockingPut(3, MonoTime::Now() + kShortTimeout); |
| ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); |
| |
| // ... until space is freed up before the deadline. |
| thread t([&] { |
| SleepFor(kEvenShorterTimeout); |
| int out; |
| ASSERT_OK(test_queue.BlockingGet(&out)); |
| }); |
| SCOPED_CLEANUP({ |
| t.join(); |
| }); |
| ASSERT_OK(test_queue.BlockingPut(3, MonoTime::Now() + kShortTimeout)); |
| |
| // If we shut down, we shouldn't be able to put more elements onto the queue. |
| test_queue.Shutdown(); |
| s = test_queue.BlockingPut(3, MonoTime::Now() + kShortTimeout); |
| ASSERT_TRUE(s.IsAborted()) << s.ToString(); |
| } |
| |
| TEST(BlockingQueueTest, TestBlockingGet) { |
| const MonoDelta kShortTimeout = MonoDelta::FromMilliseconds(200); |
| const MonoDelta kEvenShorterTimeout = MonoDelta::FromMilliseconds(100); |
| BlockingQueue<int32_t> test_queue(2); |
| ASSERT_OK(test_queue.BlockingPut(1)); |
| ASSERT_OK(test_queue.BlockingPut(2)); |
| |
| // Test that if we have stuff in our queue, regardless of deadlines, we'll be |
| // able to get them out. |
| int32_t val = 0; |
| ASSERT_OK(test_queue.BlockingGet(&val, MonoTime::Now() - kShortTimeout)); |
| ASSERT_EQ(1, val); |
| ASSERT_OK(test_queue.BlockingGet(&val, MonoTime::Now() + kShortTimeout)); |
| ASSERT_EQ(2, val); |
| |
| // But without stuff in the queue, we'll time out... |
| Status s = test_queue.BlockingGet(&val, MonoTime::Now() - kShortTimeout); |
| ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); |
| s = test_queue.BlockingGet(&val, MonoTime::Now() + kShortTimeout); |
| ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); |
| |
| // ... until new elements show up. |
| thread t([&] { |
| SleepFor(kEvenShorterTimeout); |
| ASSERT_OK(test_queue.BlockingPut(3)); |
| }); |
| SCOPED_CLEANUP({ |
| t.join(); |
| }); |
| ASSERT_OK(test_queue.BlockingGet(&val, MonoTime::Now() + kShortTimeout)); |
| ASSERT_EQ(3, val); |
| |
| // If we shut down with stuff in our queue, we'll continue to return those |
| // elements. Otherwise, we'll return an error. |
| ASSERT_OK(test_queue.BlockingPut(4)); |
| test_queue.Shutdown(); |
| ASSERT_OK(test_queue.BlockingGet(&val, MonoTime::Now() + kShortTimeout)); |
| ASSERT_EQ(4, val); |
| s = test_queue.BlockingGet(&val, MonoTime::Now() + kShortTimeout); |
| ASSERT_TRUE(s.IsAborted()) << s.ToString(); |
| } |
| |
| // Test that, when the queue is shut down with elements still pending, |
| // Drain still returns OK until the elements are all gone. |
| TEST(BlockingQueueTest, TestGetAndDrainAfterShutdown) { |
| // Put some elements into the queue and then shut it down. |
| BlockingQueue<int32_t> q(3); |
| ASSERT_EQ(q.Put(1), QUEUE_SUCCESS); |
| ASSERT_EQ(q.Put(2), QUEUE_SUCCESS); |
| |
| q.Shutdown(); |
| |
| // Get() should still return an element. |
| int i; |
| ASSERT_OK(q.BlockingGet(&i)); |
| ASSERT_EQ(1, i); |
| |
| // Drain should still return OK, since it yielded elements. |
| vector<int32_t> out; |
| ASSERT_OK(q.BlockingDrainTo(&out)); |
| ASSERT_EQ(2, out[0]); |
| |
| // Now that it's empty, it should return Aborted. |
| Status s = q.BlockingDrainTo(&out); |
| ASSERT_TRUE(s.IsAborted()) << s.ToString(); |
| s = q.BlockingGet(&i); |
| ASSERT_FALSE(s.ok()) << s.ToString(); |
| } |
| |
| TEST(BlockingQueueTest, TestTooManyInsertions) { |
| BlockingQueue<int32_t> test_queue(2); |
| ASSERT_EQ(test_queue.Put(123), QUEUE_SUCCESS); |
| ASSERT_EQ(test_queue.Put(123), QUEUE_SUCCESS); |
| ASSERT_EQ(test_queue.Put(123), QUEUE_FULL); |
| } |
| |
| namespace { |
| |
| struct LengthLogicalSize { |
| static size_t logical_size(const string& s) { |
| return s.length(); |
| } |
| }; |
| |
| } // anonymous namespace |
| |
| TEST(BlockingQueueTest, TestLogicalSize) { |
| BlockingQueue<string, LengthLogicalSize> test_queue(4); |
| ASSERT_EQ(test_queue.Put("a"), QUEUE_SUCCESS); |
| ASSERT_EQ(1, test_queue.size()); |
| ASSERT_EQ(test_queue.Put("bcd"), QUEUE_SUCCESS); |
| ASSERT_EQ(4, test_queue.size()); |
| ASSERT_EQ(test_queue.Put("e"), QUEUE_FULL); |
| ASSERT_EQ(4, test_queue.size()); |
| } |
| |
| TEST(BlockingQueueTest, TestNonPointerParamsMayBeNonEmptyOnDestruct) { |
| BlockingQueue<int32_t> test_queue(1); |
| ASSERT_EQ(test_queue.Put(123), QUEUE_SUCCESS); |
| // No DCHECK failure on destruct. |
| } |
| |
| #ifndef NDEBUG |
| TEST(BlockingQueueDeathTest, TestPointerParamsMustBeEmptyOnDestruct) { |
| ::testing::FLAGS_gtest_death_test_style = "threadsafe"; |
| ASSERT_DEATH({ |
| BlockingQueue<int32_t*> test_queue(1); |
| int32_t element = 123; |
| ASSERT_EQ(test_queue.Put(&element), QUEUE_SUCCESS); |
| // Debug assertion triggered on queue destruction since type is a pointer. |
| }, |
| "BlockingQueue holds bare pointers"); |
| } |
| #endif // NDEBUG |
| |
| TEST(BlockingQueueTest, TestGetFromShutdownQueue) { |
| BlockingQueue<int64_t> test_queue(2); |
| ASSERT_EQ(test_queue.Put(123), QUEUE_SUCCESS); |
| test_queue.Shutdown(); |
| ASSERT_EQ(test_queue.Put(456), QUEUE_SHUTDOWN); |
| int64_t i; |
| ASSERT_OK(test_queue.BlockingGet(&i)); |
| ASSERT_EQ(123, i); |
| Status s = test_queue.BlockingGet(&i); |
| ASSERT_FALSE(s.ok()) << s.ToString(); |
| } |
| |
| TEST(BlockingQueueTest, TestUniquePtrMethods) { |
| BlockingQueue<unique_ptr<int>> test_queue(2); |
| unique_ptr<int> input_int(new int(123)); |
| ASSERT_EQ(QUEUE_SUCCESS, test_queue.Put(std::move(input_int))); |
| unique_ptr<int> output_int; |
| ASSERT_OK(test_queue.BlockingGet(&output_int)); |
| ASSERT_EQ(123, *output_int.get()); |
| test_queue.Shutdown(); |
| } |
| |
| class MultiThreadTest { |
| public: |
| MultiThreadTest() |
| : puts_(4), |
| blocking_puts_(4), |
| nthreads_(5), |
| queue_(nthreads_ * puts_), |
| num_inserters_(nthreads_), |
| sync_latch_(nthreads_) { |
| } |
| |
| void InserterThread(int arg) { |
| for (int i = 0; i < puts_; i++) { |
| ASSERT_EQ(queue_.Put(arg), QUEUE_SUCCESS); |
| } |
| sync_latch_.CountDown(); |
| sync_latch_.Wait(); |
| for (int i = 0; i < blocking_puts_; i++) { |
| ASSERT_OK(queue_.BlockingPut(arg)); |
| } |
| MutexLock guard(lock_); |
| if (--num_inserters_ == 0) { |
| queue_.Shutdown(); |
| } |
| } |
| |
| void RemoverThread() { |
| for (int i = 0; i < puts_ + blocking_puts_; i++) { |
| int32_t arg = 0; |
| Status s = queue_.BlockingGet(&arg); |
| if (!s.ok()) { |
| arg = -1; |
| } |
| MutexLock guard(lock_); |
| gotten_[arg] = gotten_[arg] + 1; |
| } |
| } |
| |
| void Run() { |
| for (int i = 0; i < nthreads_; i++) { |
| threads_.emplace_back(&MultiThreadTest::InserterThread, this, i); |
| threads_.emplace_back(&MultiThreadTest::RemoverThread, this); |
| } |
| // We add an extra thread to ensure that there aren't enough elements in |
| // the queue to go around. This way, we test removal after Shutdown. |
| threads_.emplace_back(&MultiThreadTest::RemoverThread, this); |
| for (auto& thread : threads_) { |
| thread.join(); |
| } |
| // Let's check to make sure we got what we should have. |
| MutexLock guard(lock_); |
| for (int i = 0; i < nthreads_; i++) { |
| ASSERT_EQ(puts_ + blocking_puts_, gotten_[i]); |
| } |
| // And there were nthreads_ * (puts_ + blocking_puts_) |
| // elements removed, but only nthreads_ * puts_ + |
| // blocking_puts_ elements added. So some removers hit the |
| // shutdown case. |
| ASSERT_EQ(puts_ + blocking_puts_, gotten_[-1]); |
| } |
| |
| int puts_; |
| int blocking_puts_; |
| int nthreads_; |
| BlockingQueue<int32_t> queue_; |
| Mutex lock_; |
| std::map<int32_t, int> gotten_; |
| vector<thread> threads_; |
| int num_inserters_; |
| CountDownLatch sync_latch_; |
| }; |
| |
| TEST(BlockingQueueTest, TestMultipleThreads) { |
| MultiThreadTest test; |
| test.Run(); |
| } |
| |
| class BlockingQueueMultiThreadPerfTest : public ::testing::Test { |
| public: |
| void BlockingGetTask(size_t* counter) { |
| barrier_.CountDown(); |
| barrier_.Wait(); |
| |
| uint64_t elem = 0; |
| while (true) { |
| auto s = queue_.BlockingGet(&elem); |
| if (!s.ok()) { |
| CHECK(s.IsAborted()) << s.ToString(); |
| return; |
| } |
| ++(*counter); |
| } |
| } |
| |
| void BlockingPutTask(size_t* counter) { |
| barrier_.CountDown(); |
| barrier_.Wait(); |
| |
| uint64_t elem = 0; |
| while (true) { |
| auto s = queue_.BlockingPut(elem++); |
| if (!s.ok()) { |
| CHECK(s.IsAborted()) << s.ToString(); |
| return; |
| } |
| ++(*counter); |
| } |
| } |
| |
| void NonBlockingPutTask(size_t* counter) { |
| barrier_.CountDown(); |
| barrier_.Wait(); |
| |
| uint64_t elem = 0; |
| while (true) { |
| auto result = queue_.Put(elem++); |
| if (result == QUEUE_SHUTDOWN) { |
| return; |
| } |
| switch (result) { |
| case QUEUE_SUCCESS: |
| ++(*counter); |
| continue; |
| case QUEUE_FULL: |
| SleepFor(MonoDelta::FromMicroseconds(25)); |
| continue; |
| default: |
| LOG(FATAL) << "unexpected queue status: " << result; |
| } |
| } |
| } |
| |
| protected: |
| BlockingQueueMultiThreadPerfTest() |
| : num_blocking_writers_(FLAGS_num_blocking_writers), |
| num_non_blocking_writers_(FLAGS_num_non_blocking_writers), |
| num_blocking_readers_(FLAGS_num_blocking_readers), |
| runtime_(MonoDelta::FromSeconds(FLAGS_runtime_sec)), |
| queue_(FLAGS_queue_capacity), |
| barrier_(num_blocking_writers_ + |
| num_non_blocking_writers_ + |
| num_blocking_readers_) { |
| } |
| |
| const size_t num_blocking_writers_; |
| const size_t num_non_blocking_writers_; |
| const size_t num_blocking_readers_; |
| const MonoDelta runtime_; |
| BlockingQueue<uint64_t> queue_; |
| vector<thread> threads_; |
| CountDownLatch barrier_; |
| }; |
| |
| // This is a test scenario to assess the performance of BlockingQueue in the |
| // terms of call rates when multiple concurrent writers and readers are present. |
| TEST_F(BlockingQueueMultiThreadPerfTest, RequestRates) { |
| SKIP_IF_SLOW_NOT_ALLOWED(); |
| |
| vector<size_t> blocking_read_counts(num_blocking_readers_, 0); |
| for (size_t i = 0; i < num_blocking_readers_; ++i) { |
| threads_.emplace_back(&BlockingQueueMultiThreadPerfTest::BlockingGetTask, |
| this, &blocking_read_counts[i]); |
| } |
| |
| vector<size_t> blocking_write_counts(num_blocking_writers_, 0); |
| for (size_t i = 0; i < num_blocking_writers_; ++i) { |
| threads_.emplace_back(&BlockingQueueMultiThreadPerfTest::BlockingPutTask, |
| this, &blocking_write_counts[i]); |
| } |
| |
| vector<size_t> non_blocking_write_counts(num_non_blocking_writers_, 0); |
| for (size_t i = 0; i < num_non_blocking_writers_; ++i) { |
| threads_.emplace_back(&BlockingQueueMultiThreadPerfTest::NonBlockingPutTask, |
| this, &non_blocking_write_counts[i]); |
| } |
| |
| SleepFor(runtime_); |
| queue_.Shutdown(); |
| |
| for_each(threads_.begin(), threads_.end(), [](thread& t) { t.join(); }); |
| |
| const auto blocking_reads_num = accumulate( |
| blocking_read_counts.begin(), blocking_read_counts.end(), 0UL); |
| const auto blocking_writes_num = accumulate( |
| blocking_write_counts.begin(), blocking_write_counts.end(), 0UL); |
| const auto non_blocking_writes_num = accumulate( |
| non_blocking_write_counts.begin(), non_blocking_write_counts.end(), 0UL); |
| |
| LOG(INFO) << "number of successful BlockingGet() calls: " |
| << blocking_reads_num; |
| LOG(INFO) << "number of successful BlockingPut() calls: " |
| << blocking_writes_num; |
| LOG(INFO) << "number of successful Put() calls: " |
| << non_blocking_writes_num; |
| |
| LOG(INFO) << Substitute( |
| "BlockingGet() rate: $0 calls/sec", |
| static_cast<double>(blocking_reads_num) / runtime_.ToSeconds()); |
| LOG(INFO) << Substitute( |
| "BlockingPut() rate: $0 calls/sec", |
| static_cast<double>(blocking_writes_num) / runtime_.ToSeconds()); |
| LOG(INFO) << Substitute( |
| "Put() (non-blocking) rate: $0 calls/sec", |
| static_cast<double>(non_blocking_writes_num) / runtime_.ToSeconds()); |
| LOG(INFO) << Substitute( |
| "total Blocking{Get,Put}() rate: $0 calls/sec", |
| static_cast<double>(blocking_reads_num + blocking_writes_num) / runtime_.ToSeconds()); |
| LOG(INFO) << Substitute( |
| "total rate: $0 calls/sec", |
| static_cast<double>(blocking_reads_num + |
| blocking_writes_num + |
| non_blocking_writes_num) / runtime_.ToSeconds()); |
| } |
| |
| } // namespace kudu |