blob: 5263816e35c53b4913e66a8a9ee6fa85d8e93787 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once
#include <concepts>
#include <optional>
#include <type_traits>
#include <utility>
#include <vector>
#include "iceberg/iceberg_export.h"
#include "iceberg/result.h"
#include "iceberg/util/executor.h"
#include "iceberg/util/functional.h"
#include "iceberg/util/retry_util.h"
namespace iceberg {
namespace internal {
template <typename F>
concept OnceStatusTask = RvalueInvocable<Status, F>;
template <typename T>
concept RepeatableStatusTask =
std::is_invocable_r_v<Status, const T&> ||
(std::copy_constructible<T> && std::is_invocable_r_v<Status, T&>);
template <typename F>
concept RetryableStatusTask = std::constructible_from<std::remove_cvref_t<F>, F> &&
RepeatableStatusTask<std::remove_cvref_t<F>>;
ICEBERG_EXPORT Status RunTasksSingleThreaded(std::vector<FnOnce<Status()>> tasks);
ICEBERG_EXPORT Status RunTasksParallel(Executor& executor,
std::vector<FnOnce<Status()>> tasks);
} // namespace internal
template <retry::Policy RetryPolicy = retry::NoRetry>
class ICEBERG_TEMPLATE_CLASS_EXPORT TaskGroup {
private:
static constexpr bool kRetryEnabled = !std::same_as<RetryPolicy, retry::NoRetry>;
struct Empty {};
using RetryConfigStorage = std::conditional_t<kRetryEnabled, RetryConfig, Empty>;
public:
TaskGroup() = default;
explicit TaskGroup(RetryConfig retry_config)
requires(kRetryEnabled)
: retry_config_(std::move(retry_config)) {}
auto&& SetExecutor(this auto&& self, OptionalExecutor executor) {
self.executor_ = std::move(executor);
return std::forward<decltype(self)>(self);
}
template <typename F>
requires((!kRetryEnabled && internal::OnceStatusTask<F>) ||
(kRetryEnabled && internal::RetryableStatusTask<F>))
auto&& Submit(this auto&& self, F&& task) {
self.tasks_.emplace_back([&] {
if constexpr (!kRetryEnabled) {
return std::forward<F>(task);
} else {
return [retry_config = self.retry_config_,
task = std::forward<F>(task)]() mutable -> Status {
return RetryRunner<RetryPolicy>(retry_config).Run(task);
};
}
}());
return std::forward<decltype(self)>(self);
}
Status Run() && {
if (!executor_.has_value()) {
return internal::RunTasksSingleThreaded(std::move(tasks_));
}
return internal::RunTasksParallel(executor_->get(), std::move(tasks_));
}
private:
std::vector<FnOnce<Status()>> tasks_;
OptionalExecutor executor_;
[[no_unique_address]] RetryConfigStorage retry_config_;
};
} // namespace iceberg