ARROW-12379: [C++] Fix ThreadSanitizer failure in SerialExecutor

Closes #10025 from pitrou/ARROW-12379-tsan

Authored-by: Antoine Pitrou <antoine@python.org>
Signed-off-by: David Li <li.davidm96@gmail.com>
diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc
index cd52360..6465ebb 100644
--- a/cpp/src/arrow/util/thread_pool.cc
+++ b/cpp/src/arrow/util/thread_pool.cc
@@ -22,7 +22,6 @@
 #include <deque>
 #include <list>
 #include <mutex>
-#include <queue>
 #include <string>
 #include <thread>
 #include <vector>
@@ -46,44 +45,54 @@
 }  // namespace
 
 struct SerialExecutor::State {
-  std::queue<Task> task_queue;
+  std::deque<Task> task_queue;
   std::mutex mutex;
   std::condition_variable wait_for_tasks;
-  bool finished;
+  bool finished{false};
 };
 
-SerialExecutor::SerialExecutor() : state_(new State()) {}
-SerialExecutor::~SerialExecutor() {}
+SerialExecutor::SerialExecutor() : state_(std::make_shared<State>()) {}
+
+SerialExecutor::~SerialExecutor() = default;
 
 Status SerialExecutor::SpawnReal(TaskHints hints, FnOnce<void()> task,
                                  StopToken stop_token, StopCallback&& stop_callback) {
-  // The serial task queue is truly serial (no mutex needed) but SpawnReal may be called
-  // from external threads (e.g. when transferring back from blocking I/O threads) so a
-  // mutex is needed
+  // While the SerialExecutor runs tasks synchronously on its main thread,
+  // SpawnReal may be called from external threads (e.g. when transferring back
+  // from blocking I/O threads), so we need to keep the state alive *and* to
+  // lock its contents.
+  //
+  // Note that holding the lock while notifying the condition variable may
+  // not be sufficient, as some exit paths in the main thread are unlocked.
+  auto state = state_;
   {
-    std::lock_guard<std::mutex> lg(state_->mutex);
-    state_->task_queue.push(
+    std::lock_guard<std::mutex> lk(state->mutex);
+    state->task_queue.push_back(
         Task{std::move(task), std::move(stop_token), std::move(stop_callback)});
   }
-  state_->wait_for_tasks.notify_one();
+  state->wait_for_tasks.notify_one();
   return Status::OK();
 }
 
 void SerialExecutor::MarkFinished() {
-  std::lock_guard<std::mutex> lk(state_->mutex);
-  state_->finished = true;
-  // Keep the lock when notifying to avoid situations where the SerialExecutor
-  // would start being destroyed while the notify_one() call is still ongoing.
-  state_->wait_for_tasks.notify_one();
+  // Same comment as SpawnReal above
+  auto state = state_;
+  {
+    std::lock_guard<std::mutex> lk(state->mutex);
+    state->finished = true;
+  }
+  state->wait_for_tasks.notify_one();
 }
 
 void SerialExecutor::RunLoop() {
+  // This is called from the SerialExecutor's main thread, so the
+  // state is guaranteed to be kept alive.
   std::unique_lock<std::mutex> lk(state_->mutex);
 
   while (!state_->finished) {
     while (!state_->task_queue.empty()) {
       Task task = std::move(state_->task_queue.front());
-      state_->task_queue.pop();
+      state_->task_queue.pop_front();
       lk.unlock();
       if (!task.stop_token.IsStopRequested()) {
         std::move(task.callable)();
diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h
index cd96438..c388680 100644
--- a/cpp/src/arrow/util/thread_pool.h
+++ b/cpp/src/arrow/util/thread_pool.h
@@ -225,7 +225,7 @@
 
   // State uses mutex
   struct State;
-  std::unique_ptr<State> state_;
+  std::shared_ptr<State> state_;
 
   template <typename T>
   Result<T> Run(TopLevelTask<T> initial_task) {
diff --git a/cpp/src/arrow/util/thread_pool_test.cc b/cpp/src/arrow/util/thread_pool_test.cc
index 2390f8c..9926ac1 100644
--- a/cpp/src/arrow/util/thread_pool_test.cc
+++ b/cpp/src/arrow/util/thread_pool_test.cc
@@ -135,6 +135,30 @@
   Status RunVoid(FnOnce<Future<>(Executor*)> top_level_task) {
     return RunSynchronouslyVoid(std::move(top_level_task), UseThreads());
   }
+
+  void TestContinueAfterExternal(bool transfer_to_main_thread) {
+    bool continuation_ran = false;
+    EXPECT_OK_AND_ASSIGN(auto external_pool, ThreadPool::Make(1));
+    auto top_level_task = [&](Executor* executor) {
+      struct Callback {
+        Status operator()(...) {
+          *continuation_ran = true;
+          return Status::OK();
+        }
+        bool* continuation_ran;
+      };
+      auto fut = DeferNotOk(external_pool->Submit([&] {
+        SleepABit();
+        return Status::OK();
+      }));
+      if (transfer_to_main_thread) {
+        fut = executor->Transfer(fut);
+      }
+      return fut.Then(Callback{&continuation_ran});
+    };
+    ASSERT_OK(RunVoid(std::move(top_level_task)));
+    EXPECT_TRUE(continuation_ran);
+  }
 };
 
 TEST_P(TestRunSynchronously, SimpleRun) {
@@ -209,25 +233,16 @@
 }
 
 TEST_P(TestRunSynchronously, ContinueAfterExternal) {
-  bool continuation_ran = false;
-  EXPECT_OK_AND_ASSIGN(auto mock_io_pool, ThreadPool::Make(1));
-  auto top_level_task = [&](Executor* executor) {
-    struct Callback {
-      Status operator()(...) {
-        continuation_ran = true;
-        return Status::OK();
-      }
-      bool& continuation_ran;
-    };
-    return executor
-        ->Transfer(DeferNotOk(mock_io_pool->Submit([&] {
-          SleepABit();
-          return Status::OK();
-        })))
-        .Then(Callback{continuation_ran});
-  };
-  ASSERT_OK(RunVoid(std::move(top_level_task)));
-  EXPECT_TRUE(continuation_ran);
+  // The future returned by the top-level task completes on another thread.
+  // This can trigger delicate race conditions in the SerialExecutor code,
+  // especially destruction.
+  this->TestContinueAfterExternal(/*transfer_to_main_thread=*/false);
+}
+
+TEST_P(TestRunSynchronously, ContinueAfterExternalTransferred) {
+  // Like above, but the future is transferred back to the serial executor
+  // after completion on an external thread.
+  this->TestContinueAfterExternal(/*transfer_to_main_thread=*/true);
 }
 
 TEST_P(TestRunSynchronously, SchedulerAbort) {