blob: 3827e1645f9c69d5cfc0201dc908f7e853001fba [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cassert>
#include <deque>
#include <queue>
#include "arrow/util/functional.h"
#include "arrow/util/future.h"
#include "arrow/util/iterator.h"
#include "arrow/util/mutex.h"
#include "arrow/util/optional.h"
#include "arrow/util/queue.h"
#include "arrow/util/thread_pool.h"
namespace arrow {
// The methods in this file create, modify, and utilize AsyncGenerator which is an
// iterator of futures. This allows an asynchronous source (like file input) to be run
// through a pipeline in the same way that iterators can be used to create pipelined
// workflows.
// In order to support pipeline parallelism we introduce the concept of asynchronous
// reentrancy. This is different than synchronous reentrancy. With synchronous code a
// function is reentrant if the function can be called again while a previous call to that
// function is still running. Unless otherwise specified none of these generators are
// synchronously reentrant. Care should be taken to avoid calling them in such a way (and
// the utilities Visit/Collect/Await take care to do this).
// Asynchronous reentrancy on the other hand means the function is called again before the
// future returned by the function is marked finished (but after the call to get the
// future returns). Some of these generators are async-reentrant while others (e.g.
// those that depend on ordered processing like decompression) are not. Read the MakeXYZ
// function comments to determine which generators support async reentrancy.
// Note: Generators that are not asynchronously reentrant can still support readahead
// (\see MakeSerialReadaheadGenerator).
// Readahead operators, and some other operators, may introduce queueing. Any operators
// that introduce buffering should detail the amount of buffering they introduce in their
// MakeXYZ function comments.
/// \brief An asynchronous generator: a callable that takes no arguments and returns a
/// Future<T> each time it is invoked.
template <typename T>
using AsyncGenerator = std::function<Future<T>()>;
/// \brief Iteration traits for sequences whose items are themselves AsyncGenerators.
///
/// An AsyncGenerator that holds no callable target is used as the end-of-iteration
/// marker.
template <typename T>
struct IterationTraits<AsyncGenerator<T>> {
  /// \brief The end token: an AsyncGenerator with no callable target.
  static AsyncGenerator<T> End() { return {}; }

  /// \brief True when `val` has no callable target, i.e. it is the end token.
  static bool IsEnd(const AsyncGenerator<T>& val) { return val == nullptr; }
};
/// \brief Builds an already-finished future carrying the end-of-iteration token for T.
template <typename T>
Future<T> AsyncGeneratorEnd() {
  auto end_token = IterationTraits<T>::End();
  return Future<T>::MakeFinished(std::move(end_token));
}
/// \brief Visits each item produced by an async generator with `visitor`,
/// returning a future that completes when all have been visited
///
/// The end token is not passed to `visitor`. If `visitor` returns a non-OK status that
/// status is returned from the per-item callback instead of continuing.
template <typename T>
Future<> VisitAsyncGenerator(AsyncGenerator<T> generator,
                             std::function<Status(T)> visitor) {
  struct LoopBody {
    // Continuation attached to every future pulled from the generator.
    struct Callback {
      Result<ControlFlow<detail::Empty>> operator()(const T& item) {
        if (IsIterationEnd(item)) {
          return Break(detail::Empty());
        }
        auto status = visitor(item);
        if (!status.ok()) {
          return status;
        }
        return Continue();
      }

      std::function<Status(T)> visitor;
    };

    // One iteration of the loop: pull the next future and chain the visiting callback.
    Future<ControlFlow<detail::Empty>> operator()() {
      Callback on_item{visitor};
      auto pending = generator();
      return pending.Then(std::move(on_item));
    }

    AsyncGenerator<T> generator;
    std::function<Status(T)> visitor;
  };

  return Loop(LoopBody{std::move(generator), std::move(visitor)});
}
/// \brief Waits for an async generator to complete, discarding results.
///
/// \param generator the generator to drain
/// \return the future produced by VisitAsyncGenerator for a visitor that accepts every
/// item and ignores it
template <typename T>
Future<> DiscardAllFromAsyncGenerator(AsyncGenerator<T> generator) {
  // The visitor accepts every item and does nothing with it.
  std::function<Status(T)> visitor = [](const T&) { return Status::OK(); };
  // VisitAsyncGenerator takes both arguments by value, so hand them over instead of
  // copying the two std::function objects.
  return VisitAsyncGenerator(std::move(generator), std::move(visitor));
}
/// \brief Collects the results of an async generator into a vector
///
/// \param generator the generator to drain
/// \return a future that yields every item produced before the end token, in the order
/// the continuations ran
template <typename T>
Future<std::vector<T>> CollectAsyncGenerator(AsyncGenerator<T> generator) {
  // The accumulator is shared between iterations because each continuation may run
  // after the LoopBody invocation that created it has returned.
  auto vec = std::make_shared<std::vector<T>>();
  struct LoopBody {
    Future<ControlFlow<std::vector<T>>> operator()() {
      auto next = generator_();
      auto vec = vec_;
      return next.Then([vec](const T& result) -> Result<ControlFlow<std::vector<T>>> {
        if (IsIterationEnd(result)) {
          // End token: stop and hand back everything gathered so far.
          return Break(*vec);
        } else {
          // Record the item before asking for the next one; without this the
          // returned vector would always be empty.
          vec->push_back(result);
          return Continue();
        }
      });
    }
    AsyncGenerator<T> generator_;
    std::shared_ptr<std::vector<T>> vec_;
  };
  return Loop(LoopBody{std::move(generator), std::move(vec)});
}
/// \see MakeMappedGenerator
// Generator that pulls items of type T from `source` and feeds them through `map` to
// produce futures of V. All mutable state lives in a shared State object so that the
// callbacks below can hold a reference to it.
// NOTE(review): this view of the class shows no access specifiers and several
// statements appear to be missing (see notes below) -- confirm against upstream.
template <typename T, typename V>
class MappingGenerator {
// Wraps the source generator and the mapping function in a shared State.
MappingGenerator(AsyncGenerator<T> source, std::function<Future<V>(const T&)> map)
: state_(std::make_shared<State>(std::move(source), std::move(map))) {}
// Returns a future for the next mapped item, or an already-finished end future if the
// state has been marked finished.
Future<V> operator()() {
auto future = Future<V>::Make();
bool should_trigger;
auto guard = state_->mutex.Lock();
if (state_->finished) {
return AsyncGeneratorEnd<V>();
// Only a caller that finds no job already waiting is flagged to trigger work.
should_trigger = state_->waiting_jobs.empty();
// NOTE(review): nothing visible here adds `future` to waiting_jobs, and the
// should_trigger branch below has no visible body -- confirm.
if (should_trigger) {
return future;
// Shared state: the source, the map function, the queue of futures handed out to
// callers that are still waiting, and the finished flag (guarded by `mutex`).
struct State {
State(AsyncGenerator<T> source, std::function<Future<V>(const T&)> map)
: source(std::move(source)),
// NOTE(review): `map` is not initialized in the visible initializer list -- confirm.
finished(false) {}
void Purge() {
// This might be called by an original callback (if the source iterator fails or
// ends) or by a mapped callback (if the map function fails or ends prematurely).
// Either way it should only be called once and after finished is set so there is no
// need to guard access to `waiting_jobs`.
// NOTE(review): the loop body that drains waiting_jobs is not visible -- confirm.
while (!waiting_jobs.empty()) {
AsyncGenerator<T> source;
std::function<Future<V>(const T&)> map;
std::deque<Future<V>> waiting_jobs;
util::Mutex mutex;
bool finished;
struct Callback;
// Callback attached to the future returned by `map`; `sink` is the future that was
// handed to the consumer for this item.
struct MappedCallback {
void operator()(const Result<V>& maybe_next) {
// An error or an end token from the map function ends the stream.
bool end = !maybe_next.ok() || IsIterationEnd(*maybe_next);
bool should_purge = false;
if (end) {
auto guard = state->mutex.Lock();
// Only the first callback to observe the end is flagged to purge.
should_purge = !state->finished;
state->finished = true;
// NOTE(review): `sink` is never completed and the should_purge branch has no visible
// body -- confirm.
if (should_purge) {
std::shared_ptr<State> state;
Future<V> sink;
// Callback attached to futures pulled from the source generator.
struct Callback {
void operator()(const Result<T>& maybe_next) {
Future<V> sink;
// An error or an end token from the source ends the stream.
bool end = !maybe_next.ok() || IsIterationEnd(*maybe_next);
bool should_purge = false;
bool should_trigger;
auto guard = state->mutex.Lock();
if (end) {
should_purge = !state->finished;
state->finished = true;
// Take the oldest waiting future as the destination for this item.
// NOTE(review): the front element is read but no visible statement removes it from
// waiting_jobs -- confirm.
sink = state->waiting_jobs.front();
// Keep pulling from the source only if not at the end and more callers are waiting.
should_trigger = !end && !state->waiting_jobs.empty();
if (should_purge) {
if (should_trigger) {
if (maybe_next.ok()) {
const T& val = maybe_next.ValueUnsafe();
if (IsIterationEnd(val)) {
} else {
// Run the map function and forward its eventual result to `sink`.
Future<V> mapped_fut = state->map(val);
mapped_fut.AddCallback(MappedCallback{std::move(state), std::move(sink)});
} else {
std::shared_ptr<State> state;
std::shared_ptr<State> state_;
/// \brief Creates a generator that applies `map` to every element of the source.
/// The map function is not called on the end token.
///
/// Note: This function makes a copy of `map` for each item
/// Note: Errors returned from the `map` function will be propagated
///
/// If the source generator is async-reentrant then this generator will be also
template <typename T, typename V>
AsyncGenerator<V> MakeMappedGenerator(AsyncGenerator<T> source_generator,
                                      std::function<Result<V>(const T&)> map) {
  // Adapt the Result-returning map into one that returns an already-finished future.
  std::function<Future<V>(const T&)> as_future = [map](const T& item) -> Future<V> {
    Result<V> mapped = map(item);
    return Future<V>::MakeFinished(std::move(mapped));
  };
  return MappingGenerator<T, V>(std::move(source_generator), std::move(as_future));
}
/// \brief Overload of MakeMappedGenerator for a map function that returns a plain V.
/// Each mapped value is wrapped in an already-finished future.
template <typename T, typename V>
AsyncGenerator<V> MakeMappedGenerator(AsyncGenerator<T> source_generator,
                                      std::function<V(const T&)> map) {
  std::function<Future<V>(const T&)> as_future = [map](const T& item) -> Future<V> {
    return Future<V>::MakeFinished(map(item));
  };
  return MappingGenerator<T, V>(std::move(source_generator), std::move(as_future));
}
/// \brief Overload of MakeMappedGenerator for a map function that already returns a
/// Future<V>; the function is handed to MappingGenerator unchanged.
template <typename T, typename V>
AsyncGenerator<V> MakeMappedGenerator(AsyncGenerator<T> source_generator,
                                      std::function<Future<V>(const T&)> map) {
  return AsyncGenerator<V>(
      MappingGenerator<T, V>(std::move(source_generator), std::move(map)));
}
/// \brief Overload of MakeMappedGenerator accepting any callable `map`.
///
/// `map` is invoked with a `const T&` and may return a V, a Result<V> or a Future<V>;
/// whichever it returns is normalized to a Future<V> by MapCallback::EnsureFuture.
/// V is listed first in the template parameters so callers can name it explicitly.
template <typename V, typename T, typename MapFunc>
AsyncGenerator<V> MakeMappedGenerator(AsyncGenerator<T> source_generator, MapFunc map) {
  struct MapCallback {
    MapFunc map;

    Future<V> operator()(const T& val) { return EnsureFuture(map(val)); }

    // Overloads that turn each supported return type into a Future<V>.
    Future<V> EnsureFuture(Result<V> val) {
      return Future<V>::MakeFinished(std::move(val));
    }
    Future<V> EnsureFuture(V val) { return Future<V>::MakeFinished(std::move(val)); }
    Future<V> EnsureFuture(Future<V> val) { return val; }
  };
  // `map` and `map_fn` are not used again, so move them rather than copying.
  std::function<Future<V>(const T&)> map_fn = MapCallback{std::move(map)};
  return MappingGenerator<T, V>(std::move(source_generator), std::move(map_fn));
}
/// \see MakeSequencingGenerator
// Generator that holds results from `source` in a priority queue and uses `is_next`
// to decide when a result may be handed to the consumer.
// NOTE(review): this view of the class shows no access specifiers and several lines
// are truncated or missing (see notes below) -- confirm against upstream.
template <typename T, typename ComesAfter, typename IsNext>
class SequencingGenerator {
// Bundles the source, both predicates and the initial "previous" value into a
// shared State.
SequencingGenerator(AsyncGenerator<T> source, ComesAfter compare, IsNext is_next,
T initial_value)
: state_(std::make_shared<State>(std::move(source), std::move(compare),
std::move(is_next), std::move(initial_value))) {}
// Returns the next in-sequence item. Under the state mutex this either returns a
// finished future right away, returns an end future when the state is finished, or
// stores a fresh unfinished future in state_->waiting_future and returns it.
Future<T> operator()() {
auto guard = state_->mutex.Lock();
// We can send a result immediately if the top of the queue is either an
// error or the next item
// NOTE(review): the next three lines are cut off mid-expression in this view.
if (!state_->queue.empty() &&
(!state_-> ||
state_->is_next(state_->previous_value, *state_-> {
auto result = std::move(state_->;
if (result.ok()) {
// Remember the delivered value so is_next can be evaluated against it next time.
state_->previous_value = *result;
return Future<T>::MakeFinished(result);
if (state_->finished) {
return AsyncGeneratorEnd<T>();
// The next item is not in the queue so we will need to wait
auto new_waiting_fut = Future<T>::Make();
state_->waiting_future = new_waiting_fut;
return new_waiting_fut;
// Adapts `compare` (which works on T) to the Result<T> elements stored in the queue.
struct WrappedComesAfter {
bool operator()(const Result<T>& left, const Result<T>& right) {
if (!left.ok() || !right.ok()) {
// Should never happen
return false;
return compare(*left, *right);
ComesAfter compare;
// Shared state referenced by the generator and by its callbacks.
struct State {
State(AsyncGenerator<T> source, ComesAfter compare, IsNext is_next, T initial_value)
: source(std::move(source)),
// NOTE(review): the remaining members are not initialized in the visible initializer
// list -- confirm.
mutex() {}
AsyncGenerator<T> source;
IsNext is_next;
// The last value delivered to the consumer (starts as initial_value per the
// constructor signature).
T previous_value;
// The future handed to a consumer that is still waiting for the next item, if any.
Future<T> waiting_future;
std::priority_queue<Result<T>, std::vector<Result<T>>, WrappedComesAfter> queue;
bool finished;
util::Mutex mutex;
// Callback for results arriving from the source generator.
class Callback {
explicit Callback(std::shared_ptr<State> state) : state_(std::move(state)) {}
void operator()(const Result<T> result) {
Future<T> to_deliver;
bool finished;
auto guard = state_->mutex.Lock();
bool ready_to_deliver = false;
if (!result.ok()) {
// Clear any cached results
// NOTE(review): the body of this loop is not visible -- confirm.
while (!state_->queue.empty()) {
ready_to_deliver = true;
state_->finished = true;
} else if (IsIterationEnd<T>(result.ValueUnsafe())) {
// The end token is only deliverable once nothing is left queued.
ready_to_deliver = state_->queue.empty();
state_->finished = true;
} else {
ready_to_deliver = state_->is_next(state_->previous_value, *result);
// Deliver straight to the waiting consumer when there is one.
if (ready_to_deliver && state_->waiting_future.is_valid()) {
to_deliver = state_->waiting_future;
if (result.ok()) {
state_->previous_value = *result;
} else {
// Capture state_->finished so we can access it outside the mutex
finished = state_->finished;
// Must deliver result outside of the mutex
// NOTE(review): the statement that completes `to_deliver` is not visible -- confirm.
if (to_deliver.is_valid()) {
} else {
// Otherwise, if we didn't get the next item (or a terminal item), we
// need to keep looking
if (!finished) {
const std::shared_ptr<State> state_;
const std::shared_ptr<State> state_;
/// \brief Buffers an AsyncGenerator to return values in sequence order. ComesAfter
/// and IsNext determine the sequence order.
/// ComesAfter should be a BinaryPredicate that only returns true if a comes after b
/// IsNext should be a BinaryPredicate that returns true, given `a` and `b`, only if
/// `b` follows immediately after `a`. It should return true given `initial_value` and
/// `b` if `b` is the first item in the sequence.
/// This operator will queue unboundedly while waiting for the next item. It is intended
/// for jittery sources that might scatter an ordered sequence. It is NOT intended to
/// sort. Using it to try and sort could result in excessive RAM usage. This generator
/// will queue up to N blocks where N is the max "out of order"ness of the source.
/// For example, if the source is 1,6,2,5,4,3 it will queue 3 blocks because 3 is 3
/// blocks beyond where it belongs.
/// This generator is not async-reentrant but it consists only of a simple log(n)
/// insertion into a priority queue.
/// \brief Wraps `source_generator` in a SequencingGenerator.
///
/// \param source_generator the generator whose items may arrive out of order
/// \param compare predicate handed to SequencingGenerator as its ComesAfter
/// \param is_next predicate handed to SequencingGenerator as its IsNext
/// \param initial_value value the first item is checked against via `is_next`
template <typename T, typename ComesAfter, typename IsNext>
AsyncGenerator<T> MakeSequencingGenerator(AsyncGenerator<T> source_generator,
                                          ComesAfter compare, IsNext is_next,
                                          T initial_value) {
  // The SequencingGenerator constructor takes four arguments; `initial_value` must be
  // forwarded along with the source and the two predicates.
  return SequencingGenerator<T, ComesAfter, IsNext>(
      std::move(source_generator), std::move(compare), std::move(is_next),
      std::move(initial_value));
}
/// \see MakeTransformedGenerator
// Generator that runs each item of a source generator through a Transformer<T, V>.
// NOTE(review): this view of the class shows no access specifiers and some statements
// appear to be missing (see notes below) -- confirm against upstream.
template <typename T, typename V>
class TransformingGenerator {
// The transforming generator state will be referenced as an async generator but will
// also be referenced via callback to various futures. If the async generator owner
// moves it around we need the state to be consistent for future callbacks.
struct TransformingGeneratorState
: std::enable_shared_from_this<TransformingGeneratorState> {
TransformingGeneratorState(AsyncGenerator<T> generator, Transformer<T, V> transformer)
: generator_(std::move(generator)),
// NOTE(review): `transformer` is not stored into transformer_ in the visible
// initializer list -- confirm.
finished_() {}
// Produces the next transformed value. Loops so that source futures which are
// already finished are handled iteratively instead of through nested callbacks.
Future<V> operator()() {
while (true) {
// First see whether the transformer can yield something from the current input.
auto maybe_next_result = Pump();
if (!maybe_next_result.ok()) {
return Future<V>::MakeFinished(maybe_next_result.status());
auto maybe_next = std::move(maybe_next_result).ValueUnsafe();
if (maybe_next.has_value()) {
return Future<V>::MakeFinished(*std::move(maybe_next));
// Nothing available yet: request more input from the source.
auto next_fut = generator_();
// If finished already, process results immediately inside the loop to avoid
// stack overflow
if (next_fut.is_finished()) {
auto next_result = next_fut.result();
if (next_result.ok()) {
last_value_ = *next_result;
} else {
return Future<V>::MakeFinished(next_result.status());
// Otherwise, if not finished immediately, add callback to process results
} else {
// Keep the state alive until the continuation has run.
auto self = this->shared_from_this();
return next_fut.Then([self](const Result<T>& next_result) {
if (next_result.ok()) {
self->last_value_ = *next_result;
// Re-enter the generator now that fresh input is stored.
return (*self)();
} else {
return Future<V>::MakeFinished(next_result.status());
// See comment on TransformingIterator::Pump
// Returns a value when the transformer produced one, the end token once finished_ is
// set, or nullopt when neither applies.
Result<util::optional<V>> Pump() {
if (!finished_ && last_value_.has_value()) {
ARROW_ASSIGN_OR_RAISE(TransformFlow<V> next, transformer_(*last_value_));
if (next.ReadyForNext()) {
if (IsIterationEnd(*last_value_)) {
finished_ = true;
// NOTE(review): no visible statement clears last_value_ after ReadyForNext() --
// confirm.
if (next.Finished()) {
finished_ = true;
if (next.HasValue()) {
return next.Value();
if (finished_) {
return IterationTraits<V>::End();
return util::nullopt;
AsyncGenerator<T> generator_;
Transformer<T, V> transformer_;
// The most recent input pulled from the source, if any.
util::optional<T> last_value_;
bool finished_;
// Creates the shared state from the source generator and the transformer.
explicit TransformingGenerator(AsyncGenerator<T> generator,
Transformer<T, V> transformer)
: state_(std::make_shared<TransformingGeneratorState>(std::move(generator),
std::move(transformer))) {}
// Delegates to the shared state.
Future<V> operator()() { return (*state_)(); }
std::shared_ptr<TransformingGeneratorState> state_;
/// \brief Transforms an async generator using a transformer function returning a new
/// AsyncGenerator
/// The transform function here behaves exactly the same as the transform function in
/// MakeTransformedIterator and you can safely use the same transform function to
/// transform both synchronous and asynchronous streams.
/// This generator is not async-reentrant
/// This generator may queue up to 1 instance of T
/// \brief Wraps `generator` in a TransformingGenerator driven by `transformer`.
///
/// \param generator the source of items of type T
/// \param transformer the transform function applied to items from `generator`
template <typename T, typename V>
AsyncGenerator<V> MakeTransformedGenerator(AsyncGenerator<T> generator,
                                           Transformer<T, V> transformer) {
  // Both parameters are by-value copies owned by this function; move them into the
  // TransformingGenerator rather than copying a second time.
  return TransformingGenerator<T, V>(std::move(generator), std::move(transformer));
}
/// \see MakeSerialReadaheadGenerator
// Generator that reads ahead from a source, keeping pending futures in an SPSC queue.
// NOTE(review): this view of the class shows no access specifiers and some statements
// appear to be missing (see notes below) -- confirm against upstream.
template <typename T>
class SerialReadaheadGenerator {
// Creates the shared state from the source generator and the readahead limit.
SerialReadaheadGenerator(AsyncGenerator<T> source_generator, int max_readahead)
: state_(std::make_shared<State>(std::move(source_generator), max_readahead)) {}
// Returns the next item: the first call pulls straight from the source, later calls
// read the next future out of the readahead queue.
Future<T> operator()() {
if (state_->first_) {
// Lazy generator, need to wait for the first ask to prime the pump
state_->first_ = false;
auto next = state_->source_();
return next.Then(Callback{state_});
// This generator is not async-reentrant. We won't be called until the last
// future finished so we know there is something in the queue
auto finished = state_->finished_.load();
if (finished && state_->readahead_queue_.IsEmpty()) {
return AsyncGeneratorEnd<T>();
std::shared_ptr<Future<T>> next;
if (!state_->readahead_queue_.Read(next)) {
return Status::UnknownError("Could not read from readahead_queue");
// Reading one entry frees one space; fetch_add returns the count before the add.
auto last_available = state_->spaces_available_.fetch_add(1);
if (last_available == 0 && !finished) {
// Reader idled out, we need to restart it
// NOTE(review): the statement that restarts the reader is not visible -- confirm.
return *next;
// Shared state referenced by the generator and by its callbacks.
struct State {
State(AsyncGenerator<T> source, int max_readahead)
: first_(true),
// NOTE(review): source_ and finished_ are not initialized in the visible initializer
// list -- confirm.
// There is one extra "space" for the in-flight request
spaces_available_(max_readahead + 1),
// The SPSC queue has size-1 "usable" slots so we need to overallocate 1
readahead_queue_(max_readahead + 1) {}
// Reserves a slot in the queue, then pulls from the source and stores the resulting
// future in that slot. Returns an error status if the queue write fails.
Status Pump(const std::shared_ptr<State>& self) {
// Can't do readahead_queue.write(source().Then(Callback{self})) because then the
// callback might run immediately and add itself to the queue before this gets added
// to the queue messing up the order.
auto next_slot = std::make_shared<Future<T>>();
auto written = readahead_queue_.Write(next_slot);
if (!written) {
return Status::UnknownError("Could not write to readahead_queue");
// If this Pump is being called from a callback it is possible for the source to
// poll and read from the queue between the Write and this spot where we fill the
// value in. However, it is not possible for the future to read this value we are
// writing. That is because this callback (the callback for future X) must be
// finished before future X is marked complete and this source is not pulled
// reentrantly so it will not poll for future X+1 until this callback has completed.
*next_slot = source_().Then(Callback{self});
return Status::OK();
// Only accessed by the consumer end
bool first_;
// Accessed by both threads
AsyncGenerator<T> source_;
std::atomic<bool> finished_;
// The queue has a size but it is not atomic. We keep track of how many spaces are
// left in the queue here so we know if we've just written the last value and we need
// to stop reading ahead or if we've just read from a full queue and we need to
// restart reading ahead
std::atomic<uint32_t> spaces_available_;
// Needs to be a queue of shared_ptr and not Future because we set the value of the
// future after we add it to the queue
util::SpscQueue<std::shared_ptr<Future<T>>> readahead_queue_;
// Continuation run on every result from the source; the result is returned unchanged
// on every visible path.
struct Callback {
Result<T> operator()(const Result<T>& maybe_next) {
if (!maybe_next.ok()) {
// NOTE(review): finished_ is not set on this path in the visible code -- confirm.
return maybe_next;
const auto& next = *maybe_next;
if (IsIterationEnd(next)) {
return maybe_next;
// Claim one space; fetch_sub returns the count before the subtraction.
auto last_available = state_->spaces_available_.fetch_sub(1);
if (last_available > 1) {
// NOTE(review): the body of this branch is not visible -- confirm.
return maybe_next;
std::shared_ptr<State> state_;
std::shared_ptr<State> state_;
/// \see MakeFromFuture
template <typename T>
class FutureFirstGenerator {
explicit FutureFirstGenerator(Future<AsyncGenerator<T>> future)
: state_(std::make_shared<State>(std::move(future))) {}
Future<T> operator()() {
if (state_->source_) {
return state_->source_();
} else {
auto state = state_;
return state_->future_.Then([state](const AsyncGenerator<T>& source) {
state->source_ = source;
return state->source_();
struct State {
explicit State(Future<AsyncGenerator<T>> future) : future_(future), source_() {}
Future<AsyncGenerator<T>> future_;
AsyncGenerator<T> source_;
std::shared_ptr<State> state_;
/// \brief Turns a Future<AsyncGenerator<T>> into an AsyncGenerator<T> whose first item
/// also waits for the future to complete.
///
/// This generator is not async-reentrant (even if the generator yielded by future is)
///
/// This generator does not queue
template <typename T>
AsyncGenerator<T> MakeFromFuture(Future<AsyncGenerator<T>> future) {
  FutureFirstGenerator<T> deferred(std::move(future));
  return AsyncGenerator<T>(std::move(deferred));
}
/// \brief Creates a generator that pulls from the source into a queue. In contrast to
/// MakeReadaheadGenerator, the source is never pulled reentrantly.
///
/// The source generator does not need to be async-reentrant
///
/// This generator is not async-reentrant (even if the source is)
///
/// This generator may queue up to max_readahead additional instances of T
template <typename T>
AsyncGenerator<T> MakeSerialReadaheadGenerator(AsyncGenerator<T> source_generator,
                                               int max_readahead) {
  SerialReadaheadGenerator<T> readahead(std::move(source_generator), max_readahead);
  return AsyncGenerator<T>(std::move(readahead));
}
/// \see MakeReadaheadGenerator
// Generator that keeps a queue of futures pulled ahead of time from the source.
// NOTE(review): this view of the class shows no access specifiers and several
// statements appear to be missing (see notes below) -- confirm against upstream.
template <typename T>
class ReadaheadGenerator {
// Stores the source and readahead limit and builds the callback that records, in a
// shared atomic flag, when the source has reached its end.
ReadaheadGenerator(AsyncGenerator<T> source_generator, int max_readahead)
: source_generator_(std::move(source_generator)), max_readahead_(max_readahead) {
auto finished = std::make_shared<std::atomic<bool>>(false);
mark_finished_if_done_ = [finished](const Result<T>& next_result) {
if (!next_result.ok()) {
// NOTE(review): the error branch has no visible body -- confirm.
} else {
if (IsIterationEnd(*next_result)) {
*finished = true;
finished_ = std::move(finished);
// Returns the future at the front of the readahead queue.
Future<T> operator()() {
if (readahead_queue_.empty()) {
// This is the first request, let's pump the underlying queue
for (int i = 0; i < max_readahead_; i++) {
// NOTE(review): `next` is not pushed onto readahead_queue_ in the visible code --
// confirm.
auto next = source_generator_();
// Pop one and add one
// NOTE(review): the front is read but no visible statement pops it -- confirm.
auto result = readahead_queue_.front();
if (finished_->load()) {
} else {
// Not finished yet: pull another future from the source.
auto back_of_queue = source_generator_();
return result;
AsyncGenerator<T> source_generator_;
int max_readahead_;
std::function<void(const Result<T>&)> mark_finished_if_done_;
// Can't use a bool here because finished may be referenced by callbacks that
// outlive this class
std::shared_ptr<std::atomic<bool>> finished_;
std::queue<Future<T>> readahead_queue_;
/// \brief A generator where the producer pushes items on a queue.
/// No back-pressure is applied, so this generator is mostly useful when
/// producing the values is neither CPU- nor memory-expensive (e.g. fetching
/// filesystem metadata).
/// This generator is not async-reentrant.
// NOTE(review): this view of the class shows no access specifiers and several
// statements appear to be missing (see notes below) -- confirm against upstream.
template <typename T>
class PushGenerator {
// State shared between the generator and its Producer; `mutex` is locked around the
// accesses in the methods below.
struct State {
util::Mutex mutex;
// Results pushed by the producer that the consumer has not read yet.
std::deque<Result<T>> result_q;
// Future handed to a consumer that asked for an item while the queue was empty.
util::optional<Future<T>> consumer_fut;
bool finished = false;
/// Producer API for PushGenerator
class Producer {
// Holds only a weak reference, so the Producer does not keep the state alive.
explicit Producer(const std::shared_ptr<State> state) : weak_state_(state) {}
/// \brief Push a value on the queue
/// True is returned if the value was pushed, false if the generator is
/// already closed or destroyed. If the latter, it is recommended to stop
/// producing any further values.
bool Push(Result<T> result) {
auto state = weak_state_.lock();
if (!state) {
// Generator was destroyed
return false;
auto lock = state->mutex.Lock();
if (state->finished) {
// Closed early
return false;
if (state->consumer_fut.has_value()) {
// A consumer is already waiting: take its future out of the shared state.
auto fut = std::move(state->consumer_fut.value());
lock.Unlock(); // unlock before potentially invoking a callback
// NOTE(review): `fut` is not completed with `result` in the visible code, and the
// else branch below has no visible body -- confirm.
} else {
return true;
/// \brief Tell the consumer we have finished producing
/// It is allowed to call this and later call Push() again ("early close").
/// In this case, calls to Push() after the queue is closed are silently
/// ignored. This can help implementing non-trivial cancellation cases.
/// True is returned on success, false if the generator is already closed
/// or destroyed.
bool Close() {
auto state = weak_state_.lock();
if (!state) {
// Generator was destroyed
return false;
auto lock = state->mutex.Lock();
if (state->finished) {
// Already closed
return false;
state->finished = true;
if (state->consumer_fut.has_value()) {
auto fut = std::move(state->consumer_fut.value());
lock.Unlock(); // unlock before potentially invoking a callback
// NOTE(review): `fut` is not completed in the visible code -- confirm.
return true;
/// Return whether the generator was closed or destroyed.
bool is_closed() const {
auto state = weak_state_.lock();
if (!state) {
// Generator was destroyed
return true;
auto lock = state->mutex.Lock();
return state->finished;
const std::weak_ptr<State> weak_state_;
// Creates an empty, open generator.
PushGenerator() : state_(std::make_shared<State>()) {}
/// Read an item from the queue
// Returns a finished future when a result is already queued, an end future when the
// generator is closed and drained, or otherwise a new unfinished future that is stored
// in consumer_fut.
Future<T> operator()() {
auto lock = state_->mutex.Lock();
assert(!state_->consumer_fut.has_value()); // Non-reentrant
if (!state_->result_q.empty()) {
// NOTE(review): the front is moved from but no visible statement pops it -- confirm.
auto fut = Future<T>::MakeFinished(std::move(state_->result_q.front()));
return fut;
if (state_->finished) {
return AsyncGeneratorEnd<T>();
auto fut = Future<T>::Make();
state_->consumer_fut = fut;
return fut;
/// \brief Return producer-side interface
/// The returned object must be used by the producer to push values on the queue.
/// Only a single Producer object should be instantiated.
Producer producer() { return Producer{state_}; }
const std::shared_ptr<State> state_;
/// \brief Creates a generator that pulls reentrantly from a source
///
/// The source is pulled reentrantly so that max_readahead requests are active at any
/// given time.
///
/// The source generator must be async-reentrant
///
/// This generator itself is async-reentrant.
///
/// This generator may queue up to max_readahead instances of T
template <typename T>
AsyncGenerator<T> MakeReadaheadGenerator(AsyncGenerator<T> source_generator,
                                         int max_readahead) {
  ReadaheadGenerator<T> readahead(std::move(source_generator), max_readahead);
  return AsyncGenerator<T>(std::move(readahead));
}
/// \brief Creates a generator that yields already-finished futures, one per element
/// of `vec`, followed by end futures once the vector is exhausted.
///
/// This generator is async-reentrant
template <typename T>
AsyncGenerator<T> MakeVectorGenerator(std::vector<T> vec) {
  // The items and an atomic cursor are shared by every copy of the returned callable.
  struct State {
    explicit State(std::vector<T> vec_) : vec(std::move(vec_)), vec_idx(0) {}

    std::vector<T> vec;
    std::atomic<std::size_t> vec_idx;
  };

  auto shared = std::make_shared<State>(std::move(vec));
  return [shared]() -> Future<T> {
    // Claim the next position atomically.
    const std::size_t position = shared->vec_idx.fetch_add(1);
    const bool in_range = position < shared->vec.size();
    return in_range ? Future<T>::MakeFinished(shared->vec[position])
                    : AsyncGeneratorEnd<T>();
  };
}
/// \see MakeMergedGenerator
// Generator that pulls inner generators from `source` and delivers the items those
// inner generators produce.
// NOTE(review): this view of the class shows no access specifiers and several
// statements appear to be missing or truncated (see notes below) -- confirm against
// upstream.
template <typename T>
class MergedGenerator {
// Creates the shared state from the outer source and the subscription limit.
explicit MergedGenerator(AsyncGenerator<AsyncGenerator<T>> source,
int max_subscriptions)
: state_(std::make_shared<State>(std::move(source), max_subscriptions)) {}
// Returns an already-delivered item when one is queued, the end token when finished,
// or otherwise a new future for the caller to wait on.
Future<T> operator()() {
Future<T> waiting_future;
std::shared_ptr<DeliveredJob> delivered_job;
auto guard = state_->mutex.Lock();
if (!state_->delivered_jobs.empty()) {
// NOTE(review): the front is moved from but no visible statement pops it -- confirm.
delivered_job = std::move(state_->delivered_jobs.front());
} else if (state_->finished) {
return IterationTraits<T>::End();
} else {
// NOTE(review): waiting_future is not added to waiting_jobs in the visible code --
// confirm.
waiting_future = Future<T>::Make();
if (delivered_job) {
// deliverer will be invalid if outer callback encounters an error and delivers a
// failed result
if (delivered_job->deliverer) {
// NOTE(review): the start of the following statement is cut off in this view.
InnerCallback{state_, delivered_job->index});
return std::move(delivered_job->value);
if (state_->first) {
// First call: start one pull from the outer source per subscription slot.
state_->first = false;
for (std::size_t i = 0; i < state_->active_subscriptions.size(); i++) {
state_->source().AddCallback(OuterCallback{state_, i});
return waiting_future;
// A result produced by an inner generator before any consumer asked for it, together
// with the generator that produced it and its subscription index.
struct DeliveredJob {
explicit DeliveredJob(AsyncGenerator<T> deliverer_, Result<T> value_,
std::size_t index_)
: deliverer(deliverer_), value(std::move(value_)), index(index_) {}
AsyncGenerator<T> deliverer;
Result<T> value;
std::size_t index;
// Shared state referenced by the generator and by both callbacks.
struct State {
State(AsyncGenerator<AsyncGenerator<T>> source, int max_subscriptions)
: source(std::move(source)),
// NOTE(review): the other members are not initialized in the visible initializer
// list -- confirm.
num_active_subscriptions(max_subscriptions) {}
AsyncGenerator<AsyncGenerator<T>> source;
// active_subscriptions and delivered_jobs will be bounded by max_subscriptions
std::vector<AsyncGenerator<T>> active_subscriptions;
std::deque<std::shared_ptr<DeliveredJob>> delivered_jobs;
// waiting_jobs is unbounded, reentrant pulls (e.g. AddReadahead) will provide the
// backpressure
std::deque<std::shared_ptr<Future<T>>> waiting_jobs;
util::Mutex mutex;
bool first;
bool source_exhausted;
bool finished;
int num_active_subscriptions;
// Callback for results produced by the inner generator at subscription `index`.
struct InnerCallback {
void operator()(const Result<T>& maybe_next) {
Future<T> sink;
// True when this inner generator has reached its end token.
bool sub_finished = maybe_next.ok() && IsIterationEnd(*maybe_next);
auto guard = state->mutex.Lock();
if (state->finished) {
// We've errored out so just ignore this result and don't keep pumping
if (!sub_finished) {
if (state->waiting_jobs.empty()) {
// NOTE(review): the start of the following statement is cut off in this view.
state->active_subscriptions[index], maybe_next, index));
} else {
// A consumer is waiting: take its future as the destination for this result.
sink = std::move(*state->waiting_jobs.front());
if (sub_finished) {
// This inner generator is done: ask the outer source for another one.
state->source().AddCallback(OuterCallback{state, index});
} else if (sink.is_valid()) {
if (maybe_next.ok()) {
std::shared_ptr<State> state;
std::size_t index;
// Callback for inner generators produced by the outer source for slot `index`.
struct OuterCallback {
void operator()(const Result<AsyncGenerator<T>>& maybe_next) {
bool should_purge = false;
bool should_continue = false;
Future<T> error_sink;
auto guard = state->mutex.Lock();
if (!maybe_next.ok() || IsIterationEnd(*maybe_next)) {
state->source_exhausted = true;
// Finish on an error, or once the active-subscription count reaches zero.
if (!maybe_next.ok() || --state->num_active_subscriptions == 0) {
state->finished = true;
should_purge = true;
if (!maybe_next.ok()) {
if (state->waiting_jobs.empty()) {
// NOTE(review): the start of the following statement is cut off in this view.
AsyncGenerator<T>(), maybe_next.status(), index));
} else {
error_sink = std::move(*state->waiting_jobs.front());
} else {
// A new inner generator arrived: record it in this subscription slot.
state->active_subscriptions[index] = *maybe_next;
should_continue = true;
if (error_sink.is_valid()) {
if (should_continue) {
// Start pulling from the newly subscribed inner generator.
(*maybe_next)().AddCallback(InnerCallback{state, index});
} else if (should_purge) {
// At this point state->finished has been marked true so no one else
// will be interacting with waiting_jobs and we can iterate outside lock
// NOTE(review): the body of this loop is not visible -- confirm.
while (!state->waiting_jobs.empty()) {
std::shared_ptr<State> state;
std::size_t index;
std::shared_ptr<State> state_;
/// \brief Creates a generator that consumes a stream of generators, pulling from up to
/// max_subscriptions of them at a time
///
/// Note: This may deliver items out of sequence. For example, items from the third
/// AsyncGenerator generated by the source may be emitted before some items from the first
/// AsyncGenerator generated by the source.
///
/// This generator will pull from source async-reentrantly unless max_subscriptions is 1
///
/// This generator will not pull from the individual subscriptions reentrantly. Add
/// readahead to the individual subscriptions if that is desired.
///
/// This generator is async-reentrant
///
/// This generator may queue up to max_subscriptions instances of T
template <typename T>
AsyncGenerator<T> MakeMergedGenerator(AsyncGenerator<AsyncGenerator<T>> source,
                                      int max_subscriptions) {
  MergedGenerator<T> merged(std::move(source), max_subscriptions);
  return AsyncGenerator<T>(std::move(merged));
}
/// \brief Creates a generator that consumes a stream of generators and pulls from each
/// one in sequence.
///
/// This generator is async-reentrant but will never pull from source reentrantly and
/// will never pull from any subscription reentrantly.
///
/// This generator may queue 1 instance of T
template <typename T>
AsyncGenerator<T> MakeConcatenatedGenerator(AsyncGenerator<AsyncGenerator<T>> source) {
  // Concatenation is a merge limited to a single subscription.
  constexpr int kSingleSubscription = 1;
  MergedGenerator<T> sequential(std::move(source), kSingleSubscription);
  return AsyncGenerator<T>(std::move(sequential));
}
/// \brief An item paired with its position in a sequence.
template <typename T>
struct Enumerated {
  /// The wrapped item.
  T value;
  /// Position assigned to the item; a negative index marks the end token.
  int index;
  /// Whether the item was flagged as the last one.
  bool last;
};
/// \brief Iteration traits for Enumerated<T>: the end token is recognized by a
/// negative index.
template <typename T>
struct IterationTraits<Enumerated<T>> {
  /// \brief End token: wraps T's own end token with index -1 and last == false.
  static Enumerated<T> End() {
    Enumerated<T> end_token{IterationEnd<T>(), -1, false};
    return end_token;
  }

  /// \brief True when `val` carries a negative index.
  static bool IsEnd(const Enumerated<T>& val) { return !(val.index >= 0); }
};
/// \see MakeEnumeratedGenerator
template <typename T>
class EnumeratingGenerator {
EnumeratingGenerator(AsyncGenerator<T> source, T initial_value)
: state_(std::make_shared<State>(std::move(source), std::move(initial_value))) {}
Future<Enumerated<T>> operator()() {
if (state_->finished) {
return AsyncGeneratorEnd<Enumerated<T>>();
} else {
auto state = state_;
return state->source().Then([state](const T& next) {
auto finished = IsIterationEnd<T>(next);
auto prev = Enumerated<T>{state->prev_value, state->prev_index, finished};
state->prev_value = next;
state->finished = finished;
return prev;
struct State {
State(AsyncGenerator<T> source, T initial_value)
: source(std::move(source)), prev_value(std::move(initial_value)), prev_index(0) {
finished = IsIterationEnd<T>(prev_value);
AsyncGenerator<T> source;
T prev_value;
int prev_index;
bool finished;
std::shared_ptr<State> state_;
/// Wraps items from a source generator with positional information
///
/// When used with MakeMergedGenerator and MakeSequencingGenerator this allows items to be
/// processed in a "first-available" fashion and later resequenced which can reduce the
/// impact of sources with erratic performance (e.g. a filesystem where some items may
/// take longer to read than others).
///
/// TODO(ARROW-12371) Would require this generator be async-reentrant
///
/// \see MakeSequencingGenerator for an example of putting items back in order
///
/// This generator is not async-reentrant
///
/// This generator buffers one item (so it knows which item is the last item)
template <typename T>
AsyncGenerator<Enumerated<T>> MakeEnumeratedGenerator(AsyncGenerator<T> source) {
  // Pull the first item up front; the enumerating generator is built once it arrives.
  auto first_item = source();
  auto make_enumerator =
      [source](const T& initial_value) -> AsyncGenerator<Enumerated<T>> {
    // `source` is a const capture here, so the enumerator receives a copy of it.
    return EnumeratingGenerator<T>(source, initial_value);
  };
  return FutureFirstGenerator<Enumerated<T>>(first_item.Then(std::move(make_enumerator)));
}
/// \see MakeTransferredGenerator
///
/// Forwards every call to the source generator and passes the resulting future through
/// Executor::Transfer.
template <typename T>
class TransferringGenerator {
 public:
  /// \param source the generator to pull from
  /// \param executor executor the futures are transferred to; stored as a raw pointer,
  ///        so it is not owned by this generator
  explicit TransferringGenerator(AsyncGenerator<T> source, internal::Executor* executor)
      : source_(std::move(source)), executor_(executor) {}

  Future<T> operator()() { return executor_->Transfer(source_()); }

 private:
  AsyncGenerator<T> source_;
  internal::Executor* executor_;
};
/// \brief Transfers the futures of a generator to an underlying executor.
///
/// Continuations run on the returned future will be run on the given executor
/// if they cannot be run synchronously.
///
/// This is often needed to move computation off I/O threads or other external
/// completion sources and back on to the CPU executor so the I/O thread can
/// stay busy and focused on I/O
///
/// Keep in mind that continuations called on an already completed future will
/// always be run synchronously and so no transfer will happen in that case.
///
/// This generator is async reentrant if the source is
///
/// This generator will not queue
template <typename T>
AsyncGenerator<T> MakeTransferredGenerator(AsyncGenerator<T> source,
                                           internal::Executor* executor) {
  return TransferringGenerator<T>(std::move(source), executor);
}
/// \see MakeBackgroundGenerator
///
/// Serves items that a task spawned on io_executor reads from an Iterator<T>. Items
/// travel through State::queue, or directly through State::waiting_future when the
/// consumer is already waiting. All shared state is accessed under State::mutex.
template <typename T>
class BackgroundGenerator {
// NOTE(review): no access specifiers, closing braces or blank lines are visible in this
// copy of the class; confirm against the complete file before changing any code here.
explicit BackgroundGenerator(Iterator<T> it, internal::Executor* io_executor, int max_q,
int q_restart)
: state_(std::make_shared<State>(io_executor, std::move(it), max_q, q_restart)),
cleanup_(std::make_shared<Cleanup>(state_.get())) {}
// Returns the next item. With the mutex held: an empty queue yields either the end
// marker (when finished) or a new future that is recorded in state_->waiting_future; a
// non-empty queue yields an already-finished future built from the front item. In both
// cases the background task is restarted first if NeedsRestart() says so.
Future<T> operator()() {
auto guard = state_->mutex.Lock();
Future<T> waiting_future;
if (state_->queue.empty()) {
if (state_->finished) {
return AsyncGeneratorEnd<T>();
} else {
waiting_future = Future<T>::Make();
state_->waiting_future = waiting_future;
} else {
// NOTE(review): the front item is moved out but no matching queue.pop() is visible
// here; confirm the item is removed from the queue in the complete file.
auto next = Future<T>::MakeFinished(std::move(state_->queue.front()));
if (state_->NeedsRestart()) {
return state_->RestartTask(state_, std::move(guard), std::move(next));
return next;
// This should only trigger the very first time this method is called
if (state_->NeedsRestart()) {
return state_->RestartTask(state_, std::move(guard), std::move(waiting_future));
return waiting_future;
// State shared between the consumer-facing generator and the background task.
struct State {
// NOTE(review): only io_executor and should_shutdown appear in the visible initializer
// list; max_q and q_restart are const members and must be initialized somewhere.
State(internal::Executor* io_executor, Iterator<T> it, int max_q, int q_restart)
: io_executor(io_executor),
should_shutdown(false) {}
// NOTE(review): the loop body is not visible here; presumably it pops the queue.
void ClearQueue() {
while (!queue.empty()) {
// A task counts as running for as long as task_finished holds a valid future.
bool TaskIsRunning() const { return task_finished.is_valid(); }
// A new task is wanted when the stream is not finished, no task is currently reading
// and the queue has drained down to q_restart items or fewer.
bool NeedsRestart() const {
return !finished && !reading && static_cast<int>(queue.size()) <= q_restart;
// Spawns a new WorkerTask on io_executor. `guard` is the caller's lock on mutex.
void DoRestartTask(std::shared_ptr<State> state, util::Mutex::Guard guard) {
// If we get here we are actually going to start a new task so let's create a
// task_finished future for it
state->task_finished = Future<>::Make();
state->reading = true;
auto spawn_status = io_executor->Spawn(
[state]() { BackgroundGenerator::WorkerTask(std::move(state)); });
if (!spawn_status.ok()) {
// If we can't spawn a new task then send an error to the consumer (either via a
// waiting future or the queue) and mark ourselves finished
state->finished = true;
state->task_finished = Future<>();
// NOTE(review): the statements that actually deliver spawn_status (to to_deliver or
// to the queue) are not visible in this copy.
if (waiting_future.has_value()) {
auto to_deliver = std::move(waiting_future.value());
} else {
// Restarts the background task and returns `next` to the consumer, deferring both
// until the previous task has completed if one is still running.
Future<T> RestartTask(std::shared_ptr<State> state, util::Mutex::Guard guard,
Future<T> next) {
if (TaskIsRunning()) {
// If the task is still cleaning up we need to wait for it to finish before
// restarting. We also want to block the consumer until we've restarted the
// reader to avoid multiple restarts
return task_finished.Then([state, next](...) {
// This may appear dangerous (recursive mutex) but we should be guaranteed the
// outer guard has been released by this point. We know...
// * task_finished is not already finished (it would be invalid in that case)
// * task_finished will not be marked complete until we've given up the mutex
auto guard_ = state->mutex.Lock();
state->DoRestartTask(state, std::move(guard_));
return next;
// Otherwise we can restart immediately
DoRestartTask(std::move(state), std::move(guard));
return next;
internal::Executor* io_executor;
// Queue size at which the background task stops reading (see WorkerTask)
const int max_q;
// Queue size at or below which a stopped task is restarted (see NeedsRestart)
const int q_restart;
Iterator<T> it;
// If true, the task is actively pumping items from the queue and does not need a
// restart
bool reading;
// Set to true when a terminal item arrives
bool finished;
// Signal to the background task to end early because consumers have given up on it
bool should_shutdown;
// If the queue is empty then the consumer will create a waiting future and wait for
// it
std::queue<Result<T>> queue;
util::optional<Future<T>> waiting_future;
// Every background task is given a future to complete when it is entirely finished
// processing and ready for the next task to start or for State to be destroyed
Future<> task_finished;
util::Mutex mutex;
// Cleanup task that will be run when all consumer references to the generator are lost
struct Cleanup {
explicit Cleanup(State* state) : state(state) {}
~Cleanup() {
Future<> finish_fut;
auto lock = state->mutex.Lock();
// NOTE(review): as shown, the shutdown signal sits inside the "no task running"
// branch; presumably an early return belongs in that branch -- confirm against the
// complete file.
if (!state->TaskIsRunning()) {
// Signal the current task to stop and wait for it to finish
state->should_shutdown = true;
finish_fut = state->task_finished;
// Using future as a condition variable here
Status st = finish_fut.status();
// Raw, non-owning pointer; the State object itself is owned through shared_ptrs
State* state;
// Body of the background task: repeatedly reads from state->it and hands each item to
// the consumer, then completes state->task_finished once it stops.
static void WorkerTask(std::shared_ptr<State> state) {
// We need to capture the state to read while outside the mutex
bool reading = true;
while (reading) {
auto next = state->it.Next();
// Need to capture state->waiting_future inside the mutex to mark finished outside
Future<T> waiting_future;
auto guard = state->mutex.Lock();
if (state->should_shutdown) {
state->finished = true;
if (!next.ok() || IsIterationEnd<T>(*next)) {
// Terminal item. Mark finished to true, send this last item, and quit
state->finished = true;
if (!next.ok()) {
// At this point we are going to send an item. Either we will add it to the
// queue or deliver it to a waiting future.
if (state->waiting_future.has_value()) {
waiting_future = std::move(state->waiting_future.value());
} else {
// We just filled up the queue so it is time to quit. We may need to notify
// a cleanup task so we transition to Quitting
if (static_cast<int>(state->queue.size()) >= state->max_q) {
state->reading = false;
reading = state->reading && !state->finished;
// This should happen outside the mutex. Presumably there is a
// transferring generator on the other end that will quickly transfer any
// callbacks off of this thread so we can continue looping. Still, best not to
// rely on that
if (waiting_future.is_valid()) {
// Once we've sent our last item we can notify any waiters that we are done and so
// either state can be cleaned up or a new background task can be started
Future<> task_finished;
auto guard = state->mutex.Lock();
// After we give up the mutex state can be safely deleted. We will no longer
// reference it. We can safely transition to idle now.
task_finished = state->task_finished;
state->task_finished = Future<>();
std::shared_ptr<State> state_;
// state_ is held by both the generator and the background thread so it won't be cleaned
// up when all consumer references are relinquished. cleanup_ is only held by the
// generator so it will be destructed when the last consumer reference is gone. We use
// this to cleanup / stop the background generator in case the consuming end stops
// listening (e.g. due to a downstream error)
std::shared_ptr<Cleanup> cleanup_;
/// Default value of max_q for MakeBackgroundGenerator
constexpr int kDefaultBackgroundMaxQ = 32;
/// Default value of q_restart for MakeBackgroundGenerator
constexpr int kDefaultBackgroundQRestart = 16;

/// \brief Creates an AsyncGenerator<T> by iterating over an Iterator<T> on a background
/// thread
///
/// The parameter max_q and q_restart control queue size and background thread task
/// management. If the background task is fast you typically don't want it creating a
/// thread task for every item. Instead the background thread will run until it fills
/// up a readahead queue.
///
/// Once the queue has filled up the background thread task will terminate (allowing other
/// I/O tasks to use the thread). Once the queue has been drained enough (specified by
/// q_restart) then the background thread task will be restarted. If q_restart is too low
/// then you may exhaust the queue waiting for the background thread task to start running
/// again. If it is too high then it will be constantly stopping and restarting the
/// background queue task
///
/// \param iterator the iterator to read from on the background thread
/// \param io_executor executor on which the reading task is spawned
/// \param max_q queue size at which the background task stops reading
/// \param q_restart queue size at which a stopped task is restarted; must be <= max_q
/// \return the generator, or Status::Invalid if max_q < q_restart
///
/// This generator is not async-reentrant
///
/// This generator will queue up to max_q blocks
template <typename T>
static Result<AsyncGenerator<T>> MakeBackgroundGenerator(
    Iterator<T> iterator, internal::Executor* io_executor,
    int max_q = kDefaultBackgroundMaxQ, int q_restart = kDefaultBackgroundQRestart) {
  if (max_q < q_restart) {
    return Status::Invalid("max_q must be >= q_restart");
  }
  return BackgroundGenerator<T>(std::move(iterator), io_executor, max_q, q_restart);
}
/// \see MakeGeneratorIterator
///
/// Iterator adapter over an AsyncGenerator<T>: each call to Next() pulls one future
/// from the generator and returns its result.
template <typename T>
class GeneratorIterator {
 public:
  explicit GeneratorIterator(AsyncGenerator<T> source) : source_(std::move(source)) {}

  Result<T> Next() { return source_().result(); }

 private:
  AsyncGenerator<T> source_;
};
/// \brief Converts an AsyncGenerator<T> to an Iterator<T> by blocking until each future
/// is finished
///
/// \param source the generator to wrap; the returned iterator takes ownership of it
template <typename T>
Result<Iterator<T>> MakeGeneratorIterator(AsyncGenerator<T> source) {
  return Iterator<T>(GeneratorIterator<T>(std::move(source)));
}
/// \brief Adds readahead to an iterator using a background thread.
///
/// Under the hood this is converting the iterator to a generator using
/// MakeBackgroundGenerator (whose queue provides the readahead) and then converting
/// back to an iterator using MakeGeneratorIterator.
///
/// \param it the iterator to read ahead from
/// \param readahead_queue_size used as max_q of the background generator; q_restart is
///        half of it, but at least 1
/// \return the wrapping iterator, or the error from creating the thread pool or the
///         background generator
template <typename T>
Result<Iterator<T>> MakeReadaheadIterator(Iterator<T> it, int readahead_queue_size) {
  // A dedicated single-thread pool runs the background reads
  ARROW_ASSIGN_OR_RAISE(auto io_executor, internal::ThreadPool::Make(1));
  auto max_q = readahead_queue_size;
  auto q_restart = std::max(1, max_q / 2);
  ARROW_ASSIGN_OR_RAISE(
      auto background_generator,
      MakeBackgroundGenerator(std::move(it), io_executor.get(), max_q, q_restart));
  // Capture io_executor to keep it alive as long as owned_bg_generator is still
  // referenced
  AsyncGenerator<T> owned_bg_generator = [io_executor, background_generator]() {
    return background_generator();
  };
  return MakeGeneratorIterator(std::move(owned_bg_generator));
}
/// \brief Make a generator that returns a single pre-generated future
///
/// The first call hands out the given future; the stored future is moved out at that
/// point, and calls that find no valid future left return the end of the stream.
///
/// This generator is async-reentrant.
template <typename T>
std::function<Future<T>()> MakeSingleFutureGenerator(Future<T> future) {
  // The future lives in shared state so that copies of the generator hand it out once
  auto state = std::make_shared<Future<T>>(std::move(future));
  return [state]() -> Future<T> {
    auto fut = std::move(*state);
    if (fut.is_valid()) {
      return fut;
    } else {
      return AsyncGeneratorEnd<T>();
    }
  };
}
/// \brief Make a generator that fails with a given error
///
/// The stored status is moved out of the shared state on each call. A call that finds
/// a non-OK status returns it as a failed future; a call that finds an OK status
/// returns the end of the stream. NOTE(review): this presumably means the error is
/// delivered once and later calls end the stream, assuming a moved-from Status reads
/// as OK -- confirm against Status.
///
/// This generator is async-reentrant.
template <typename T>
AsyncGenerator<T> MakeFailingGenerator(Status st) {
  auto state = std::make_shared<Status>(std::move(st));
  return [state]() -> Future<T> {
    auto st = std::move(*state);
    if (!st.ok()) {
      // Explicit move: the return type differs from Status, so the local would
      // otherwise be copied into the returned future
      return std::move(st);
    } else {
      return AsyncGeneratorEnd<T>();
    }
  };
}
/// \brief Make a generator that fails with the error held by a Result
///
/// This overload allows inferring the return type from the argument. Only the status of
/// `result` is used; it is forwarded to MakeFailingGenerator<T>(Status).
template <typename T>
AsyncGenerator<T> MakeFailingGenerator(const Result<T>& result) {
  return MakeFailingGenerator<T>(result.status());
}
} // namespace arrow