blob: fe6509a2177c51e28a5e2a032f73c2b9b1e4f18a [file] [log] [blame]
/*
* Copyright 2024-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <functional>
#include <future>
#include <memory>
#include <stdexcept>
#include <unordered_set>
#include <utility>
#include <vector>
#include "paimon/executor.h"
#include "paimon/result.h"
namespace paimon {
/// Submits a function to be executed asynchronously on a given executor and returns a
/// future.
///
/// This function wraps the provided callable (`func`) and submits it to the provided `executor`.
/// The function captures the result of `func` using a `std::promise`, which is used to fulfill
/// the returned `std::future`. If the callable throws an exception, the exception is captured
/// and set in the `promise`.
///
/// @tparam Func The type of the callable function.
/// @param executor The executor to run the function on. Must provide an `Add` method for task
/// submission.
/// @param func The function to execute asynchronously. Can be any callable object.
/// @return std::future<decltype(func())> A future that holds the result of the function
/// execution.
///
/// @note If `func` returns `void`, the returned future is of type `std::future<void>`.
template <typename Func>
auto Via(Executor* executor, Func&& func) -> std::future<decltype(func())> {
using ResultType = decltype(func());
// Check if func is callable (invocable)
static_assert(std::is_invocable_v<Func>, "func must be callable");
// Check if func is an empty std::function
if constexpr (std::is_constructible_v<std::function<void()>, Func>) {
std::function<void()> test_func = func;
if (!test_func) {
assert(false && "func cannot be an empty std::function");
}
}
// Create a promise to store the result or exception.
auto promise = std::make_shared<std::promise<ResultType>>();
auto future = promise->get_future(); // Retrieve the future associated with the promise.
// Wrap the task and submit it to the executor.
executor->Add([promise, func = std::forward<Func>(func)]() mutable {
try {
if constexpr (std::is_void_v<ResultType>) {
func();
promise->set_value();
} else {
promise->set_value(func());
}
} catch (...) {
promise->set_exception(std::current_exception());
}
});
return future;
}
/// Collects the results of multiple futures.
///
/// This function waits for all provided futures to complete and collects their results.
/// The results are returned in a `std::vector`, preserving the order of the input futures.
///
/// @tparam T The type of the result stored in the futures.
/// @param futures A container of futures (e.g., std::vector<std::future<T>>).
/// @return std::vector<T> A vector of results corresponding to the input futures.
///
/// @note If `T` is `void`, use Wait function instead.
template <typename T>
std::vector<T> CollectAll(std::vector<std::future<T>>& futures) {
std::vector<T> results;
results.reserve(futures.size()); // Reserve space to avoid reallocation.
for (auto& future : futures) {
results.push_back(future.get()); // Wait for each future and collect the result.
}
return results;
}
/// Waits for all futures with `void` return type to complete.
///
/// This function waits for each provided `std::future<void>` to complete.
/// It ensures that all asynchronous operations have finished execution.
///
/// @param futures A container of `std::future<void>` (e.g., `std::vector<std::future<void>>`).
///
/// @note Use this function when dealing with futures that do not return a value.
inline void Wait(std::vector<std::future<void>>& futures) {
for (auto& future : futures) {
if (future.valid()) {
future.get();
}
}
}
} // namespace paimon