| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
| #include <cassert> |
| #include <deque> |
| #include <queue> |
| |
| #include "arrow/util/functional.h" |
| #include "arrow/util/future.h" |
| #include "arrow/util/iterator.h" |
| #include "arrow/util/mutex.h" |
| #include "arrow/util/optional.h" |
| #include "arrow/util/queue.h" |
| #include "arrow/util/thread_pool.h" |
| |
| namespace arrow { |
| |
| // The methods in this file create, modify, and utilize AsyncGenerator which is an |
| // iterator of futures. This allows an asynchronous source (like file input) to be run |
| // through a pipeline in the same way that iterators can be used to create pipelined |
| // workflows. |
| // |
| // In order to support pipeline parallelism we introduce the concept of asynchronous |
// reentrancy. This is different from synchronous reentrancy. With synchronous code a
| // function is reentrant if the function can be called again while a previous call to that |
| // function is still running. Unless otherwise specified none of these generators are |
| // synchronously reentrant. Care should be taken to avoid calling them in such a way (and |
| // the utilities Visit/Collect/Await take care to do this). |
| // |
| // Asynchronous reentrancy on the other hand means the function is called again before the |
| // future returned by the function is marked finished (but after the call to get the |
| // future returns). Some of these generators are async-reentrant while others (e.g. |
| // those that depend on ordered processing like decompression) are not. Read the MakeXYZ |
| // function comments to determine which generators support async reentrancy. |
| // |
| // Note: Generators that are not asynchronously reentrant can still support readahead |
| // (\see MakeSerialReadaheadGenerator). |
| // |
| // Readahead operators, and some other operators, may introduce queueing. Any operators |
| // that introduce buffering should detail the amount of buffering they introduce in their |
| // MakeXYZ function comments. |
/// \brief An asynchronous stream of values: each invocation returns a future
/// for the next element, with IterationTraits<T>::End() signalling exhaustion.
template <typename T>
using AsyncGenerator = std::function<Future<T>()>;
| |
| template <typename T> |
| struct IterationTraits<AsyncGenerator<T>> { |
| /// \brief by default when iterating through a sequence of AsyncGenerator<T>, |
| /// an empty function indicates the end of iteration. |
| static AsyncGenerator<T> End() { return AsyncGenerator<T>(); } |
| |
| static bool IsEnd(const AsyncGenerator<T>& val) { return !val; } |
| }; |
| |
| template <typename T> |
| Future<T> AsyncGeneratorEnd() { |
| return Future<T>::MakeFinished(IterationTraits<T>::End()); |
| } |
| |
/// \brief Pulls repeatedly from `generator`, invoking `visitor` on each item,
/// returning a future that completes when all have been visited
///
/// Iteration stops at the end token or on the first non-OK status from the
/// visitor; a visitor error is propagated into the returned future.
template <typename T>
Future<> VisitAsyncGenerator(AsyncGenerator<T> generator,
                             std::function<Status(T)> visitor) {
  struct LoopBody {
    struct Callback {
      Result<ControlFlow<detail::Empty>> operator()(const T& result) {
        if (IsIterationEnd(result)) {
          // Source exhausted; terminate the loop successfully.
          return Break(detail::Empty());
        } else {
          auto visited = visitor(result);
          if (visited.ok()) {
            return Continue();
          } else {
            // Propagate the visitor's error, aborting the loop.
            return visited;
          }
        }
      }

      std::function<Status(T)> visitor;
    };

    Future<ControlFlow<detail::Empty>> operator()() {
      Callback callback{visitor};
      auto next = generator();
      // Chain the visit onto the next item; Loop() re-invokes this body on
      // Continue, so the source is never pulled synchronously reentrantly.
      return next.Then(std::move(callback));
    }

    AsyncGenerator<T> generator;
    std::function<Status(T)> visitor;
  };

  return Loop(LoopBody{std::move(generator), std::move(visitor)});
}
| |
| /// \brief Waits for an async generator to complete, discarding results. |
| template <typename T> |
| Future<> DiscardAllFromAsyncGenerator(AsyncGenerator<T> generator) { |
| std::function<Status(T)> visitor = [](...) { return Status::OK(); }; |
| return VisitAsyncGenerator(generator, visitor); |
| } |
| |
| /// \brief Collects the results of an async generator into a vector |
| template <typename T> |
| Future<std::vector<T>> CollectAsyncGenerator(AsyncGenerator<T> generator) { |
| auto vec = std::make_shared<std::vector<T>>(); |
| struct LoopBody { |
| Future<ControlFlow<std::vector<T>>> operator()() { |
| auto next = generator_(); |
| auto vec = vec_; |
| return next.Then([vec](const T& result) -> Result<ControlFlow<std::vector<T>>> { |
| if (IsIterationEnd(result)) { |
| return Break(*vec); |
| } else { |
| vec->push_back(result); |
| return Continue(); |
| } |
| }); |
| } |
| AsyncGenerator<T> generator_; |
| std::shared_ptr<std::vector<T>> vec_; |
| }; |
| return Loop(LoopBody{std::move(generator), std::move(vec)}); |
| } |
| |
/// \see MakeMappedGenerator
///
/// Applies `map` to every item pulled from `source`.  Caller-facing futures
/// are queued in `waiting_jobs` so results are delivered in request order,
/// even though the map step itself may complete asynchronously.
template <typename T, typename V>
class MappingGenerator {
 public:
  MappingGenerator(AsyncGenerator<T> source, std::function<Future<V>(const T&)> map)
      : state_(std::make_shared<State>(std::move(source), std::move(map))) {}

  /// \brief Requests the next mapped value.
  Future<V> operator()() {
    auto future = Future<V>::Make();
    bool should_trigger;
    {
      auto guard = state_->mutex.Lock();
      if (state_->finished) {
        return AsyncGeneratorEnd<V>();
      }
      // Only the first outstanding request pulls from the source; later
      // requests piggy-back on the chain of callbacks already in flight.
      should_trigger = state_->waiting_jobs.empty();
      state_->waiting_jobs.push_back(future);
    }
    if (should_trigger) {
      state_->source().AddCallback(Callback{state_});
    }
    return future;
  }

 private:
  // Shared between the generator handle and the callbacks attached to futures,
  // so it stays alive even if the generator object itself is moved or dropped.
  struct State {
    State(AsyncGenerator<T> source, std::function<Future<V>(const T&)> map)
        : source(std::move(source)),
          map(std::move(map)),
          waiting_jobs(),
          mutex(),
          finished(false) {}

    void Purge() {
      // This might be called by an original callback (if the source iterator fails or
      // ends) or by a mapped callback (if the map function fails or ends prematurely).
      // Either way it should only be called once and after finished is set so there is no
      // need to guard access to `waiting_jobs`.
      while (!waiting_jobs.empty()) {
        waiting_jobs.front().MarkFinished(IterationTraits<V>::End());
        waiting_jobs.pop_front();
      }
    }

    AsyncGenerator<T> source;
    std::function<Future<V>(const T&)> map;
    // Futures handed out to callers but not yet fulfilled (FIFO order).
    std::deque<Future<V>> waiting_jobs;
    util::Mutex mutex;
    bool finished;
  };

  struct Callback;

  // Attached to the future returned by `map`; finishes the caller-facing
  // future (`sink`) and shuts the generator down if the mapped future errored
  // or yielded the end token.
  struct MappedCallback {
    void operator()(const Result<V>& maybe_next) {
      bool end = !maybe_next.ok() || IsIterationEnd(*maybe_next);
      bool should_purge = false;
      if (end) {
        {
          auto guard = state->mutex.Lock();
          // Only the transition into `finished` purges; later end/error
          // results find `finished` already set.
          should_purge = !state->finished;
          state->finished = true;
        }
      }
      sink.MarkFinished(maybe_next);
      if (should_purge) {
        state->Purge();
      }
    }
    std::shared_ptr<State> state;
    Future<V> sink;
  };

  // Attached to the future returned by the source; pairs the incoming item
  // with the oldest waiting job and, if more jobs are waiting, pulls again.
  struct Callback {
    void operator()(const Result<T>& maybe_next) {
      Future<V> sink;
      bool end = !maybe_next.ok() || IsIterationEnd(*maybe_next);
      bool should_purge = false;
      bool should_trigger;
      {
        auto guard = state->mutex.Lock();
        if (end) {
          should_purge = !state->finished;
          state->finished = true;
        }
        sink = state->waiting_jobs.front();
        state->waiting_jobs.pop_front();
        // Keep pulling only while the source is alive and callers are waiting.
        should_trigger = !end && !state->waiting_jobs.empty();
      }
      if (should_purge) {
        state->Purge();
      }
      if (should_trigger) {
        state->source().AddCallback(Callback{state});
      }
      if (maybe_next.ok()) {
        const T& val = maybe_next.ValueUnsafe();
        if (IsIterationEnd(val)) {
          sink.MarkFinished(IterationTraits<V>::End());
        } else {
          // Run the map step; its completion (possibly asynchronous) will
          // finish `sink` via MappedCallback.
          Future<V> mapped_fut = state->map(val);
          mapped_fut.AddCallback(MappedCallback{std::move(state), std::move(sink)});
        }
      } else {
        sink.MarkFinished(maybe_next.status());
      }
    }

    std::shared_ptr<State> state;
  };

  std::shared_ptr<State> state_;
};
| |
| /// \brief Creates a generator that will apply the map function to each element of |
| /// source. The map function is not called on the end token. |
| /// |
| /// Note: This function makes a copy of `map` for each item |
| /// Note: Errors returned from the `map` function will be propagated |
| /// |
| /// If the source generator is async-reentrant then this generator will be also |
| template <typename T, typename V> |
| AsyncGenerator<V> MakeMappedGenerator(AsyncGenerator<T> source_generator, |
| std::function<Result<V>(const T&)> map) { |
| std::function<Future<V>(const T&)> future_map = [map](const T& val) -> Future<V> { |
| return Future<V>::MakeFinished(map(val)); |
| }; |
| return MappingGenerator<T, V>(std::move(source_generator), std::move(future_map)); |
| } |
| template <typename T, typename V> |
| AsyncGenerator<V> MakeMappedGenerator(AsyncGenerator<T> source_generator, |
| std::function<V(const T&)> map) { |
| std::function<Future<V>(const T&)> maybe_future_map = [map](const T& val) -> Future<V> { |
| return Future<V>::MakeFinished(map(val)); |
| }; |
| return MappingGenerator<T, V>(std::move(source_generator), std::move(maybe_future_map)); |
| } |
| template <typename T, typename V> |
| AsyncGenerator<V> MakeMappedGenerator(AsyncGenerator<T> source_generator, |
| std::function<Future<V>(const T&)> map) { |
| return MappingGenerator<T, V>(std::move(source_generator), std::move(map)); |
| } |
| |
| /// \see MakeSequencingGenerator |
| template <typename T, typename ComesAfter, typename IsNext> |
| class SequencingGenerator { |
| public: |
| SequencingGenerator(AsyncGenerator<T> source, ComesAfter compare, IsNext is_next, |
| T initial_value) |
| : state_(std::make_shared<State>(std::move(source), std::move(compare), |
| std::move(is_next), std::move(initial_value))) {} |
| |
| Future<T> operator()() { |
| { |
| auto guard = state_->mutex.Lock(); |
| // We can send a result immediately if the top of the queue is either an |
| // error or the next item |
| if (!state_->queue.empty() && |
| (!state_->queue.top().ok() || |
| state_->is_next(state_->previous_value, *state_->queue.top()))) { |
| auto result = std::move(state_->queue.top()); |
| if (result.ok()) { |
| state_->previous_value = *result; |
| } |
| state_->queue.pop(); |
| return Future<T>::MakeFinished(result); |
| } |
| if (state_->finished) { |
| return AsyncGeneratorEnd<T>(); |
| } |
| // The next item is not in the queue so we will need to wait |
| auto new_waiting_fut = Future<T>::Make(); |
| state_->waiting_future = new_waiting_fut; |
| guard.Unlock(); |
| state_->source().AddCallback(Callback{state_}); |
| return new_waiting_fut; |
| } |
| } |
| |
| private: |
| struct WrappedComesAfter { |
| bool operator()(const Result<T>& left, const Result<T>& right) { |
| if (!left.ok() || !right.ok()) { |
| // Should never happen |
| return false; |
| } |
| return compare(*left, *right); |
| } |
| ComesAfter compare; |
| }; |
| |
| struct State { |
| State(AsyncGenerator<T> source, ComesAfter compare, IsNext is_next, T initial_value) |
| : source(std::move(source)), |
| is_next(std::move(is_next)), |
| previous_value(std::move(initial_value)), |
| waiting_future(), |
| queue(WrappedComesAfter{compare}), |
| finished(false), |
| mutex() {} |
| |
| AsyncGenerator<T> source; |
| IsNext is_next; |
| T previous_value; |
| Future<T> waiting_future; |
| std::priority_queue<Result<T>, std::vector<Result<T>>, WrappedComesAfter> queue; |
| bool finished; |
| util::Mutex mutex; |
| }; |
| |
| class Callback { |
| public: |
| explicit Callback(std::shared_ptr<State> state) : state_(std::move(state)) {} |
| |
| void operator()(const Result<T> result) { |
| Future<T> to_deliver; |
| bool finished; |
| { |
| auto guard = state_->mutex.Lock(); |
| bool ready_to_deliver = false; |
| if (!result.ok()) { |
| // Clear any cached results |
| while (!state_->queue.empty()) { |
| state_->queue.pop(); |
| } |
| ready_to_deliver = true; |
| state_->finished = true; |
| } else if (IsIterationEnd<T>(result.ValueUnsafe())) { |
| ready_to_deliver = state_->queue.empty(); |
| state_->finished = true; |
| } else { |
| ready_to_deliver = state_->is_next(state_->previous_value, *result); |
| } |
| |
| if (ready_to_deliver && state_->waiting_future.is_valid()) { |
| to_deliver = state_->waiting_future; |
| if (result.ok()) { |
| state_->previous_value = *result; |
| } |
| } else { |
| state_->queue.push(result); |
| } |
| // Capture state_->finished so we can access it outside the mutex |
| finished = state_->finished; |
| } |
| // Must deliver result outside of the mutex |
| if (to_deliver.is_valid()) { |
| to_deliver.MarkFinished(result); |
| } else { |
| // Otherwise, if we didn't get the next item (or a terminal item), we |
| // need to keep looking |
| if (!finished) { |
| state_->source().AddCallback(Callback{state_}); |
| } |
| } |
| } |
| |
| private: |
| const std::shared_ptr<State> state_; |
| }; |
| |
| const std::shared_ptr<State> state_; |
| }; |
| |
| /// \brief Buffers an AsyncGenerator to return values in sequence order ComesAfter |
| /// and IsNext determine the sequence order. |
| /// |
| /// ComesAfter should be a BinaryPredicate that only returns true if a comes after b |
| /// |
| /// IsNext should be a BinaryPredicate that returns true, given `a` and `b`, only if |
| /// `b` follows immediately after `a`. It should return true given `initial_value` and |
| /// `b` if `b` is the first item in the sequence. |
| /// |
| /// This operator will queue unboundedly while waiting for the next item. It is intended |
| /// for jittery sources that might scatter an ordered sequence. It is NOT intended to |
| /// sort. Using it to try and sort could result in excessive RAM usage. This generator |
| /// will queue up to N blocks where N is the max "out of order"ness of the source. |
| /// |
| /// For example, if the source is 1,6,2,5,4,3 it will queue 3 blocks because 3 is 3 |
| /// blocks beyond where it belongs. |
| /// |
| /// This generator is not async-reentrant but it consists only of a simple log(n) |
| /// insertion into a priority queue. |
| template <typename T, typename ComesAfter, typename IsNext> |
| AsyncGenerator<T> MakeSequencingGenerator(AsyncGenerator<T> source_generator, |
| ComesAfter compare, IsNext is_next, |
| T initial_value) { |
| return SequencingGenerator<T, ComesAfter, IsNext>( |
| std::move(source_generator), std::move(compare), std::move(is_next), |
| std::move(initial_value)); |
| } |
| |
/// \see MakeTransformedGenerator
template <typename T, typename V>
class TransformingGenerator {
  // The transforming generator state will be referenced as an async generator but will
  // also be referenced via callback to various futures. If the async generator owner
  // moves it around we need the state to be consistent for future callbacks.
  struct TransformingGeneratorState
      : std::enable_shared_from_this<TransformingGeneratorState> {
    TransformingGeneratorState(AsyncGenerator<T> generator, Transformer<T, V> transformer)
        : generator_(std::move(generator)),
          transformer_(std::move(transformer)),
          last_value_(),
          finished_() {}

    Future<V> operator()() {
      while (true) {
        // First try to squeeze a value out of whatever item the transformer is
        // currently holding before pulling anything new from the source.
        auto maybe_next_result = Pump();
        if (!maybe_next_result.ok()) {
          return Future<V>::MakeFinished(maybe_next_result.status());
        }
        auto maybe_next = std::move(maybe_next_result).ValueUnsafe();
        if (maybe_next.has_value()) {
          return Future<V>::MakeFinished(*std::move(maybe_next));
        }

        auto next_fut = generator_();
        // If finished already, process results immediately inside the loop to avoid
        // stack overflow
        if (next_fut.is_finished()) {
          auto next_result = next_fut.result();
          if (next_result.ok()) {
            last_value_ = *next_result;
          } else {
            return Future<V>::MakeFinished(next_result.status());
          }
          // Otherwise, if not finished immediately, add callback to process results
        } else {
          // Keep `this` alive for the duration of the asynchronous pull.
          auto self = this->shared_from_this();
          return next_fut.Then([self](const Result<T>& next_result) {
            if (next_result.ok()) {
              self->last_value_ = *next_result;
              return (*self)();
            } else {
              return Future<V>::MakeFinished(next_result.status());
            }
          });
        }
      }
    }

    // See comment on TransformingIterator::Pump
    Result<util::optional<V>> Pump() {
      if (!finished_ && last_value_.has_value()) {
        ARROW_ASSIGN_OR_RAISE(TransformFlow<V> next, transformer_(*last_value_));
        if (next.ReadyForNext()) {
          // The transformer consumed the held item; the end token also marks
          // the whole stream finished.
          if (IsIterationEnd(*last_value_)) {
            finished_ = true;
          }
          last_value_.reset();
        }
        if (next.Finished()) {
          finished_ = true;
        }
        if (next.HasValue()) {
          return next.Value();
        }
      }
      if (finished_) {
        return IterationTraits<V>::End();
      }
      // No value ready and not finished: caller must pull from the source.
      return util::nullopt;
    }

    AsyncGenerator<T> generator_;
    Transformer<T, V> transformer_;
    // The most recent source item, kept until the transformer signals it is
    // ready for the next one.  This is the "up to 1 instance of T" queue.
    util::optional<T> last_value_;
    bool finished_;
  };

 public:
  explicit TransformingGenerator(AsyncGenerator<T> generator,
                                 Transformer<T, V> transformer)
      : state_(std::make_shared<TransformingGeneratorState>(std::move(generator),
                                                            std::move(transformer))) {}

  Future<V> operator()() { return (*state_)(); }

 protected:
  std::shared_ptr<TransformingGeneratorState> state_;
};
| |
| /// \brief Transforms an async generator using a transformer function returning a new |
| /// AsyncGenerator |
| /// |
| /// The transform function here behaves exactly the same as the transform function in |
| /// MakeTransformedIterator and you can safely use the same transform function to |
| /// transform both synchronous and asynchronous streams. |
| /// |
| /// This generator is not async-reentrant |
| /// |
| /// This generator may queue up to 1 instance of T |
| template <typename T, typename V> |
| AsyncGenerator<V> MakeTransformedGenerator(AsyncGenerator<T> generator, |
| Transformer<T, V> transformer) { |
| return TransformingGenerator<T, V>(generator, transformer); |
| } |
| |
/// \see MakeSerialReadaheadGenerator
///
/// Keeps up to max_readahead results buffered in a single-producer
/// single-consumer queue, pulling from the source one request at a time
/// (never reentrantly).  Cross-thread coordination is done with the two
/// atomics below rather than a mutex.
template <typename T>
class SerialReadaheadGenerator {
 public:
  SerialReadaheadGenerator(AsyncGenerator<T> source_generator, int max_readahead)
      : state_(std::make_shared<State>(std::move(source_generator), max_readahead)) {}

  Future<T> operator()() {
    if (state_->first_) {
      // Lazy generator, need to wait for the first ask to prime the pump
      state_->first_ = false;
      auto next = state_->source_();
      return next.Then(Callback{state_});
    }

    // This generator is not async-reentrant. We won't be called until the last
    // future finished so we know there is something in the queue
    auto finished = state_->finished_.load();
    if (finished && state_->readahead_queue_.IsEmpty()) {
      return AsyncGeneratorEnd<T>();
    }

    std::shared_ptr<Future<T>> next;
    if (!state_->readahead_queue_.Read(next)) {
      return Status::UnknownError("Could not read from readahead_queue");
    }

    auto last_available = state_->spaces_available_.fetch_add(1);
    if (last_available == 0 && !finished) {
      // Reader idled out, we need to restart it
      ARROW_RETURN_NOT_OK(state_->Pump(state_));
    }
    return *next;
  }

 private:
  struct State {
    State(AsyncGenerator<T> source, int max_readahead)
        : first_(true),
          source_(std::move(source)),
          finished_(false),
          // There is one extra "space" for the in-flight request
          spaces_available_(max_readahead + 1),
          // The SPSC queue has size-1 "usable" slots so we need to overallocate 1
          readahead_queue_(max_readahead + 1) {}

    // Issues one more request to the source and enqueues its future.
    Status Pump(const std::shared_ptr<State>& self) {
      // Can't do readahead_queue.write(source().Then(Callback{self})) because then the
      // callback might run immediately and add itself to the queue before this gets added
      // to the queue messing up the order.
      auto next_slot = std::make_shared<Future<T>>();
      auto written = readahead_queue_.Write(next_slot);
      if (!written) {
        return Status::UnknownError("Could not write to readahead_queue");
      }
      // If this Pump is being called from a callback it is possible for the source to
      // poll and read from the queue between the Write and this spot where we fill the
      // value in. However, it is not possible for the future to read this value we are
      // writing. That is because this callback (the callback for future X) must be
      // finished before future X is marked complete and this source is not pulled
      // reentrantly so it will not poll for future X+1 until this callback has completed.
      *next_slot = source_().Then(Callback{self});
      return Status::OK();
    }

    // Only accessed by the consumer end
    bool first_;
    // Accessed by both threads
    AsyncGenerator<T> source_;
    std::atomic<bool> finished_;
    // The queue has a size but it is not atomic. We keep track of how many spaces are
    // left in the queue here so we know if we've just written the last value and we need
    // to stop reading ahead or if we've just read from a full queue and we need to
    // restart reading ahead
    std::atomic<uint32_t> spaces_available_;
    // Needs to be a queue of shared_ptr and not Future because we set the value of the
    // future after we add it to the queue
    util::SpscQueue<std::shared_ptr<Future<T>>> readahead_queue_;
  };

  // Runs when a source future completes: records termination (error or end
  // token) and, if there is still room in the queue, pumps another request.
  struct Callback {
    Result<T> operator()(const Result<T>& maybe_next) {
      if (!maybe_next.ok()) {
        state_->finished_.store(true);
        return maybe_next;
      }
      const auto& next = *maybe_next;
      if (IsIterationEnd(next)) {
        state_->finished_.store(true);
        return maybe_next;
      }
      auto last_available = state_->spaces_available_.fetch_sub(1);
      if (last_available > 1) {
        // At least one free slot remains after ours; keep reading ahead.
        ARROW_RETURN_NOT_OK(state_->Pump(state_));
      }
      return maybe_next;
    }

    std::shared_ptr<State> state_;
  };

  std::shared_ptr<State> state_;
};
| |
| /// \see MakeFromFuture |
| template <typename T> |
| class FutureFirstGenerator { |
| public: |
| explicit FutureFirstGenerator(Future<AsyncGenerator<T>> future) |
| : state_(std::make_shared<State>(std::move(future))) {} |
| |
| Future<T> operator()() { |
| if (state_->source_) { |
| return state_->source_(); |
| } else { |
| auto state = state_; |
| return state_->future_.Then([state](const AsyncGenerator<T>& source) { |
| state->source_ = source; |
| return state->source_(); |
| }); |
| } |
| } |
| |
| private: |
| struct State { |
| explicit State(Future<AsyncGenerator<T>> future) : future_(future), source_() {} |
| |
| Future<AsyncGenerator<T>> future_; |
| AsyncGenerator<T> source_; |
| }; |
| |
| std::shared_ptr<State> state_; |
| }; |
| |
| /// \brief Transforms a Future<AsyncGenerator<T>> into an AsyncGenerator<T> |
| /// that waits for the future to complete as part of the first item. |
| /// |
| /// This generator is not async-reentrant (even if the generator yielded by future is) |
| /// |
| /// This generator does not queue |
| template <typename T> |
| AsyncGenerator<T> MakeFromFuture(Future<AsyncGenerator<T>> future) { |
| return FutureFirstGenerator<T>(std::move(future)); |
| } |
| |
| /// \brief Creates a generator that will pull from the source into a queue. Unlike |
| /// MakeReadaheadGenerator this will not pull reentrantly from the source. |
| /// |
| /// The source generator does not need to be async-reentrant |
| /// |
| /// This generator is not async-reentrant (even if the source is) |
| /// |
| /// This generator may queue up to max_readahead additional instances of T |
| template <typename T> |
| AsyncGenerator<T> MakeSerialReadaheadGenerator(AsyncGenerator<T> source_generator, |
| int max_readahead) { |
| return SerialReadaheadGenerator<T>(std::move(source_generator), max_readahead); |
| } |
| |
| /// \see MakeReadaheadGenerator |
| template <typename T> |
| class ReadaheadGenerator { |
| public: |
| ReadaheadGenerator(AsyncGenerator<T> source_generator, int max_readahead) |
| : source_generator_(std::move(source_generator)), max_readahead_(max_readahead) { |
| auto finished = std::make_shared<std::atomic<bool>>(false); |
| mark_finished_if_done_ = [finished](const Result<T>& next_result) { |
| if (!next_result.ok()) { |
| finished->store(true); |
| } else { |
| if (IsIterationEnd(*next_result)) { |
| *finished = true; |
| } |
| } |
| }; |
| finished_ = std::move(finished); |
| } |
| |
| Future<T> operator()() { |
| if (readahead_queue_.empty()) { |
| // This is the first request, let's pump the underlying queue |
| for (int i = 0; i < max_readahead_; i++) { |
| auto next = source_generator_(); |
| next.AddCallback(mark_finished_if_done_); |
| readahead_queue_.push(std::move(next)); |
| } |
| } |
| // Pop one and add one |
| auto result = readahead_queue_.front(); |
| readahead_queue_.pop(); |
| if (finished_->load()) { |
| readahead_queue_.push(AsyncGeneratorEnd<T>()); |
| } else { |
| auto back_of_queue = source_generator_(); |
| back_of_queue.AddCallback(mark_finished_if_done_); |
| readahead_queue_.push(std::move(back_of_queue)); |
| } |
| return result; |
| } |
| |
| private: |
| AsyncGenerator<T> source_generator_; |
| int max_readahead_; |
| std::function<void(const Result<T>&)> mark_finished_if_done_; |
| // Can't use a bool here because finished may be referenced by callbacks that |
| // outlive this class |
| std::shared_ptr<std::atomic<bool>> finished_; |
| std::queue<Future<T>> readahead_queue_; |
| }; |
| |
/// \brief A generator where the producer pushes items on a queue.
///
/// No back-pressure is applied, so this generator is mostly useful when
/// producing the values is neither CPU- nor memory-expensive (e.g. fetching
/// filesystem metadata).
///
/// This generator is not async-reentrant.
template <typename T>
class PushGenerator {
  // Shared between the generator (consumer side) and the Producer handle;
  // all fields are protected by `mutex`.
  struct State {
    util::Mutex mutex;
    std::deque<Result<T>> result_q;
    // The future handed to a consumer that arrived before a value was
    // available; fulfilled by the next Push()/Close().
    util::optional<Future<T>> consumer_fut;
    bool finished = false;
  };

 public:
  /// Producer API for PushGenerator
  class Producer {
   public:
    explicit Producer(std::shared_ptr<State> state) : state_(std::move(state)) {}

    /// Push a value on the queue
    void Push(Result<T> result) {
      auto lock = state_->mutex.Lock();
      if (state_->finished) {
        // Closed early
        return;
      }
      if (state_->consumer_fut.has_value()) {
        // A consumer is already waiting; hand the value over directly.
        auto fut = std::move(state_->consumer_fut.value());
        state_->consumer_fut.reset();
        lock.Unlock();  // unlock before potentially invoking a callback
        fut.MarkFinished(std::move(result));
        return;
      }
      state_->result_q.push_back(std::move(result));
    }

    /// \brief Tell the consumer we have finished producing
    ///
    /// It is allowed to call this and later call Push() again ("early close").
    /// In this case, calls to Push() after the queue is closed are silently
    /// ignored. This can help implementing non-trivial cancellation cases.
    void Close() {
      auto lock = state_->mutex.Lock();
      if (state_->finished) {
        // Already closed
        return;
      }
      state_->finished = true;
      if (state_->consumer_fut.has_value()) {
        // Wake the waiting consumer with the end token.
        auto fut = std::move(state_->consumer_fut.value());
        state_->consumer_fut.reset();
        lock.Unlock();  // unlock before potentially invoking a callback
        fut.MarkFinished(IterationTraits<T>::End());
      }
    }

    // Returns true once Close() has been called.
    bool is_closed() const {
      auto lock = state_->mutex.Lock();
      return state_->finished;
    }

   private:
    const std::shared_ptr<State> state_;
  };

  PushGenerator() : state_(std::make_shared<State>()) {}

  /// Read an item from the queue
  Future<T> operator()() {
    auto lock = state_->mutex.Lock();
    assert(!state_->consumer_fut.has_value());  // Non-reentrant
    if (!state_->result_q.empty()) {
      // A value was pushed before we were asked; deliver it immediately.
      auto fut = Future<T>::MakeFinished(std::move(state_->result_q.front()));
      state_->result_q.pop_front();
      return fut;
    }
    if (state_->finished) {
      return AsyncGeneratorEnd<T>();
    }
    // Nothing available yet; park a future for the producer to fulfill.
    auto fut = Future<T>::Make();
    state_->consumer_fut = fut;
    return fut;
  }

  /// \brief Return producer-side interface
  ///
  /// The returned object must be used by the producer to push values on the queue.
  /// Only a single Producer object should be instantiated.
  Producer producer() { return Producer{state_}; }

 private:
  const std::shared_ptr<State> state_;
};
| |
| /// \brief Creates a generator that pulls reentrantly from a source |
| /// This generator will pull reentrantly from a source, ensuring that max_readahead |
| /// requests are active at any given time. |
| /// |
| /// The source generator must be async-reentrant |
| /// |
| /// This generator itself is async-reentrant. |
| /// |
| /// This generator may queue up to max_readahead instances of T |
| template <typename T> |
| AsyncGenerator<T> MakeReadaheadGenerator(AsyncGenerator<T> source_generator, |
| int max_readahead) { |
| return ReadaheadGenerator<T>(std::move(source_generator), max_readahead); |
| } |
| |
| /// \brief Creates a generator that will yield finished futures from a vector |
| /// |
| /// This generator is async-reentrant |
| template <typename T> |
| AsyncGenerator<T> MakeVectorGenerator(std::vector<T> vec) { |
| struct State { |
| explicit State(std::vector<T> vec_) : vec(std::move(vec_)), vec_idx(0) {} |
| |
| std::vector<T> vec; |
| std::atomic<std::size_t> vec_idx; |
| }; |
| |
| auto state = std::make_shared<State>(std::move(vec)); |
| return [state]() { |
| auto idx = state->vec_idx.fetch_add(1); |
| if (idx >= state->vec.size()) { |
| return AsyncGeneratorEnd<T>(); |
| } |
| return Future<T>::MakeFinished(state->vec[idx]); |
| }; |
| } |
| |
/// \see MakeMergedGenerator
template <typename T>
class MergedGenerator {
 public:
  /// \brief Construct a merged generator pulling from up to max_subscriptions
  /// inner generators (obtained from source) at a time
  explicit MergedGenerator(AsyncGenerator<AsyncGenerator<T>> source,
                           int max_subscriptions)
      : state_(std::make_shared<State>(std::move(source), max_subscriptions)) {}

  Future<T> operator()() {
    // Either hand out an already-delivered item, signal the end, or register a
    // future that a later inner/outer callback will complete.
    Future<T> waiting_future;
    std::shared_ptr<DeliveredJob> delivered_job;
    {
      auto guard = state_->mutex.Lock();
      if (!state_->delivered_jobs.empty()) {
        // An inner generator already produced an item; hand it out now
        delivered_job = std::move(state_->delivered_jobs.front());
        state_->delivered_jobs.pop_front();
      } else if (state_->finished) {
        return IterationTraits<T>::End();
      } else {
        // Nothing ready yet; queue a future for a callback to fulfill
        waiting_future = Future<T>::Make();
        state_->waiting_jobs.push_back(std::make_shared<Future<T>>(waiting_future));
      }
    }
    if (delivered_job) {
      // deliverer will be invalid if outer callback encounters an error and delivers a
      // failed result
      if (delivered_job->deliverer) {
        // Keep the subscription that produced this item pumping
        delivered_job->deliverer().AddCallback(
            InnerCallback{state_, delivered_job->index});
      }
      return std::move(delivered_job->value);
    }
    if (state_->first) {
      // First call: start one outer pull per subscription slot.
      // NOTE(review): `first` is read and cleared outside the mutex; this appears
      // to assume the first call reaches this point before any concurrent call
      // observes the flag -- verify against the async-reentrancy contract.
      state_->first = false;
      for (std::size_t i = 0; i < state_->active_subscriptions.size(); i++) {
        state_->source().AddCallback(OuterCallback{state_, i});
      }
    }
    return waiting_future;
  }

 private:
  /// \brief An item produced by an inner generator while no caller was waiting
  struct DeliveredJob {
    explicit DeliveredJob(AsyncGenerator<T> deliverer_, Result<T> value_,
                          std::size_t index_)
        : deliverer(deliverer_), value(std::move(value_)), index(index_) {}

    // The inner generator that produced `value`; invalid when `value` carries
    // an error delivered by the outer callback
    AsyncGenerator<T> deliverer;
    // The produced (or failed) result
    Result<T> value;
    // Index of the subscription slot that produced this item
    std::size_t index;
  };

  struct State {
    State(AsyncGenerator<AsyncGenerator<T>> source, int max_subscriptions)
        : source(std::move(source)),
          active_subscriptions(max_subscriptions),
          delivered_jobs(),
          waiting_jobs(),
          mutex(),
          first(true),
          source_exhausted(false),
          finished(false),
          num_active_subscriptions(max_subscriptions) {}

    // Outer source of inner generators
    AsyncGenerator<AsyncGenerator<T>> source;
    // active_subscriptions and delivered_jobs will be bounded by max_subscriptions
    std::vector<AsyncGenerator<T>> active_subscriptions;
    std::deque<std::shared_ptr<DeliveredJob>> delivered_jobs;
    // waiting_jobs is unbounded, reentrant pulls (e.g. AddReadahead) will provide the
    // backpressure
    std::deque<std::shared_ptr<Future<T>>> waiting_jobs;
    // Guards the mutable members of this struct
    util::Mutex mutex;
    // True until the first call to operator(), which kicks off the outer pulls
    bool first;
    // True once the outer source has reported its end (or an error)
    bool source_exhausted;
    // True once the merged stream is done (error, or all subscriptions ended)
    bool finished;
    // Number of subscription slots still pulling from an inner generator
    int num_active_subscriptions;
  };

  /// \brief Callback attached to pulls on an inner generator (a subscription)
  struct InnerCallback {
    void operator()(const Result<T>& maybe_next) {
      Future<T> sink;
      bool sub_finished = maybe_next.ok() && IsIterationEnd(*maybe_next);
      {
        auto guard = state->mutex.Lock();
        if (state->finished) {
          // We've errored out so just ignore this result and don't keep pumping
          return;
        }
        if (!sub_finished) {
          if (state->waiting_jobs.empty()) {
            // No caller waiting; stash the item for a later pull
            state->delivered_jobs.push_back(std::make_shared<DeliveredJob>(
                state->active_subscriptions[index], maybe_next, index));
          } else {
            // Deliver to the oldest waiting caller (completed outside the lock)
            sink = std::move(*state->waiting_jobs.front());
            state->waiting_jobs.pop_front();
          }
        }
      }
      if (sub_finished) {
        // This subscription ended; ask the outer source for a replacement
        state->source().AddCallback(OuterCallback{state, index});
      } else if (sink.is_valid()) {
        sink.MarkFinished(maybe_next);
        if (maybe_next.ok()) {
          // Keep pumping this subscription
          state->active_subscriptions[index]().AddCallback(*this);
        }
      }
    }
    std::shared_ptr<State> state;
    std::size_t index;
  };

  /// \brief Callback attached to pulls on the outer source of generators
  struct OuterCallback {
    void operator()(const Result<AsyncGenerator<T>>& maybe_next) {
      bool should_purge = false;
      bool should_continue = false;
      Future<T> error_sink;
      {
        auto guard = state->mutex.Lock();
        if (!maybe_next.ok() || IsIterationEnd(*maybe_next)) {
          // The outer source is done (or errored)
          state->source_exhausted = true;
          if (!maybe_next.ok() || --state->num_active_subscriptions == 0) {
            state->finished = true;
            should_purge = true;
          }
          if (!maybe_next.ok()) {
            // Route the error to a waiting caller if any, else queue it with an
            // invalid deliverer so operator() won't keep pumping
            if (state->waiting_jobs.empty()) {
              state->delivered_jobs.push_back(std::make_shared<DeliveredJob>(
                  AsyncGenerator<T>(), maybe_next.status(), index));
            } else {
              error_sink = std::move(*state->waiting_jobs.front());
              state->waiting_jobs.pop_front();
            }
          }
        } else {
          // Install the new inner generator in this slot and start pulling it
          state->active_subscriptions[index] = *maybe_next;
          should_continue = true;
        }
      }
      if (error_sink.is_valid()) {
        error_sink.MarkFinished(maybe_next.status());
      }
      if (should_continue) {
        (*maybe_next)().AddCallback(InnerCallback{state, index});
      } else if (should_purge) {
        // At this point state->finished has been marked true so no one else
        // will be interacting with waiting_jobs and we can iterate outside lock
        while (!state->waiting_jobs.empty()) {
          state->waiting_jobs.front()->MarkFinished(IterationTraits<T>::End());
          state->waiting_jobs.pop_front();
        }
      }
    }
    std::shared_ptr<State> state;
    std::size_t index;
  };

  std::shared_ptr<State> state_;
};
| |
| /// \brief Creates a generator that takes in a stream of generators and pulls from up to |
| /// max_subscriptions at a time |
| /// |
| /// Note: This may deliver items out of sequence. For example, items from the third |
| /// AsyncGenerator generated by the source may be emitted before some items from the first |
| /// AsyncGenerator generated by the source. |
| /// |
| /// This generator will pull from source async-reentrantly unless max_subscriptions is 1 |
| /// This generator will not pull from the individual subscriptions reentrantly. Add |
| /// readahead to the individual subscriptions if that is desired. |
| /// This generator is async-reentrant |
| /// |
| /// This generator may queue up to max_subscriptions instances of T |
| template <typename T> |
| AsyncGenerator<T> MakeMergedGenerator(AsyncGenerator<AsyncGenerator<T>> source, |
| int max_subscriptions) { |
| return MergedGenerator<T>(std::move(source), max_subscriptions); |
| } |
| |
| /// \brief Creates a generator that takes in a stream of generators and pulls from each |
| /// one in sequence. |
| /// |
| /// This generator is async-reentrant but will never pull from source reentrantly and |
| /// will never pull from any subscription reentrantly. |
| /// |
| /// This generator may queue 1 instance of T |
| template <typename T> |
| AsyncGenerator<T> MakeConcatenatedGenerator(AsyncGenerator<AsyncGenerator<T>> source) { |
| return MergedGenerator<T>(std::move(source), 1); |
| } |
| |
| /// \see MakeTransferredGenerator |
| template <typename T> |
| class TransferringGenerator { |
| public: |
| explicit TransferringGenerator(AsyncGenerator<T> source, internal::Executor* executor) |
| : source_(std::move(source)), executor_(executor) {} |
| |
| Future<T> operator()() { return executor_->Transfer(source_()); } |
| |
| private: |
| AsyncGenerator<T> source_; |
| internal::Executor* executor_; |
| }; |
| |
| /// \brief Transfers a future to an underlying executor. |
| /// |
| /// Continuations run on the returned future will be run on the given executor |
| /// if they cannot be run synchronously. |
| /// |
| /// This is often needed to move computation off I/O threads or other external |
| /// completion sources and back on to the CPU executor so the I/O thread can |
| /// stay busy and focused on I/O |
| /// |
| /// Keep in mind that continuations called on an already completed future will |
| /// always be run synchronously and so no transfer will happen in that case. |
| /// |
| /// This generator is async reentrant if the source is |
| /// |
| /// This generator will not queue |
| template <typename T> |
| AsyncGenerator<T> MakeTransferredGenerator(AsyncGenerator<T> source, |
| internal::Executor* executor) { |
| return TransferringGenerator<T>(std::move(source), executor); |
| } |
| /// \see MakeIteratorGenerator |
| template <typename T> |
| class IteratorGenerator { |
| public: |
| explicit IteratorGenerator(Iterator<T> it) : it_(std::move(it)) {} |
| |
| Future<T> operator()() { return Future<T>::MakeFinished(it_.Next()); } |
| |
| private: |
| Iterator<T> it_; |
| }; |
| |
| /// \brief Constructs a generator that yields futures from an iterator. |
| /// |
| /// Note: Do not use this if you can avoid it. This blocks in an async |
| /// context which is a bad idea. If you're converting sync-I/O to async |
| /// then use MakeBackgroundGenerator. Otherwise, convert the underlying |
| /// source to async. This function is only around until we can conver the |
| /// remaining table readers to async. Once all uses of this generator have |
| /// been removed it should be removed(ARROW-11909). |
| /// |
| /// This generator is not async-reentrant |
| /// |
| /// This generator will not queue |
| template <typename T> |
| AsyncGenerator<T> MakeIteratorGenerator(Iterator<T> it) { |
| return IteratorGenerator<T>(std::move(it)); |
| } |
| |
/// \see MakeBackgroundGenerator
template <typename T>
class BackgroundGenerator {
 public:
  explicit BackgroundGenerator(Iterator<T> it, internal::Executor* io_executor, int max_q,
                               int q_restart)
      : state_(std::make_shared<State>(io_executor, std::move(it), max_q, q_restart)) {}

  ~BackgroundGenerator() {}

  Future<T> operator()() {
    auto guard = state_->mutex.Lock();
    Future<T> waiting_future;
    if (state_->queue.empty()) {
      if (state_->finished) {
        // Queue drained and the iterator has ended (or errored): end of stream
        return AsyncGeneratorEnd<T>();
      } else {
        // Producer hasn't caught up yet; register a future it will complete
        waiting_future = Future<T>::Make();
        state_->waiting_future = waiting_future;
      }
    } else {
      auto next = Future<T>::MakeFinished(std::move(state_->queue.front()));
      state_->queue.pop();
      if (!state_->running &&
          static_cast<int>(state_->queue.size()) <= state_->q_restart) {
        // Queue drained below the restart threshold; resume the producer task
        state_->RestartTask(state_, std::move(guard));
      }
      return next;
    }
    if (!state_->running) {
      // This branch should only be needed to start the background thread on the first
      // call
      state_->RestartTask(state_, std::move(guard));
    }
    return waiting_future;
  }

 protected:
  /// \brief Shared state between the consumer-facing generator and the
  /// producer task running on the I/O executor; guarded by `mutex`
  struct State {
    State(internal::Executor* io_executor, Iterator<T> it, int max_q, int q_restart)
        : io_executor(io_executor),
          it(std::move(it)),
          running(false),
          finished(false),
          max_q(max_q),
          q_restart(q_restart) {}

    // Drop all queued results (used when an error supersedes queued items)
    void ClearQueue() {
      while (!queue.empty()) {
        queue.pop();
      }
    }

    // (Re)spawn the producer task; `guard` must currently hold `mutex`
    void RestartTask(std::shared_ptr<State> state, util::Mutex::Guard guard) {
      if (!finished) {
        running = true;
        auto spawn_status = io_executor->Spawn([state]() { Task()(std::move(state)); });
        if (!spawn_status.ok()) {
          // Could not start the producer: shut the generator down and surface
          // the spawn error to the consumer
          running = false;
          finished = true;
          if (waiting_future.has_value()) {
            auto to_deliver = std::move(waiting_future.value());
            waiting_future.reset();
            // Unlock before completing the future so continuations don't run
            // while the mutex is held
            guard.Unlock();
            to_deliver.MarkFinished(spawn_status);
          } else {
            ClearQueue();
            queue.push(spawn_status);
          }
        }
      }
    }

    internal::Executor* io_executor;
    Iterator<T> it;
    // True while a producer task is scheduled or running
    bool running;
    // True once the iterator has ended or an error occurred
    bool finished;
    // Producer stops once the queue reaches this size
    int max_q;
    // Producer is restarted once the queue drains to this size
    int q_restart;
    std::queue<Result<T>> queue;
    // Future registered by a consumer that arrived while the queue was empty
    util::optional<Future<T>> waiting_future;
    util::Mutex mutex;
  };

  /// \brief Producer loop run on the I/O executor; pulls from the iterator
  /// until the queue fills, the iterator ends, or an error occurs
  class Task {
   public:
    void operator()(std::shared_ptr<State> state) {
      // while condition can't be based on state_ because it is run outside the mutex
      bool running = true;
      while (running) {
        auto next = state->it.Next();
        // Need to capture state->waiting_future inside the mutex to mark finished outside
        Future<T> waiting_future;
        {
          auto guard = state->mutex.Lock();

          if (!next.ok() || IsIterationEnd<T>(*next)) {
            state->finished = true;
            state->running = false;
            if (!next.ok()) {
              // On error, discard queued items so the error is delivered next
              state->ClearQueue();
            }
          }
          if (state->waiting_future.has_value()) {
            // A consumer is waiting: deliver `next` to it directly (below)
            waiting_future = std::move(state->waiting_future.value());
            state->waiting_future.reset();
          } else {
            state->queue.push(std::move(next));
            if (static_cast<int>(state->queue.size()) >= state->max_q) {
              // Queue is full; stop producing until the consumer drains it
              state->running = false;
            }
          }
          running = state->running;
        }
        // This must happen outside the task. Although presumably there is a transferring
        // generator on the other end that will quickly transfer any callbacks off of this
        // thread so we can continue looping. Still, best not to rely on that
        if (waiting_future.is_valid()) {
          waiting_future.MarkFinished(next);
        }
      }
    }
  };

  std::shared_ptr<State> state_;
};
| |
| constexpr int kDefaultBackgroundMaxQ = 32; |
| constexpr int kDefaultBackgroundQRestart = 16; |
| |
| /// \brief Creates an AsyncGenerator<T> by iterating over an Iterator<T> on a background |
| /// thread |
| /// |
| /// The parameter max_q and q_restart control queue size and background thread task |
| /// management. If the background task is fast you typically don't want it creating a |
| /// thread task for every item. Instead the background thread will run until it fills |
| /// up a readahead queue. |
| /// |
| /// Once the queue has filled up the background thread task will terminate (allowing other |
| /// I/O tasks to use the thread). Once the queue has been drained enough (specified by |
| /// q_restart) then the background thread task will be restarted. If q_restart is too low |
| /// then you may exhaust the queue waiting for the background thread task to start running |
| /// again. If it is too high then it will be constantly stopping and restarting the |
| /// background queue task |
| /// |
| /// This generator is not async-reentrant |
| /// |
| /// This generator will queue up to max_q blocks |
| template <typename T> |
| static Result<AsyncGenerator<T>> MakeBackgroundGenerator( |
| Iterator<T> iterator, internal::Executor* io_executor, |
| int max_q = kDefaultBackgroundMaxQ, int q_restart = kDefaultBackgroundQRestart) { |
| if (max_q < q_restart) { |
| return Status::Invalid("max_q must be >= q_restart"); |
| } |
| return BackgroundGenerator<T>(std::move(iterator), io_executor, max_q, q_restart); |
| } |
| |
| /// \see MakeGeneratorIterator |
| template <typename T> |
| class GeneratorIterator { |
| public: |
| explicit GeneratorIterator(AsyncGenerator<T> source) : source_(std::move(source)) {} |
| |
| Result<T> Next() { return source_().result(); } |
| |
| private: |
| AsyncGenerator<T> source_; |
| }; |
| |
| /// \brief Converts an AsyncGenerator<T> to an Iterator<T> by blocking until each future |
| /// is finished |
| template <typename T> |
| Result<Iterator<T>> MakeGeneratorIterator(AsyncGenerator<T> source) { |
| return Iterator<T>(GeneratorIterator<T>(std::move(source))); |
| } |
| |
| /// \brief Adds readahead to an iterator using a background thread. |
| /// |
| /// Under the hood this is converting the iterator to a generator using |
| /// MakeBackgroundGenerator, adding readahead to the converted generator with |
| /// MakeReadaheadGenerator, and then converting back to an iterator using |
| /// MakeGeneratorIterator. |
| template <typename T> |
| Result<Iterator<T>> MakeReadaheadIterator(Iterator<T> it, int readahead_queue_size) { |
| ARROW_ASSIGN_OR_RAISE(auto io_executor, internal::ThreadPool::Make(1)); |
| auto max_q = readahead_queue_size; |
| auto q_restart = std::max(1, max_q / 2); |
| ARROW_ASSIGN_OR_RAISE( |
| auto background_generator, |
| MakeBackgroundGenerator(std::move(it), io_executor.get(), max_q, q_restart)); |
| // Capture io_executor to keep it alive as long as owned_bg_generator is still |
| // referenced |
| AsyncGenerator<T> owned_bg_generator = [io_executor, background_generator]() { |
| return background_generator(); |
| }; |
| return MakeGeneratorIterator(std::move(owned_bg_generator)); |
| } |
| |
| /// \brief Make a generator that returns a single pre-generated future |
| /// |
| /// This generator is async-reentrant. |
| template <typename T> |
| std::function<Future<T>()> MakeSingleFutureGenerator(Future<T> future) { |
| assert(future.is_valid()); |
| auto state = std::make_shared<Future<T>>(std::move(future)); |
| return [state]() -> Future<T> { |
| auto fut = std::move(*state); |
| if (fut.is_valid()) { |
| return fut; |
| } else { |
| return AsyncGeneratorEnd<T>(); |
| } |
| }; |
| } |
| |
| /// \brief Make a generator that always fails with a given error |
| /// |
| /// This generator is async-reentrant. |
| template <typename T> |
| AsyncGenerator<T> MakeFailingGenerator(Status st) { |
| assert(!st.ok()); |
| auto state = std::make_shared<Status>(std::move(st)); |
| return [state]() -> Future<T> { |
| auto st = std::move(*state); |
| if (!st.ok()) { |
| return std::move(st); |
| } else { |
| return AsyncGeneratorEnd<T>(); |
| } |
| }; |
| } |
| |
| /// \brief Make a generator that always fails with a given error |
| /// |
| /// This overload allows inferring the return type from the argument. |
| template <typename T> |
| AsyncGenerator<T> MakeFailingGenerator(const Result<T>& result) { |
| return MakeFailingGenerator<T>(result.status()); |
| } |
| |
| } // namespace arrow |