blob: c4d4d1869c6a0c626564a9581b345e04878d835a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#ifndef _WIN32
#include <unistd.h>
#include <cstdint>
#include <memory>
#include <queue>
#include <type_traits>
#include <utility>
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/cancel.h"
#include "arrow/util/functional.h"
#include "arrow/util/future.h"
#include "arrow/util/macros.h"
#include "arrow/util/visibility.h"
#if defined(_MSC_VER)
// Disable harmless warning for decorated name length limit
#pragma warning(disable : 4503)
namespace arrow {
/// \brief Get the capacity of the global thread pool
/// Return the number of worker threads in the thread pool to which
/// Arrow dispatches various CPU-bound tasks. This is an ideal number,
/// not necessarily the exact number of threads at a given point in time.
/// You can change this number using SetCpuThreadPoolCapacity().
ARROW_EXPORT int GetCpuThreadPoolCapacity();
/// \brief Set the capacity of the global thread pool
/// Set the number of worker threads int the thread pool to which
/// Arrow dispatches various CPU-bound tasks.
/// The current number is returned by GetCpuThreadPoolCapacity().
ARROW_EXPORT Status SetCpuThreadPoolCapacity(int threads);
namespace internal {
// Hints about a task that may be used by an Executor.
// They are ignored by the provided ThreadPool implementation.
struct TaskHints {
// The lower, the more urgent
int32_t priority = 0;
// The IO transfer size in bytes
int64_t io_size = -1;
// The approximate CPU cost in number of instructions
int64_t cpu_cost = -1;
// An application-specific ID
int64_t external_id = -1;
class ARROW_EXPORT Executor {
using StopCallback = internal::FnOnce<void(const Status&)>;
virtual ~Executor();
// Spawn a fire-and-forget task.
template <typename Function>
Status Spawn(Function&& func, StopToken stop_token = StopToken::Unstoppable()) {
return SpawnReal(TaskHints{}, std::forward<Function>(func), std::move(stop_token),
template <typename Function>
Status Spawn(TaskHints hints, Function&& func,
StopToken stop_token = StopToken::Unstoppable()) {
return SpawnReal(hints, std::forward<Function>(func), std::move(stop_token),
// Transfers a future to this executor. Any continuations added to the
// returned future will run in this executor. Otherwise they would run
// on the same thread that called MarkFinished.
// This is necessary when (for example) an I/O task is completing a future.
// The continuations of that future should run on the CPU thread pool keeping
// CPU heavy work off the I/O thread pool. So the I/O task should transfer
// the future to the CPU executor before returning.
template <typename T>
Future<T> Transfer(Future<T> future) {
auto transferred = Future<T>::Make();
auto callback = [this, transferred](const Result<T>& result) mutable {
auto spawn_status = Spawn([transferred, result]() mutable {
if (!spawn_status.ok()) {
auto callback_factory = [&callback]() { return callback; };
if (future.TryAddCallback(callback_factory)) {
return transferred;
// If the future is already finished and we aren't going to force spawn a thread
// then we don't need to add another layer of callback and can return the original
// future
return future;
// Submit a callable and arguments for execution. Return a future that
// will return the callable's result value once.
// The callable's arguments are copied before execution.
template <typename Function, typename... Args,
typename FutureType = typename ::arrow::detail::ContinueFuture::ForSignature<
Function && (Args && ...)>>
Result<FutureType> Submit(TaskHints hints, StopToken stop_token, Function&& func,
Args&&... args) {
using ValueType = typename FutureType::ValueType;
auto future = FutureType::Make();
auto task = std::bind(::arrow::detail::ContinueFuture{}, future,
std::forward<Function>(func), std::forward<Args>(args)...);
struct {
WeakFuture<ValueType> weak_fut;
void operator()(const Status& st) {
auto fut = weak_fut.get();
if (fut.is_valid()) {
} stop_callback{WeakFuture<ValueType>(future)};
ARROW_RETURN_NOT_OK(SpawnReal(hints, std::move(task), std::move(stop_token),
return future;
template <typename Function, typename... Args,
typename FutureType = typename ::arrow::detail::ContinueFuture::ForSignature<
Function && (Args && ...)>>
Result<FutureType> Submit(StopToken stop_token, Function&& func, Args&&... args) {
return Submit(TaskHints{}, stop_token, std::forward<Function>(func),
template <typename Function, typename... Args,
typename FutureType = typename ::arrow::detail::ContinueFuture::ForSignature<
Function && (Args && ...)>>
Result<FutureType> Submit(TaskHints hints, Function&& func, Args&&... args) {
return Submit(std::move(hints), StopToken::Unstoppable(),
std::forward<Function>(func), std::forward<Args>(args)...);
template <typename Function, typename... Args,
typename FutureType = typename ::arrow::detail::ContinueFuture::ForSignature<
Function && (Args && ...)>>
Result<FutureType> Submit(Function&& func, Args&&... args) {
return Submit(TaskHints{}, StopToken::Unstoppable(), std::forward<Function>(func),
// Return the level of parallelism (the number of tasks that may be executed
// concurrently). This may be an approximate number.
virtual int GetCapacity() = 0;
Executor() = default;
// Subclassing API
virtual Status SpawnReal(TaskHints hints, FnOnce<void()> task, StopToken,
StopCallback&&) = 0;
/// \brief An executor implementation that runs all tasks on a single thread using an
/// event loop.
/// Note: Any sort of nested parallelism will deadlock this executor. Blocking waits are
/// fine but if one task needs to wait for another task it must be expressed as an
/// asynchronous continuation.
class ARROW_EXPORT SerialExecutor : public Executor {
template <typename T = ::arrow::detail::Empty>
using TopLevelTask = internal::FnOnce<Future<T>(Executor*)>;
int GetCapacity() override { return 1; };
Status SpawnReal(TaskHints hints, FnOnce<void()> task, StopToken,
StopCallback&&) override;
/// \brief Runs the TopLevelTask and any scheduled tasks
/// The TopLevelTask (or one of the tasks it schedules) must either return an invalid
/// status or call the finish signal. Failure to do this will result in a deadlock. For
/// this reason it is preferable (if possible) to use the helper methods (below)
/// RunSynchronously/RunSerially which delegates the responsiblity onto a Future
/// producer's existing responsibility to always mark a future finished (which can
/// someday be aided by ARROW-12207).
template <typename T>
static Result<T> RunInSerialExecutor(TopLevelTask<T> initial_task) {
return SerialExecutor().Run<T>(std::move(initial_task));
// State uses mutex
struct State;
std::unique_ptr<State> state_;
template <typename T>
Result<T> Run(TopLevelTask<T> initial_task) {
auto final_fut = std::move(initial_task)(this);
if (final_fut.is_finished()) {
return final_fut.result();
final_fut.AddCallback([this](const Result<T>&) { MarkFinished(); });
return final_fut.result();
void RunLoop();
void MarkFinished();
/// An Executor implementation spawning tasks in FIFO manner on a fixed-size
/// pool of worker threads.
/// Note: Any sort of nested parallelism will deadlock this executor. Blocking waits are
/// fine but if one task needs to wait for another task it must be expressed as an
/// asynchronous continuation.
class ARROW_EXPORT ThreadPool : public Executor {
// Construct a thread pool with the given number of worker threads
static Result<std::shared_ptr<ThreadPool>> Make(int threads);
// Like Make(), but takes care that the returned ThreadPool is compatible
// with destruction late at process exit.
static Result<std::shared_ptr<ThreadPool>> MakeEternal(int threads);
// Destroy thread pool; the pool will first be shut down
~ThreadPool() override;
// Return the desired number of worker threads.
// The actual number of workers may lag a bit before being adjusted to
// match this value.
int GetCapacity() override;
// Dynamically change the number of worker threads.
// This function always returns immediately.
// If fewer threads are running than this number, new threads are spawned
// on-demand when needed for task execution.
// If more threads are running than this number, excess threads are reaped
// as soon as possible.
Status SetCapacity(int threads);
// Heuristic for the default capacity of a thread pool for CPU-bound tasks.
// This is exposed as a static method to help with testing.
static int DefaultCapacity();
// Shutdown the pool. Once the pool starts shutting down, new tasks
// cannot be submitted anymore.
// If "wait" is true, shutdown waits for all pending tasks to be finished.
// If "wait" is false, workers are stopped as soon as currently executing
// tasks are finished.
Status Shutdown(bool wait = true);
struct State;
FRIEND_TEST(TestThreadPool, SetCapacity);
FRIEND_TEST(TestGlobalThreadPool, Capacity);
friend ARROW_EXPORT ThreadPool* GetCpuThreadPool();
Status SpawnReal(TaskHints hints, FnOnce<void()> task, StopToken,
StopCallback&&) override;
// Collect finished worker threads, making sure the OS threads have exited
void CollectFinishedWorkersUnlocked();
// Launch a given number of additional workers
void LaunchWorkersUnlocked(int threads);
// Get the current actual capacity
int GetActualCapacity();
// Reinitialize the thread pool if the pid changed
void ProtectAgainstFork();
static std::shared_ptr<ThreadPool> MakeCpuThreadPool();
std::shared_ptr<State> sp_state_;
State* state_;
bool shutdown_on_destroy_;
#ifndef _WIN32
pid_t pid_;
// Return the process-global thread pool for CPU-bound tasks.
ARROW_EXPORT ThreadPool* GetCpuThreadPool();
/// \brief Potentially run an async operation serially (if use_threads is false)
/// \see RunSerially
/// If `use_threads` is true, the global CPU executor is used.
/// If `use_threads` is false, a temporary SerialExecutor is used.
/// `get_future` is called (from this thread) with the chosen executor and must
/// return a future that will eventually finish. This function returns once the
/// future has finished.
template <typename T>
Result<T> RunSynchronously(FnOnce<Future<T>(Executor*)> get_future, bool use_threads) {
if (use_threads) {
return std::move(get_future)(GetCpuThreadPool()).result();
} else {
return SerialExecutor::RunInSerialExecutor<T>(std::move(get_future));
ARROW_EXPORT Status RunSynchronouslyVoid(
FnOnce<Future<arrow::detail::Empty>(Executor*)> get_future, bool use_threads);
} // namespace internal
} // namespace arrow