// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef _WIN32
#include <sys/wait.h>
#include <unistd.h>
#endif

#include <algorithm>
#include <atomic>
#include <cstdio>
#include <cstdlib>
#include <functional>
#include <memory>
#include <string>
#include <thread>
#include <vector>

#include <gtest/gtest.h>

#include "arrow/status.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/io_util.h"
#include "arrow/util/macros.h"
#include "arrow/util/thread_pool.h"

namespace arrow {
namespace internal {
template <typename T>
static void task_add(T x, T y, T* out) {
*out = x + y;
}
template <typename T>
struct task_slow_add {
void operator()(T x, T y, T* out) {
SleepFor(seconds_);
*out = x + y;
}
const double seconds_;
};
typedef std::function<void(int, int, int*)> AddTaskFunc;
template <typename T>
static T add(T x, T y) {
return x + y;
}
template <typename T>
static T slow_add(double seconds, T x, T y) {
SleepFor(seconds);
return x + y;
}
template <typename T>
static T inplace_add(T& x, T y) {
return x += y;
}
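// The helpers above are the task payloads used throughout this file:
// task_add / task_slow_add are callables for ThreadPool::Spawn (the result is
// written through an out-pointer), while add / slow_add / inplace_add return a
// value and are used with ThreadPool::Submit, which wraps the result in a Future.
//
// Minimal usage sketch (illustrative only; mirrors the Submit test below):
//
//   auto pool = *ThreadPool::Make(4);
//   ASSERT_OK_AND_ASSIGN(auto fut, pool->Submit(add<int>, 1, 2));
//   ASSERT_OK_AND_EQ(3, fut.result());
//   ASSERT_OK(pool->Shutdown());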
// A class to spawn "add" tasks to a pool and check the results when done
class AddTester {
public:
explicit AddTester(int nadds, StopToken stop_token = StopToken::Unstoppable())
: nadds_(nadds), stop_token_(stop_token), xs_(nadds), ys_(nadds), outs_(nadds, -1) {
int x = 0, y = 0;
std::generate(xs_.begin(), xs_.end(), [&] {
++x;
return x;
});
std::generate(ys_.begin(), ys_.end(), [&] {
y += 10;
return y;
});
}
AddTester(AddTester&&) = default;
void SpawnTasks(ThreadPool* pool, AddTaskFunc add_func) {
for (int i = 0; i < nadds_; ++i) {
ASSERT_OK(pool->Spawn([=] { add_func(xs_[i], ys_[i], &outs_[i]); }, stop_token_));
}
}
void CheckResults() {
for (int i = 0; i < nadds_; ++i) {
ASSERT_EQ(outs_[i], (i + 1) * 11);
}
}
void CheckNotAllComputed() {
for (int i = 0; i < nadds_; ++i) {
if (outs_[i] == -1) {
return;
}
}
ASSERT_TRUE(0) << "all values were computed";
}
private:
ARROW_DISALLOW_COPY_AND_ASSIGN(AddTester);
int nadds_;
StopToken stop_token_;
std::vector<int> xs_;
std::vector<int> ys_;
std::vector<int> outs_;
};
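// Typical AddTester flow (a sketch of what DoSpawnAdds below does; `pool` is a
// ThreadPool*):
//
//   AddTester tester(/*nadds=*/10);
//   tester.SpawnTasks(pool, task_add<int>);
//   ASSERT_OK(pool->Shutdown());  // waits for the spawned tasks
//   tester.CheckResults();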
class TestRunSynchronously : public testing::TestWithParam<bool> {
public:
bool UseThreads() { return GetParam(); }
template <typename T>
Result<T> Run(FnOnce<Future<T>(Executor*)> top_level_task) {
return RunSynchronously(std::move(top_level_task), UseThreads());
}
Status RunVoid(FnOnce<Future<>(Executor*)> top_level_task) {
return RunSynchronouslyVoid(std::move(top_level_task), UseThreads());
}
void TestContinueAfterExternal(bool transfer_to_main_thread) {
bool continuation_ran = false;
EXPECT_OK_AND_ASSIGN(auto external_pool, ThreadPool::Make(1));
auto top_level_task = [&](Executor* executor) {
struct Callback {
Status operator()(...) {
*continuation_ran = true;
return Status::OK();
}
bool* continuation_ran;
};
auto fut = DeferNotOk(external_pool->Submit([&] {
SleepABit();
return Status::OK();
}));
if (transfer_to_main_thread) {
fut = executor->Transfer(fut);
}
return fut.Then(Callback{&continuation_ran});
};
ASSERT_OK(RunVoid(std::move(top_level_task)));
EXPECT_TRUE(continuation_ran);
}
};
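// The boolean test parameter (false/true, see INSTANTIATE_TEST_SUITE_P below) is
// forwarded as RunSynchronously's use_threads argument, so each test body is
// exercised both without and with threading enabled.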
TEST_P(TestRunSynchronously, SimpleRun) {
bool task_ran = false;
auto task = [&](Executor* executor) {
EXPECT_NE(executor, nullptr);
task_ran = true;
return Future<>::MakeFinished(Status::OK());
};
ASSERT_OK(RunVoid(std::move(task)));
EXPECT_TRUE(task_ran);
}
TEST_P(TestRunSynchronously, SpawnNested) {
bool nested_ran = false;
auto top_level_task = [&](Executor* executor) {
return DeferNotOk(executor->Submit([&] {
nested_ran = true;
return Status::OK();
}));
};
ASSERT_OK(RunVoid(std::move(top_level_task)));
EXPECT_TRUE(nested_ran);
}
TEST_P(TestRunSynchronously, SpawnMoreNested) {
std::atomic<int> nested_ran{0};
auto top_level_task = [&](Executor* executor) -> Future<> {
auto fut_a = DeferNotOk(executor->Submit([&] { nested_ran++; }));
auto fut_b = DeferNotOk(executor->Submit([&] { nested_ran++; }));
return AllComplete({fut_a, fut_b})
.Then([&](const Result<arrow::detail::Empty>& result) {
nested_ran++;
return result;
});
};
ASSERT_OK(RunVoid(std::move(top_level_task)));
EXPECT_EQ(nested_ran, 3);
}
TEST_P(TestRunSynchronously, WithResult) {
auto top_level_task = [&](Executor* executor) {
return DeferNotOk(executor->Submit([] { return 42; }));
};
ASSERT_OK_AND_EQ(42, Run<int>(std::move(top_level_task)));
}
TEST_P(TestRunSynchronously, StopTokenSpawn) {
bool nested_ran = false;
StopSource stop_source;
auto top_level_task = [&](Executor* executor) -> Future<> {
stop_source.RequestStop(Status::Invalid("XYZ"));
RETURN_NOT_OK(executor->Spawn([&] { nested_ran = true; }, stop_source.token()));
return Future<>::MakeFinished();
};
ASSERT_OK(RunVoid(std::move(top_level_task)));
EXPECT_FALSE(nested_ran);
}
TEST_P(TestRunSynchronously, StopTokenSubmit) {
bool nested_ran = false;
StopSource stop_source;
auto top_level_task = [&](Executor* executor) -> Future<> {
stop_source.RequestStop();
return DeferNotOk(executor->Submit(stop_source.token(), [&] {
nested_ran = true;
return Status::OK();
}));
};
ASSERT_RAISES(Cancelled, RunVoid(std::move(top_level_task)));
EXPECT_FALSE(nested_ran);
}
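// Note the asymmetry between the two tests above: a cancelled Spawn() silently
// drops the task (the call itself still succeeds), while a cancelled Submit()
// surfaces Status::Cancelled through the returned Future.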
TEST_P(TestRunSynchronously, ContinueAfterExternal) {
// The future returned by the top-level task completes on another thread.
// This can trigger delicate race conditions in the SerialExecutor code,
// especially destruction.
this->TestContinueAfterExternal(/*transfer_to_main_thread=*/false);
}
TEST_P(TestRunSynchronously, ContinueAfterExternalTransferred) {
// Like above, but the future is transferred back to the serial executor
// after completion on an external thread.
this->TestContinueAfterExternal(/*transfer_to_main_thread=*/true);
}
TEST_P(TestRunSynchronously, SchedulerAbort) {
auto top_level_task = [&](Executor* executor) { return Status::Invalid("XYZ"); };
ASSERT_RAISES(Invalid, RunVoid(std::move(top_level_task)));
}
TEST_P(TestRunSynchronously, PropagatedError) {
auto top_level_task = [&](Executor* executor) {
return DeferNotOk(executor->Submit([] { return Status::Invalid("XYZ"); }));
};
ASSERT_RAISES(Invalid, RunVoid(std::move(top_level_task)));
}
INSTANTIATE_TEST_SUITE_P(TestRunSynchronously, TestRunSynchronously,
::testing::Values(false, true));
class TestThreadPool : public ::testing::Test {
public:
void TearDown() override {
fflush(stdout);
fflush(stderr);
}
std::shared_ptr<ThreadPool> MakeThreadPool() { return MakeThreadPool(4); }
std::shared_ptr<ThreadPool> MakeThreadPool(int threads) {
return *ThreadPool::Make(threads);
}
void DoSpawnAdds(ThreadPool* pool, int nadds, AddTaskFunc add_func,
StopToken stop_token = StopToken::Unstoppable(),
StopSource* stop_source = nullptr) {
AddTester add_tester(nadds, stop_token);
add_tester.SpawnTasks(pool, add_func);
if (stop_source) {
stop_source->RequestStop();
}
ASSERT_OK(pool->Shutdown());
if (stop_source) {
add_tester.CheckNotAllComputed();
} else {
add_tester.CheckResults();
}
}
void SpawnAdds(ThreadPool* pool, int nadds, AddTaskFunc add_func,
StopToken stop_token = StopToken::Unstoppable()) {
DoSpawnAdds(pool, nadds, std::move(add_func), std::move(stop_token));
}
void SpawnAddsAndCancel(ThreadPool* pool, int nadds, AddTaskFunc add_func,
StopSource* stop_source) {
DoSpawnAdds(pool, nadds, std::move(add_func), stop_source->token(), stop_source);
}
void DoSpawnAddsThreaded(ThreadPool* pool, int nthreads, int nadds,
AddTaskFunc add_func,
StopToken stop_token = StopToken::Unstoppable(),
StopSource* stop_source = nullptr) {
// Same as SpawnAdds, but do the task spawning from multiple threads
std::vector<AddTester> add_testers;
std::vector<std::thread> threads;
for (int i = 0; i < nthreads; ++i) {
add_testers.emplace_back(nadds, stop_token);
}
for (auto& add_tester : add_testers) {
threads.emplace_back([&] { add_tester.SpawnTasks(pool, add_func); });
}
if (stop_source) {
stop_source->RequestStop();
}
for (auto& thread : threads) {
thread.join();
}
ASSERT_OK(pool->Shutdown());
for (auto& add_tester : add_testers) {
if (stop_source) {
add_tester.CheckNotAllComputed();
} else {
add_tester.CheckResults();
}
}
}
void SpawnAddsThreaded(ThreadPool* pool, int nthreads, int nadds, AddTaskFunc add_func,
StopToken stop_token = StopToken::Unstoppable()) {
DoSpawnAddsThreaded(pool, nthreads, nadds, std::move(add_func),
std::move(stop_token));
}
void SpawnAddsThreadedAndCancel(ThreadPool* pool, int nthreads, int nadds,
AddTaskFunc add_func, StopSource* stop_source) {
DoSpawnAddsThreaded(pool, nthreads, nadds, std::move(add_func), stop_source->token(),
stop_source);
}
};
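// The fixture helpers above layer as follows: SpawnAdds / SpawnAddsThreaded run
// all tasks to completion and verify every result, while the *AndCancel variants
// request a stop before Shutdown() and only check that at least one task was
// skipped (CheckNotAllComputed).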
TEST_F(TestThreadPool, ConstructDestruct) {
// Stress shutdown-at-destruction logic
for (int threads : {1, 2, 3, 8, 32, 70}) {
auto pool = this->MakeThreadPool(threads);
}
}
// Correctness and stress tests using Spawn() and Shutdown()
TEST_F(TestThreadPool, Spawn) {
auto pool = this->MakeThreadPool(3);
SpawnAdds(pool.get(), 7, task_add<int>);
}
TEST_F(TestThreadPool, StressSpawn) {
auto pool = this->MakeThreadPool(30);
SpawnAdds(pool.get(), 1000, task_add<int>);
}
TEST_F(TestThreadPool, StressSpawnThreaded) {
auto pool = this->MakeThreadPool(30);
SpawnAddsThreaded(pool.get(), 20, 100, task_add<int>);
}
TEST_F(TestThreadPool, SpawnSlow) {
// This checks that Shutdown() waits for all tasks to finish
auto pool = this->MakeThreadPool(2);
SpawnAdds(pool.get(), 7, task_slow_add<int>{/*seconds=*/0.02});
}
TEST_F(TestThreadPool, StressSpawnSlow) {
auto pool = this->MakeThreadPool(30);
SpawnAdds(pool.get(), 1000, task_slow_add<int>{/*seconds=*/0.002});
}
TEST_F(TestThreadPool, StressSpawnSlowThreaded) {
auto pool = this->MakeThreadPool(30);
SpawnAddsThreaded(pool.get(), 20, 100, task_slow_add<int>{/*seconds=*/0.002});
}
TEST_F(TestThreadPool, SpawnWithStopToken) {
StopSource stop_source;
auto pool = this->MakeThreadPool(3);
SpawnAdds(pool.get(), 7, task_add<int>, stop_source.token());
}
TEST_F(TestThreadPool, StressSpawnThreadedWithStopToken) {
StopSource stop_source;
auto pool = this->MakeThreadPool(30);
SpawnAddsThreaded(pool.get(), 20, 100, task_add<int>, stop_source.token());
}
TEST_F(TestThreadPool, SpawnWithStopTokenCancelled) {
StopSource stop_source;
auto pool = this->MakeThreadPool(3);
SpawnAddsAndCancel(pool.get(), 100, task_slow_add<int>{/*seconds=*/0.02}, &stop_source);
}
TEST_F(TestThreadPool, StressSpawnThreadedWithStopTokenCancelled) {
StopSource stop_source;
auto pool = this->MakeThreadPool(30);
SpawnAddsThreadedAndCancel(pool.get(), 20, 100, task_slow_add<int>{/*seconds=*/0.02},
&stop_source);
}
TEST_F(TestThreadPool, QuickShutdown) {
AddTester add_tester(100);
{
auto pool = this->MakeThreadPool(3);
add_tester.SpawnTasks(pool.get(), task_slow_add<int>{/*seconds=*/0.02});
ASSERT_OK(pool->Shutdown(false /* wait */));
add_tester.CheckNotAllComputed();
}
add_tester.CheckNotAllComputed();
}
TEST_F(TestThreadPool, SetCapacity) {
auto pool = this->MakeThreadPool(5);
// Thread spawning is on-demand
ASSERT_EQ(pool->GetCapacity(), 5);
ASSERT_EQ(pool->GetActualCapacity(), 0);
ASSERT_OK(pool->SetCapacity(3));
ASSERT_EQ(pool->GetCapacity(), 3);
ASSERT_EQ(pool->GetActualCapacity(), 0);
auto gating_task = GatingTask::Make();
ASSERT_OK(pool->Spawn(gating_task->Task()));
ASSERT_OK(gating_task->WaitForRunning(1));
ASSERT_EQ(pool->GetActualCapacity(), 1);
ASSERT_OK(gating_task->Unlock());
gating_task = GatingTask::Make();
// Spawn more tasks than the pool capacity
for (int i = 0; i < 6; ++i) {
ASSERT_OK(pool->Spawn(gating_task->Task()));
}
ASSERT_OK(gating_task->WaitForRunning(3));
SleepFor(0.001); // Sleep a bit just to make sure no additional threads are spawned
ASSERT_EQ(pool->GetActualCapacity(), 3); // maxed out
// The tasks have not finished yet; increasing the desired capacity
// should spawn threads immediately.
ASSERT_OK(pool->SetCapacity(5));
ASSERT_EQ(pool->GetCapacity(), 5);
ASSERT_EQ(pool->GetActualCapacity(), 5);
// Thread reaping is eager (but asynchronous)
ASSERT_OK(pool->SetCapacity(2));
ASSERT_EQ(pool->GetCapacity(), 2);
// Wait for workers to wake up and secede
ASSERT_OK(gating_task->Unlock());
BusyWait(0.5, [&] { return pool->GetActualCapacity() == 2; });
ASSERT_EQ(pool->GetActualCapacity(), 2);
// Downsize while tasks are pending
ASSERT_OK(pool->SetCapacity(5));
ASSERT_EQ(pool->GetCapacity(), 5);
gating_task = GatingTask::Make();
for (int i = 0; i < 10; ++i) {
ASSERT_OK(pool->Spawn(gating_task->Task()));
}
ASSERT_OK(gating_task->WaitForRunning(5));
ASSERT_EQ(pool->GetActualCapacity(), 5);
ASSERT_OK(pool->SetCapacity(2));
ASSERT_EQ(pool->GetCapacity(), 2);
ASSERT_OK(gating_task->Unlock());
BusyWait(0.5, [&] { return pool->GetActualCapacity() == 2; });
ASSERT_EQ(pool->GetActualCapacity(), 2);
// Ensure nothing got stuck
ASSERT_OK(pool->Shutdown());
}
// Test Submit() functionality
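// The tests below rely on fut.result() blocking until the submitted task has
// completed and then returning its Result.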
TEST_F(TestThreadPool, Submit) {
auto pool = this->MakeThreadPool(3);
{
ASSERT_OK_AND_ASSIGN(Future<int> fut, pool->Submit(add<int>, 4, 5));
Result<int> res = fut.result();
ASSERT_OK_AND_EQ(9, res);
}
{
ASSERT_OK_AND_ASSIGN(Future<std::string> fut,
pool->Submit(add<std::string>, "foo", "bar"));
ASSERT_OK_AND_EQ("foobar", fut.result());
}
{
ASSERT_OK_AND_ASSIGN(auto fut, pool->Submit(slow_add<int>, /*seconds=*/0.01, 4, 5));
ASSERT_OK_AND_EQ(9, fut.result());
}
{
// Reference passing
std::string s = "foo";
ASSERT_OK_AND_ASSIGN(auto fut,
pool->Submit(inplace_add<std::string>, std::ref(s), "bar"));
ASSERT_OK_AND_EQ("foobar", fut.result());
ASSERT_EQ(s, "foobar");
}
{
// `void` return type
ASSERT_OK_AND_ASSIGN(auto fut, pool->Submit(SleepFor, 0.001));
ASSERT_OK(fut.status());
}
}
TEST_F(TestThreadPool, SubmitWithStopToken) {
auto pool = this->MakeThreadPool(3);
{
StopSource stop_source;
ASSERT_OK_AND_ASSIGN(Future<int> fut,
pool->Submit(stop_source.token(), add<int>, 4, 5));
Result<int> res = fut.result();
ASSERT_OK_AND_EQ(9, res);
}
}
TEST_F(TestThreadPool, SubmitWithStopTokenCancelled) {
auto pool = this->MakeThreadPool(3);
{
const int n_futures = 100;
StopSource stop_source;
StopToken stop_token = stop_source.token();
std::vector<Future<int>> futures;
for (int i = 0; i < n_futures; ++i) {
ASSERT_OK_AND_ASSIGN(
auto fut, pool->Submit(stop_token, slow_add<int>, 0.01 /*seconds*/, i, 1));
futures.push_back(std::move(fut));
}
SleepFor(0.05); // Let some work finish
stop_source.RequestStop();
int n_success = 0;
int n_cancelled = 0;
for (int i = 0; i < n_futures; ++i) {
Result<int> res = futures[i].result();
if (res.ok()) {
ASSERT_EQ(i + 1, *res);
++n_success;
} else {
ASSERT_RAISES(Cancelled, res);
++n_cancelled;
}
}
ASSERT_GT(n_success, 0);
ASSERT_GT(n_cancelled, 0);
}
}
// Test fork safety on Unix
#if !(defined(_WIN32) || defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER) || \
defined(THREAD_SANITIZER))
TEST_F(TestThreadPool, ForkSafety) {
pid_t child_pid;
int child_status;
{
// Fork after task submission
auto pool = this->MakeThreadPool(3);
ASSERT_OK_AND_ASSIGN(auto fut, pool->Submit(add<int>, 4, 5));
ASSERT_OK_AND_EQ(9, fut.result());
child_pid = fork();
if (child_pid == 0) {
// Child: thread pool should be usable
ASSERT_OK_AND_ASSIGN(fut, pool->Submit(add<int>, 3, 4));
if (*fut.result() != 7) {
std::exit(1);
}
// Shutting down shouldn't hang or fail
Status st = pool->Shutdown();
std::exit(st.ok() ? 0 : 2);
} else {
// Parent
ASSERT_GT(child_pid, 0);
ASSERT_GT(waitpid(child_pid, &child_status, 0), 0);
ASSERT_TRUE(WIFEXITED(child_status));
ASSERT_EQ(WEXITSTATUS(child_status), 0);
ASSERT_OK(pool->Shutdown());
}
}
{
// Fork after shutdown
auto pool = this->MakeThreadPool(3);
ASSERT_OK(pool->Shutdown());
child_pid = fork();
if (child_pid == 0) {
// Child
// Spawning a task should return an error (pool was shut down)
Status st = pool->Spawn([] {});
if (!st.IsInvalid()) {
std::exit(1);
}
// Trigger destructor
pool.reset();
std::exit(0);
} else {
// Parent
ASSERT_GT(child_pid, 0);
ASSERT_GT(waitpid(child_pid, &child_status, 0), 0);
ASSERT_TRUE(WIFEXITED(child_status));
ASSERT_EQ(WEXITSTATUS(child_status), 0);
}
}
}
#endif
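// DefaultCapacity() heuristic exercised below (as the assertions imply): the
// first value of OMP_NUM_THREADS wins, capped by OMP_THREAD_LIMIT; missing or
// invalid values fall back to std::thread::hardware_concurrency().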
TEST(TestGlobalThreadPool, Capacity) {
// Sanity check
auto pool = GetCpuThreadPool();
int capacity = pool->GetCapacity();
ASSERT_GT(capacity, 0);
ASSERT_EQ(GetCpuThreadPoolCapacity(), capacity);
// This value depends on whether any tasks were launched previously
ASSERT_GE(pool->GetActualCapacity(), 0);
ASSERT_LE(pool->GetActualCapacity(), capacity);
// Exercise default capacity heuristic
ASSERT_OK(DelEnvVar("OMP_NUM_THREADS"));
ASSERT_OK(DelEnvVar("OMP_THREAD_LIMIT"));
int hw_capacity = std::thread::hardware_concurrency();
ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "13"));
ASSERT_EQ(ThreadPool::DefaultCapacity(), 13);
ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "7,5,13"));
ASSERT_EQ(ThreadPool::DefaultCapacity(), 7);
ASSERT_OK(DelEnvVar("OMP_NUM_THREADS"));
ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "1"));
ASSERT_EQ(ThreadPool::DefaultCapacity(), 1);
ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "999"));
if (hw_capacity <= 999) {
ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
}
ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "6,5,13"));
ASSERT_EQ(ThreadPool::DefaultCapacity(), 6);
ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "2"));
ASSERT_EQ(ThreadPool::DefaultCapacity(), 2);
// Invalid env values
ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "0"));
ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "0"));
ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "zzz"));
ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "x"));
ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "-1"));
ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "99999999999999999999999999"));
ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
ASSERT_OK(DelEnvVar("OMP_NUM_THREADS"));
ASSERT_OK(DelEnvVar("OMP_THREAD_LIMIT"));
}
} // namespace internal
} // namespace arrow