blob: 3633ae9be0eaa71982a4e39792c085473337e307 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <glog/logging.h>
#include <unistd.h>
#include "common/logging.h"
#include "testutil/gtest-util.h"
#include "util/thread-pool.h"
#include "common/names.h"
namespace impala {
const int NUM_THREADS = 5;
int thread_counters[NUM_THREADS];
// Per-thread mutex to ensure visibility of counters after thread pool terminates
mutex thread_mutexes[NUM_THREADS];
void Count(int thread_id, const int& i) {
lock_guard<mutex> l(thread_mutexes[thread_id]);
thread_counters[thread_id] += i;
}
TEST(ThreadPoolTest, BasicTest) {
const int OFFERED_RANGE = 10000;
for (int i = 0; i < NUM_THREADS; ++i) {
thread_counters[i] = 0;
}
ThreadPool<int> thread_pool("thread-pool", "worker", 5, 250, Count);
ASSERT_OK(thread_pool.Init());
for (int i = 0; i <= OFFERED_RANGE; ++i) {
ASSERT_TRUE(thread_pool.Offer(i));
}
thread_pool.DrainAndShutdown();
// Check that Offer() after Shutdown() will return false
ASSERT_FALSE(thread_pool.Offer(-1));
EXPECT_EQ(0, thread_pool.GetQueueSize());
int expected_count = (OFFERED_RANGE * (OFFERED_RANGE + 1)) / 2;
int count = 0;
for (int i = 0; i < NUM_THREADS; ++i) {
lock_guard<mutex> l(thread_mutexes[i]);
LOG(INFO) << "Counter " << i << ": " << thread_counters[i];
count += thread_counters[i];
}
EXPECT_EQ(expected_count, count);
}
class SleepWorkItem : public SynchronousWorkItem {
public:
SleepWorkItem(int64_t timeout_ms, bool* destructor_called)
: timeout_ms_(timeout_ms), destructor_called_ptr_(destructor_called) {
*destructor_called_ptr_ = false;
}
~SleepWorkItem() {
*destructor_called_ptr_ = true;
}
virtual Status Execute() override {
if (timeout_ms_ > 0) SleepForMs(timeout_ms_);
message_ = "Done";
return Status::OK();
}
virtual std::string GetDescription() override {
return Substitute("Simple task with $0 millisecond timeout", timeout_ms_);
}
std::string GetMessage() { return message_; }
private:
std::string message_ = "Not done";
int64_t timeout_ms_;
bool* destructor_called_ptr_;
};
TEST(ThreadPoolTest, SynchronousThreadPoolNoSleep) {
// Create a synchronous pool with one thread and a queue size of one.
SynchronousThreadPool pool("sync-thread-pool", "worker", 1, 1);
ASSERT_OK(pool.Init());
// Base case: work item takes no time, run it with a timeout of 5 milliseconds
unique_ptr<bool> no_sleep_destroyed(new bool);
std::shared_ptr<SleepWorkItem> no_sleep(new SleepWorkItem(0, no_sleep_destroyed.get()));
ASSERT_OK(pool.SynchronousOffer(no_sleep, 5));
ASSERT_EQ(no_sleep->GetMessage(), "Done");
// If the SynchronousOffer() completed successfully, the thread pool does not have any
// shared_ptr to the work item. The caller is the only holder, so when it calls
// reset, the destructor must be called.
no_sleep.reset();
// The work item should be destroyed even if we are not shutting down the pool.
// IMPALA-8371: There is a race condition with the worker thread, as the worker thread
// may not have released its shared_ptr to the work item. Wait for a limited period of
// time for the work thread to release the shared_ptr.
for (int i = 0; i < 10; i++) {
if (*no_sleep_destroyed) break;
SleepForMs(5);
}
ASSERT_TRUE(*no_sleep_destroyed);
pool.DrainAndShutdown();
}
TEST(ThreadPoolTest, SynchronousThreadPoolTimeouts) {
// Create a synchronous pool with one thread and a queue size of one.
SynchronousThreadPool pool("sync-thread-pool", "worker", 1, 1);
ASSERT_OK(pool.Init());
// Timeout case #1: Submit one task that takes 100 milliseconds. Offer it with a timeout
// of 1 millisecond so that the caller immediately times out.
unique_ptr<bool> long_sleep_destroyed(new bool);
std::shared_ptr<SleepWorkItem> long_sleep(
new SleepWorkItem(100, long_sleep_destroyed.get()));
Status timeout_status = pool.SynchronousOffer(long_sleep, 1);
ASSERT_EQ(timeout_status.code(), TErrorCode::THREAD_POOL_TASK_TIMED_OUT);
// The work item is still running, and even if the caller releases its shared_ptr,
// the work item is not destroyed.
long_sleep.reset();
ASSERT_FALSE(*long_sleep_destroyed);
// The single thread in the thread pool is still running. Submit another task
// that will queue. The task doesn't matter.
unique_ptr<bool> queued_task_destroyed(new bool);
std::shared_ptr<SleepWorkItem> queued_task(
new SleepWorkItem(0, queued_task_destroyed.get()));
Status queued_task_status = pool.SynchronousOffer(queued_task, 1);
ASSERT_EQ(queued_task_status.code(), TErrorCode::THREAD_POOL_TASK_TIMED_OUT);
// The work item is queued, and even if the caller releases its shared_ptr, the work
// item is not destroyed.
queued_task.reset();
ASSERT_FALSE(*queued_task_destroyed);
// Now, the queue is full. Any new task will fail to submit.
unique_ptr<bool> fail_to_submit_destroyed(new bool);
std::shared_ptr<SleepWorkItem> fail_to_submit(
new SleepWorkItem(0, fail_to_submit_destroyed.get()));
Status fail_to_submit_status = pool.SynchronousOffer(fail_to_submit, 1);
ASSERT_EQ(fail_to_submit_status.code(), TErrorCode::THREAD_POOL_SUBMIT_FAILED);
// When the submit fails, the thread pool does not keep any shared_ptr to the work
// item. When the caller releases its shared_ptr, the work item is immediately
// destroyed.
fail_to_submit.reset();
ASSERT_TRUE(*fail_to_submit_destroyed);
// The tasks will still complete
pool.DrainAndShutdown();
// The work items that the thread pool had running and in the queue are destroyed
// when they complete (even though the caller long since released its shared_ptr).
ASSERT_TRUE(*long_sleep_destroyed);
ASSERT_TRUE(*queued_task_destroyed);
}
}