| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <boost/bind.hpp> |
| #include <glog/logging.h> |
| #include <gtest/gtest.h> |
| #include <memory> |
| |
| #include "kudu/gutil/atomicops.h" |
| #include "kudu/gutil/bind.h" |
| #include "kudu/util/countdown_latch.h" |
| #include "kudu/util/metrics.h" |
| #include "kudu/util/promise.h" |
| #include "kudu/util/threadpool.h" |
| #include "kudu/util/test_macros.h" |
| #include "kudu/util/trace.h" |
| |
| using std::shared_ptr; |
| |
| namespace kudu { |
| |
| namespace { |
| static Status BuildMinMaxTestPool(int min_threads, int max_threads, gscoped_ptr<ThreadPool>* pool) { |
| return ThreadPoolBuilder("test").set_min_threads(min_threads) |
| .set_max_threads(max_threads) |
| .Build(pool); |
| } |
| } // anonymous namespace |
| |
| TEST(TestThreadPool, TestNoTaskOpenClose) { |
| gscoped_ptr<ThreadPool> thread_pool; |
| ASSERT_OK(BuildMinMaxTestPool(4, 4, &thread_pool)); |
| thread_pool->Shutdown(); |
| } |
| |
| static void SimpleTaskMethod(int n, Atomic32 *counter) { |
| while (n--) { |
| base::subtle::NoBarrier_AtomicIncrement(counter, 1); |
| boost::detail::yield(n); |
| } |
| } |
| |
| class SimpleTask : public Runnable { |
| public: |
| SimpleTask(int n, Atomic32 *counter) |
| : n_(n), counter_(counter) { |
| } |
| |
| void Run() OVERRIDE { |
| SimpleTaskMethod(n_, counter_); |
| } |
| |
| private: |
| int n_; |
| Atomic32 *counter_; |
| }; |
| |
| TEST(TestThreadPool, TestSimpleTasks) { |
| gscoped_ptr<ThreadPool> thread_pool; |
| ASSERT_OK(BuildMinMaxTestPool(4, 4, &thread_pool)); |
| |
| Atomic32 counter(0); |
| std::shared_ptr<Runnable> task(new SimpleTask(15, &counter)); |
| |
| ASSERT_OK(thread_pool->SubmitFunc(boost::bind(&SimpleTaskMethod, 10, &counter))); |
| ASSERT_OK(thread_pool->Submit(task)); |
| ASSERT_OK(thread_pool->SubmitFunc(boost::bind(&SimpleTaskMethod, 20, &counter))); |
| ASSERT_OK(thread_pool->Submit(task)); |
| ASSERT_OK(thread_pool->SubmitClosure(Bind(&SimpleTaskMethod, 123, &counter))); |
| thread_pool->Wait(); |
| ASSERT_EQ(10 + 15 + 20 + 15 + 123, base::subtle::NoBarrier_Load(&counter)); |
| thread_pool->Shutdown(); |
| } |
| |
// Logs a single trace statement; used below to verify that a Trace
// adopted at submission time is visible to the task on a worker thread.
static void IssueTraceStatement() {
  TRACE("hello from task");
}
| |
| // Test that the thread-local trace is propagated to tasks |
| // submitted to the threadpool. |
| TEST(TestThreadPool, TestTracePropagation) { |
| gscoped_ptr<ThreadPool> thread_pool; |
| ASSERT_OK(BuildMinMaxTestPool(1, 1, &thread_pool)); |
| |
| scoped_refptr<Trace> t(new Trace); |
| { |
| ADOPT_TRACE(t.get()); |
| ASSERT_OK(thread_pool->SubmitFunc(&IssueTraceStatement)); |
| } |
| thread_pool->Wait(); |
| ASSERT_STR_CONTAINS(t->DumpToString(true), "hello from task"); |
| } |
| |
| TEST(TestThreadPool, TestSubmitAfterShutdown) { |
| gscoped_ptr<ThreadPool> thread_pool; |
| ASSERT_OK(BuildMinMaxTestPool(1, 1, &thread_pool)); |
| thread_pool->Shutdown(); |
| Status s = thread_pool->SubmitFunc(&IssueTraceStatement); |
| ASSERT_EQ("Service unavailable: The pool has been shut down.", |
| s.ToString()); |
| } |
| |
| class SlowTask : public Runnable { |
| public: |
| explicit SlowTask(CountDownLatch* latch) |
| : latch_(latch) { |
| } |
| |
| void Run() OVERRIDE { |
| latch_->Wait(); |
| } |
| |
| private: |
| CountDownLatch* latch_; |
| }; |
| |
| TEST(TestThreadPool, TestThreadPoolWithNoMinimum) { |
| MonoDelta idle_timeout = MonoDelta::FromMilliseconds(1); |
| gscoped_ptr<ThreadPool> thread_pool; |
| ASSERT_OK(ThreadPoolBuilder("test") |
| .set_min_threads(0).set_max_threads(3) |
| .set_idle_timeout(idle_timeout).Build(&thread_pool)); |
| // There are no threads to start with. |
| ASSERT_TRUE(thread_pool->num_threads_ == 0); |
| // We get up to 3 threads when submitting work. |
| CountDownLatch latch(1); |
| ASSERT_OK(thread_pool->Submit( |
| shared_ptr<Runnable>(new SlowTask(&latch)))); |
| ASSERT_OK(thread_pool->Submit( |
| shared_ptr<Runnable>(new SlowTask(&latch)))); |
| ASSERT_EQ(2, thread_pool->num_threads_); |
| ASSERT_OK(thread_pool->Submit( |
| shared_ptr<Runnable>(new SlowTask(&latch)))); |
| ASSERT_EQ(3, thread_pool->num_threads_); |
| // The 4th piece of work gets queued. |
| ASSERT_OK(thread_pool->Submit( |
| shared_ptr<Runnable>(new SlowTask(&latch)))); |
| ASSERT_EQ(3, thread_pool->num_threads_); |
| // Finish all work |
| latch.CountDown(); |
| thread_pool->Wait(); |
| ASSERT_EQ(0, thread_pool->active_threads_); |
| thread_pool->Shutdown(); |
| ASSERT_EQ(0, thread_pool->num_threads_); |
| } |
| |
| // Regression test for a bug where a task is submitted exactly |
| // as a thread is about to exit. Previously this could hang forever. |
| TEST(TestThreadPool, TestRace) { |
| alarm(10); |
| MonoDelta idle_timeout = MonoDelta::FromMicroseconds(1); |
| gscoped_ptr<ThreadPool> thread_pool; |
| ASSERT_OK(ThreadPoolBuilder("test") |
| .set_min_threads(0).set_max_threads(1) |
| .set_idle_timeout(idle_timeout).Build(&thread_pool)); |
| |
| for (int i = 0; i < 500; i++) { |
| CountDownLatch l(1); |
| ASSERT_OK(thread_pool->SubmitFunc(boost::bind(&CountDownLatch::CountDown, &l))); |
| l.Wait(); |
| // Sleeping a different amount in each iteration makes it more likely to hit |
| // the bug. |
| SleepFor(MonoDelta::FromMicroseconds(i)); |
| } |
| alarm(0); |
| } |
| |
| TEST(TestThreadPool, TestVariableSizeThreadPool) { |
| MonoDelta idle_timeout = MonoDelta::FromMilliseconds(1); |
| gscoped_ptr<ThreadPool> thread_pool; |
| ASSERT_OK(ThreadPoolBuilder("test") |
| .set_min_threads(1).set_max_threads(4) |
| .set_idle_timeout(idle_timeout).Build(&thread_pool)); |
| // There is 1 thread to start with. |
| ASSERT_EQ(1, thread_pool->num_threads_); |
| // We get up to 4 threads when submitting work. |
| CountDownLatch latch(1); |
| ASSERT_OK(thread_pool->Submit( |
| shared_ptr<Runnable>(new SlowTask(&latch)))); |
| ASSERT_EQ(1, thread_pool->num_threads_); |
| ASSERT_OK(thread_pool->Submit( |
| shared_ptr<Runnable>(new SlowTask(&latch)))); |
| ASSERT_EQ(2, thread_pool->num_threads_); |
| ASSERT_OK(thread_pool->Submit( |
| shared_ptr<Runnable>(new SlowTask(&latch)))); |
| ASSERT_EQ(3, thread_pool->num_threads_); |
| ASSERT_OK(thread_pool->Submit( |
| shared_ptr<Runnable>(new SlowTask(&latch)))); |
| ASSERT_EQ(4, thread_pool->num_threads_); |
| // The 5th piece of work gets queued. |
| ASSERT_OK(thread_pool->Submit( |
| shared_ptr<Runnable>(new SlowTask(&latch)))); |
| ASSERT_EQ(4, thread_pool->num_threads_); |
| // Finish all work |
| latch.CountDown(); |
| thread_pool->Wait(); |
| ASSERT_EQ(0, thread_pool->active_threads_); |
| thread_pool->Shutdown(); |
| ASSERT_EQ(0, thread_pool->num_threads_); |
| } |
| |
| TEST(TestThreadPool, TestMaxQueueSize) { |
| gscoped_ptr<ThreadPool> thread_pool; |
| ASSERT_OK(ThreadPoolBuilder("test") |
| .set_min_threads(1).set_max_threads(1) |
| .set_max_queue_size(1).Build(&thread_pool)); |
| |
| CountDownLatch latch(1); |
| ASSERT_OK(thread_pool->Submit(shared_ptr<Runnable>(new SlowTask(&latch)))); |
| Status s = thread_pool->Submit(shared_ptr<Runnable>(new SlowTask(&latch))); |
| // We race against the worker thread to re-enqueue. |
| // If we get there first, we fail on the 2nd Submit(). |
| // If the worker dequeues first, we fail on the 3rd. |
| if (s.ok()) { |
| s = thread_pool->Submit(shared_ptr<Runnable>(new SlowTask(&latch))); |
| } |
| CHECK(s.IsServiceUnavailable()) << "Expected failure due to queue blowout:" << s.ToString(); |
| latch.CountDown(); |
| thread_pool->Wait(); |
| thread_pool->Shutdown(); |
| } |
| |
| // Test that setting a promise from another thread yields |
| // a value on the current thread. |
| TEST(TestThreadPool, TestPromises) { |
| gscoped_ptr<ThreadPool> thread_pool; |
| ASSERT_OK(ThreadPoolBuilder("test") |
| .set_min_threads(1).set_max_threads(1) |
| .set_max_queue_size(1).Build(&thread_pool)); |
| |
| Promise<int> my_promise; |
| ASSERT_OK(thread_pool->SubmitClosure( |
| Bind(&Promise<int>::Set, Unretained(&my_promise), 5))); |
| ASSERT_EQ(5, my_promise.Get()); |
| thread_pool->Shutdown(); |
| } |
| |
| |
// Metric entity and histograms attached to the pool in TestMetrics below.
METRIC_DEFINE_entity(test_entity);
METRIC_DEFINE_histogram(test_entity, queue_length, "queue length",
                        MetricUnit::kTasks, "queue length", 1000, 1);

METRIC_DEFINE_histogram(test_entity, queue_time, "queue time",
                        MetricUnit::kMicroseconds, "queue time", 1000000, 1);

METRIC_DEFINE_histogram(test_entity, run_time, "run time",
                        MetricUnit::kMicroseconds, "run time", 1000, 1);
| |
| TEST(TestThreadPool, TestMetrics) { |
| MetricRegistry registry; |
| scoped_refptr<MetricEntity> entity = METRIC_ENTITY_test_entity.Instantiate( |
| ®istry, "test entity"); |
| |
| gscoped_ptr<ThreadPool> thread_pool; |
| ASSERT_OK(ThreadPoolBuilder("test") |
| .set_min_threads(1).set_max_threads(1) |
| .Build(&thread_pool)); |
| |
| // Enable metrics for the thread pool. |
| scoped_refptr<Histogram> queue_length = METRIC_queue_length.Instantiate(entity); |
| scoped_refptr<Histogram> queue_time = METRIC_queue_time.Instantiate(entity); |
| scoped_refptr<Histogram> run_time = METRIC_run_time.Instantiate(entity); |
| thread_pool->SetQueueLengthHistogram(queue_length); |
| thread_pool->SetQueueTimeMicrosHistogram(queue_time); |
| thread_pool->SetRunTimeMicrosHistogram(run_time); |
| |
| int kNumItems = 500; |
| for (int i = 0; i < kNumItems; i++) { |
| ASSERT_OK(thread_pool->SubmitFunc(boost::bind(&usleep, i))); |
| } |
| |
| thread_pool->Wait(); |
| |
| // Check that all histograms were incremented once per submitted item. |
| ASSERT_EQ(kNumItems, queue_length->TotalCount()); |
| ASSERT_EQ(kNumItems, queue_time->TotalCount()); |
| ASSERT_EQ(kNumItems, run_time->TotalCount()); |
| } |
| |
| // Test that a thread pool will crash if asked to run its own blocking |
| // functions in a pool thread. |
| // |
| // In a multi-threaded application, TSAN is unsafe to use following a fork(). |
| // After a fork(), TSAN will: |
| // 1. Disable verification, expecting an exec() soon anyway, and |
| // 2. Die on future thread creation. |
| // For some reason, this test triggers behavior #2. We could disable it with |
| // the TSAN option die_after_fork=0, but this can (supposedly) lead to |
| // deadlocks, so we'll disable the entire test instead. |
| #ifndef THREAD_SANITIZER |
TEST(TestThreadPool, TestDeadlocks) {
  const char* death_msg = "called pool function that would result in deadlock";
  // Calling Shutdown() from within one of the pool's own worker
  // threads must crash with the deadlock message.
  ASSERT_DEATH({
    gscoped_ptr<ThreadPool> thread_pool;
    ASSERT_OK(ThreadPoolBuilder("test")
        .set_min_threads(1)
        .set_max_threads(1)
        .Build(&thread_pool));
    ASSERT_OK(thread_pool->SubmitClosure(
        Bind(&ThreadPool::Shutdown, Unretained(thread_pool.get()))));
    thread_pool->Wait();
  }, death_msg);

  // Likewise for Wait() invoked from a pool worker thread.
  ASSERT_DEATH({
    gscoped_ptr<ThreadPool> thread_pool;
    ASSERT_OK(ThreadPoolBuilder("test")
        .set_min_threads(1)
        .set_max_threads(1)
        .Build(&thread_pool));
    ASSERT_OK(thread_pool->SubmitClosure(
        Bind(&ThreadPool::Wait, Unretained(thread_pool.get()))));
    thread_pool->Wait();
  }, death_msg);
}
| #endif |
| |
| } // namespace kudu |