ARROW-12379: [C++] Fix ThreadSanitizer failure in SerialExecutor
Closes #10025 from pitrou/ARROW-12379-tsan
Authored-by: Antoine Pitrou <antoine@python.org>
Signed-off-by: David Li <li.davidm96@gmail.com>
diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc
index cd52360..6465ebb 100644
--- a/cpp/src/arrow/util/thread_pool.cc
+++ b/cpp/src/arrow/util/thread_pool.cc
@@ -22,7 +22,6 @@
#include <deque>
#include <list>
#include <mutex>
-#include <queue>
#include <string>
#include <thread>
#include <vector>
@@ -46,44 +45,54 @@
} // namespace
struct SerialExecutor::State {
- std::queue<Task> task_queue;
+ std::deque<Task> task_queue;
std::mutex mutex;
std::condition_variable wait_for_tasks;
- bool finished;
+ bool finished{false};
};
-SerialExecutor::SerialExecutor() : state_(new State()) {}
-SerialExecutor::~SerialExecutor() {}
+SerialExecutor::SerialExecutor() : state_(std::make_shared<State>()) {}
+
+SerialExecutor::~SerialExecutor() = default;
Status SerialExecutor::SpawnReal(TaskHints hints, FnOnce<void()> task,
StopToken stop_token, StopCallback&& stop_callback) {
- // The serial task queue is truly serial (no mutex needed) but SpawnReal may be called
- // from external threads (e.g. when transferring back from blocking I/O threads) so a
- // mutex is needed
+ // While the SerialExecutor runs tasks synchronously on its main thread,
+ // SpawnReal may be called from external threads (e.g. when transferring back
+ // from blocking I/O threads), so we need to keep the state alive *and* to
+ // lock its contents.
+ //
+ // Note that holding the lock while notifying the condition variable may
+ // not be sufficient, as some exit paths in the main thread are unlocked.
+ auto state = state_;
{
- std::lock_guard<std::mutex> lg(state_->mutex);
- state_->task_queue.push(
+ std::lock_guard<std::mutex> lk(state->mutex);
+ state->task_queue.push_back(
Task{std::move(task), std::move(stop_token), std::move(stop_callback)});
}
- state_->wait_for_tasks.notify_one();
+ state->wait_for_tasks.notify_one();
return Status::OK();
}
void SerialExecutor::MarkFinished() {
- std::lock_guard<std::mutex> lk(state_->mutex);
- state_->finished = true;
- // Keep the lock when notifying to avoid situations where the SerialExecutor
- // would start being destroyed while the notify_one() call is still ongoing.
- state_->wait_for_tasks.notify_one();
+ // Same comment as SpawnReal above
+ auto state = state_;
+ {
+ std::lock_guard<std::mutex> lk(state->mutex);
+ state->finished = true;
+ }
+ state->wait_for_tasks.notify_one();
}
void SerialExecutor::RunLoop() {
+ // This is called from the SerialExecutor's main thread, so the
+ // state is guaranteed to be kept alive.
std::unique_lock<std::mutex> lk(state_->mutex);
while (!state_->finished) {
while (!state_->task_queue.empty()) {
Task task = std::move(state_->task_queue.front());
- state_->task_queue.pop();
+ state_->task_queue.pop_front();
lk.unlock();
if (!task.stop_token.IsStopRequested()) {
std::move(task.callable)();
diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h
index cd96438..c388680 100644
--- a/cpp/src/arrow/util/thread_pool.h
+++ b/cpp/src/arrow/util/thread_pool.h
@@ -225,7 +225,7 @@
// State uses mutex
struct State;
- std::unique_ptr<State> state_;
+ std::shared_ptr<State> state_;
template <typename T>
Result<T> Run(TopLevelTask<T> initial_task) {
diff --git a/cpp/src/arrow/util/thread_pool_test.cc b/cpp/src/arrow/util/thread_pool_test.cc
index 2390f8c..9926ac1 100644
--- a/cpp/src/arrow/util/thread_pool_test.cc
+++ b/cpp/src/arrow/util/thread_pool_test.cc
@@ -135,6 +135,30 @@
Status RunVoid(FnOnce<Future<>(Executor*)> top_level_task) {
return RunSynchronouslyVoid(std::move(top_level_task), UseThreads());
}
+
+ void TestContinueAfterExternal(bool transfer_to_main_thread) {
+ bool continuation_ran = false;
+ EXPECT_OK_AND_ASSIGN(auto external_pool, ThreadPool::Make(1));
+ auto top_level_task = [&](Executor* executor) {
+ struct Callback {
+ Status operator()(...) {
+ *continuation_ran = true;
+ return Status::OK();
+ }
+ bool* continuation_ran;
+ };
+ auto fut = DeferNotOk(external_pool->Submit([&] {
+ SleepABit();
+ return Status::OK();
+ }));
+ if (transfer_to_main_thread) {
+ fut = executor->Transfer(fut);
+ }
+ return fut.Then(Callback{&continuation_ran});
+ };
+ ASSERT_OK(RunVoid(std::move(top_level_task)));
+ EXPECT_TRUE(continuation_ran);
+ }
};
TEST_P(TestRunSynchronously, SimpleRun) {
@@ -209,25 +233,16 @@
}
TEST_P(TestRunSynchronously, ContinueAfterExternal) {
- bool continuation_ran = false;
- EXPECT_OK_AND_ASSIGN(auto mock_io_pool, ThreadPool::Make(1));
- auto top_level_task = [&](Executor* executor) {
- struct Callback {
- Status operator()(...) {
- continuation_ran = true;
- return Status::OK();
- }
- bool& continuation_ran;
- };
- return executor
- ->Transfer(DeferNotOk(mock_io_pool->Submit([&] {
- SleepABit();
- return Status::OK();
- })))
- .Then(Callback{continuation_ran});
- };
- ASSERT_OK(RunVoid(std::move(top_level_task)));
- EXPECT_TRUE(continuation_ran);
+ // The future returned by the top-level task completes on another thread.
+ // This can trigger delicate race conditions in the SerialExecutor code,
+ // especially destruction.
+ this->TestContinueAfterExternal(/*transfer_to_main_thread=*/false);
+}
+
+TEST_P(TestRunSynchronously, ContinueAfterExternalTransferred) {
+ // Like above, but the future is transferred back to the serial executor
+ // after completion on an external thread.
+ this->TestContinueAfterExternal(/*transfer_to_main_thread=*/true);
}
TEST_P(TestRunSynchronously, SchedulerAbort) {