blob: 36d06297049b9403e22be3e1b188016992f5343d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <random>
#include <thread>
#include <unordered_set>
#include "arrow/testing/future_util.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/type_fwd.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/test_common.h"
#include "arrow/util/vector.h"
namespace arrow {
// Builds an AsyncGenerator over the elements of `v`.
// Convenience wrapper: the vector is moved into MakeVectorGenerator.
template <typename T>
AsyncGenerator<T> AsyncVectorIt(std::vector<T> v) {
  AsyncGenerator<T> generator = MakeVectorGenerator(std::move(v));
  return generator;
}
// Wraps `src` so that the first `failing_index` calls are forwarded to `src` and
// every later call returns an already-finished future holding
// Status::Invalid("XYZ").  The call counter lives in a shared_ptr captured by the
// returned callable, so copies of the returned generator share one counter.
template <typename T>
AsyncGenerator<T> FailsAt(AsyncGenerator<T> src, int failing_index) {
  auto calls_so_far = std::make_shared<std::atomic<int>>(0);
  return [src, calls_so_far, failing_index]() -> Future<T> {
    const int call_number = calls_so_far->fetch_add(1);
    if (call_number < failing_index) {
      return src();
    }
    return Future<T>::MakeFinished(Status::Invalid("XYZ"));
  };
}
// Returns a generator yielding the same items as `source`, where each item is
// delivered through a future chained onto SleepABitAsync().
template <typename T>
AsyncGenerator<T> SlowdownABit(AsyncGenerator<T> source) {
  auto delay_then_forward = [](const T& item) -> Future<T> {
    auto pause = SleepABitAsync();
    // The sleep result is ignored; the captured copy of the item is what is yielded.
    return pause.Then([item](const Result<detail::Empty>&) { return item; });
  };
  return MakeMappedGenerator<T, T>(std::move(source), delay_then_forward);
}
// Generator wrapper that counts how many times it has been polled.
// The counter and the wrapped source live in shared state, so copies of a
// TrackingGenerator observe (and update) the same count.
template <typename T>
class TrackingGenerator {
 public:
  explicit TrackingGenerator(AsyncGenerator<T> source)
      : state_(std::make_shared<State>(std::move(source))) {}

  // Bump the poll counter, then forward the call to the wrapped source.
  Future<T> operator()() {
    State& state = *state_;
    state.num_read += 1;
    return state.source();
  }

  // Number of times operator() has been invoked on this generator or its copies.
  int num_read() { return state_->num_read; }

 private:
  struct State {
    explicit State(AsyncGenerator<T> src) : source(std::move(src)), num_read(0) {}

    AsyncGenerator<T> source;
    int num_read;
  };

  std::shared_ptr<State> state_;
};
// Serves the items of `v` through a background generator running on the CPU
// thread pool, with the resulting futures transferred back onto that same pool.
// `sleep` is forwarded to PossiblySlowVectorIt (presumably it makes the source
// iterator pause between items); `max_q` and `q_restart` are forwarded to
// MakeBackgroundGenerator.
std::function<Future<TestInt>()> BackgroundAsyncVectorIt(
    std::vector<TestInt> v, bool sleep = true, int max_q = kDefaultBackgroundMaxQ,
    int q_restart = kDefaultBackgroundQRestart) {
  auto cpu_pool = internal::GetCpuThreadPool();
  auto source_it = PossiblySlowVectorIt(v, sleep);
  EXPECT_OK_AND_ASSIGN(auto background_gen,
                       MakeBackgroundGenerator<TestInt>(std::move(source_it), cpu_pool,
                                                        max_q, q_restart));
  return MakeTransferredGenerator(background_gen, cpu_pool);
}
// Like BackgroundAsyncVectorIt but builds the (optionally slow) source iterator
// inline with MakeTransformedIterator and uses MakeBackgroundGenerator's default
// queue settings.  When `sleep` is true every item is preceded by SleepABit().
std::function<Future<TestInt>()> NewBackgroundAsyncVectorIt(std::vector<TestInt> v,
                                                            bool sleep = true) {
  auto cpu_pool = internal::GetCpuThreadPool();
  auto maybe_pause = [sleep](TestInt item) -> Result<TransformFlow<TestInt>> {
    if (sleep) {
      SleepABit();
    }
    return TransformYield(item);
  };
  auto paced_it = MakeTransformedIterator<TestInt, TestInt>(VectorIt(v), maybe_pause);
  EXPECT_OK_AND_ASSIGN(auto background_gen,
                       MakeBackgroundGenerator<TestInt>(std::move(paced_it), cpu_pool));
  return MakeTransferredGenerator(background_gen, cpu_pool);
}
template <typename T>
void AssertAsyncGeneratorMatch(std::vector<T> expected, AsyncGenerator<T> actual) {
auto vec_future = CollectAsyncGenerator(std::move(actual));
EXPECT_OK_AND_ASSIGN(auto vec, vec_future.result());
EXPECT_EQ(expected, vec);
}
// Polls `gen` once and asserts that the future finishes successfully with the
// end-of-iteration marker.
template <typename T>
void AssertGeneratorExhausted(AsyncGenerator<T>& gen) {
  ASSERT_FINISHES_OK_AND_ASSIGN(auto item_after_end, gen());
  ASSERT_TRUE(IsIterationEnd(item_after_end));
}
// --------------------------------------------------------------------
// Asynchronous iterator tests
// Forward declarations: ReentrantChecker (below) befriends
// ExpectNotAccessedReentrantly, whose return type is ReentrantCheckerGuard, so
// both names have to be declared before ReentrantChecker is defined.
template <typename T>
class ReentrantCheckerGuard;
template <typename T>
ReentrantCheckerGuard<T> ExpectNotAccessedReentrantly(AsyncGenerator<T>* generator);
// Generator wrapper that records whether it was polled again while a future it
// had previously handed out was still unfinished.  The constructor is private;
// instances are created through the befriended ExpectNotAccessedReentrantly.
template <typename T>
class ReentrantChecker {
 public:
  Future<T> operator()() {
    State& st = *state_;
    // If the flag is still set, the completion callback of an earlier call has
    // not run yet, which means this call is reentrant.
    const bool earlier_call_pending = st.generated_unfinished_future.load();
    if (earlier_call_pending) {
      st.valid.store(false);
    }
    st.generated_unfinished_future.store(true);
    // The callback clears the flag once the source future completes.
    return st.source().Then(Callback{state_});
  }

  // False once a reentrant call has been observed.
  bool valid() { return state_->valid.load(); }

 private:
  explicit ReentrantChecker(AsyncGenerator<T> source)
      : state_(std::make_shared<State>(std::move(source))) {}

  friend ReentrantCheckerGuard<T> ExpectNotAccessedReentrantly<T>(
      AsyncGenerator<T>* generator);

  // State shared between all copies of the checker and its completion callbacks.
  struct State {
    explicit State(AsyncGenerator<T> source_)
        : source(std::move(source_)), generated_unfinished_future(false), valid(true) {}

    AsyncGenerator<T> source;
    std::atomic<bool> generated_unfinished_future;
    std::atomic<bool> valid;
  };

  // Continuation attached to every future produced by the source: marks the
  // outstanding call as finished and passes the result through unchanged.
  struct Callback {
    Future<T> operator()(const Result<T>& result) {
      state_->generated_unfinished_future.store(false);
      return result;
    }
    std::shared_ptr<State> state_;
  };

  std::shared_ptr<State> state_;
};
// Move-only guard that, on destruction, adds a test failure if the associated
// ReentrantChecker observed a reentrant call.  Only the guard that currently
// "owns" the check reports; moving a guard transfers that ownership.
template <typename T>
class ReentrantCheckerGuard {
 public:
  explicit ReentrantCheckerGuard(ReentrantChecker<T> checker) : checker_(checker) {}

  ARROW_DISALLOW_COPY_AND_ASSIGN(ReentrantCheckerGuard);

  // Take over the reporting duty (if any) from `other`.
  ReentrantCheckerGuard(ReentrantCheckerGuard&& other)
      : checker_(other.checker_), owner_(other.owner_) {
    other.owner_ = false;
  }

  ReentrantCheckerGuard& operator=(ReentrantCheckerGuard&& other) {
    checker_ = other.checker_;
    // Read the flag before clearing it so the result is the same even when
    // `other` aliases `*this`.
    const bool other_was_owner = other.owner_;
    other.owner_ = false;
    owner_ = other_was_owner;
    return *this;
  }

  ~ReentrantCheckerGuard() {
    const bool must_report = owner_ && !checker_.valid();
    if (must_report) {
      ADD_FAILURE() << "A generator was accessed reentrantly when the test asserted it "
                       "should not be.";
    }
  }

 private:
  ReentrantChecker<T> checker_;
  bool owner_ = true;
};
// Replaces `*generator` with a ReentrantChecker wrapping the original generator
// and returns a guard that reports a test failure on destruction if the wrapped
// generator was polled reentrantly.
template <typename T>
ReentrantCheckerGuard<T> ExpectNotAccessedReentrantly(AsyncGenerator<T>* generator) {
  ReentrantChecker<T> checker(*generator);
  *generator = checker;
  return ReentrantCheckerGuard<T>(checker);
}
// Boolean-parameterized fixture: the parameter selects whether sources are
// wrapped with SlowdownABit ("slow") or used as-is ("fast").
class GeneratorTestFixture : public ::testing::TestWithParam<bool> {
 protected:
  // Generator over a copy of `items`, slowed down when the fixture is slow.
  AsyncGenerator<TestInt> MakeSource(const std::vector<TestInt>& items) {
    std::vector<TestInt> owned_items(items);
    AsyncGenerator<TestInt> source = AsyncVectorIt(std::move(owned_items));
    if (!IsSlow()) {
      return source;
    }
    return SlowdownABit(std::move(source));
  }

  // Generator whose every call yields Status::Invalid("XYZ").
  AsyncGenerator<TestInt> MakeFailingSource() {
    AsyncGenerator<TestInt> always_fails = [] {
      return Future<TestInt>::MakeFinished(Status::Invalid("XYZ"));
    };
    if (!IsSlow()) {
      return always_fails;
    }
    return SlowdownABit(std::move(always_fails));
  }

  // Run fewer trials for the slow case since they take longer
  int GetNumItersForStress() { return IsSlow() ? 10 : 100; }

  bool IsSlow() { return GetParam(); }
};
// Control interface for an iterator whose items are supplied manually by the
// test: Push() feeds the next result, times_polled() reports the poll count.
template <typename T>
class ManualIteratorControl {
 public:
  virtual ~ManualIteratorControl() = default;

  // Supply the next result (value or error) for the iterator to hand out.
  virtual void Push(Result<T> result) = 0;

  // Number of times the iterator has been asked for an item.
  virtual uint32_t times_polled() = 0;
};
// Blocking iterator fed by Push().  Next() waits (up to 300 seconds) until an
// item has been pushed.  All state is held behind a shared_ptr, so copies of a
// PushIterator share one queue, mutex and poll counter.
template <typename T>
class PushIterator : public ManualIteratorControl<T> {
 public:
  PushIterator() : state_(std::make_shared<State>()) {}
  ~PushIterator() override = default;

  // Counts the poll, then blocks until an item is available or the wait times out.
  Result<T> Next() {
    State& st = *state_;
    std::unique_lock<std::mutex> lock(st.mx);
    ++st.times_polled;
    const bool item_available = st.cv.wait_for(lock, std::chrono::seconds(300),
                                               [&st] { return !st.items.empty(); });
    if (!item_available) {
      return Status::Invalid("Timed out waiting for PushIterator");
    }
    Result<T> front = std::move(st.items.front());
    st.items.pop();
    return front;
  }

  // Enqueue `result` under the lock, then wake one waiter after releasing it.
  void Push(Result<T> result) override {
    State& st = *state_;
    {
      std::lock_guard<std::mutex> guard(st.mx);
      st.items.push(std::move(result));
    }
    st.cv.notify_one();
  }

  uint32_t times_polled() override {
    std::lock_guard<std::mutex> guard(state_->mx);
    return state_->times_polled;
  }

 private:
  struct State {
    uint32_t times_polled = 0;
    std::mutex mx;
    std::condition_variable cv;
    std::queue<Result<T>> items;
  };

  std::shared_ptr<State> state_;
};
// Creates a PushIterator, stores a control handle to it in `*out`, and returns an
// Iterator<T> wrapping a copy of it (the copy shares state with the handle).
template <typename T>
Iterator<T> MakePushIterator(std::shared_ptr<ManualIteratorControl<T>>* out) {
  auto pusher = std::make_shared<PushIterator<T>>();
  Iterator<T> wrapped(*pusher);
  *out = pusher;
  return wrapped;
}
// Generator backed by a PushGenerator that also counts how often it is polled.
// The counter is held by shared_ptr, so copies share the same count.
template <typename T>
class ManualGenerator {
 public:
  ManualGenerator() : times_polled_(std::make_shared<uint32_t>()) {}

  // Record the poll and forward to the underlying PushGenerator.
  Future<T> operator()() {
    uint32_t& polls = *times_polled_;
    ++polls;
    return source_();
  }

  uint32_t times_polled() const { return *times_polled_; }

  // Producer handle used by tests to push items into the generator.
  typename PushGenerator<T>::Producer producer() { return source_.producer(); }

 private:
  PushGenerator<T> source_;
  std::shared_ptr<uint32_t> times_polled_;
};
// Visiting a vector-backed generator calls the visitor once per item; the test
// expects the visit to be finished by the time VisitAsyncGenerator returns.
TEST(TestAsyncUtil, Visit) {
  unsigned int total = 0;
  auto accumulate = [&total](TestInt item) {
    total += item.value;
    return Status::OK();
  };
  auto source = AsyncVectorIt<TestInt>({1, 2, 3});
  auto visit_done = VisitAsyncGenerator<TestInt>(source, accumulate);
  ASSERT_TRUE(visit_done.is_finished());
  ASSERT_EQ(6, total);
}
// CollectAsyncGenerator gathers every item of the generator into a vector.
TEST(TestAsyncUtil, Collect) {
  const std::vector<TestInt> expected{1, 2, 3};
  auto collect_future = CollectAsyncGenerator(AsyncVectorIt(expected));
  ASSERT_FINISHES_OK_AND_ASSIGN(auto items, collect_future);
  ASSERT_EQ(expected, items);
}
// Mapping with a synchronous value-returning function converts every item.
TEST(TestAsyncUtil, Map) {
  const std::vector<TestInt> numbers = {1, 2, 3};
  std::function<TestStr(const TestInt&)> to_str = [](const TestInt& in) {
    return std::to_string(in.value);
  };
  auto mapped = MakeMappedGenerator(AsyncVectorIt(numbers), to_str);
  const std::vector<TestStr> expected{"1", "2", "3"};
  AssertAsyncGeneratorMatch(expected, mapped);
}
// Mapping with a function that returns a Future: each item is converted after a
// short asynchronous sleep.
TEST(TestAsyncUtil, MapAsync) {
  const std::vector<TestInt> numbers = {1, 2, 3};
  std::function<Future<TestStr>(const TestInt&)> async_to_str = [](const TestInt& in) {
    return SleepAsync(1e-3).Then([in](const Result<detail::Empty>& empty) {
      return TestStr(std::to_string(in.value));
    });
  };
  auto mapped = MakeMappedGenerator(AsyncVectorIt(numbers), async_to_str);
  const std::vector<TestStr> expected{"1", "2", "3"};
  AssertAsyncGeneratorMatch(expected, mapped);
}
// Checks that a mapped generator may be pulled again before earlier map tasks
// have finished: two pulls are issued while the mapper's futures are blocked on
// `can_proceed`, and the test expects both map tasks to have started and the
// source to have been read twice before anything completes.
TEST(TestAsyncUtil, MapReentrant) {
std::vector<TestInt> input = {1, 2};
auto source = AsyncVectorIt(input);
// Count reads of the source; `tracker` shares its counter with the copy that is
// wrapped into the transferred generator below.
TrackingGenerator<TestInt> tracker(std::move(source));
source = MakeTransferredGenerator(AsyncGenerator<TestInt>(tracker),
internal::GetCpuThreadPool());
std::atomic<int> map_tasks_running(0);
// Mapper blocks until can_proceed is marked finished, should start multiple map tasks
Future<> can_proceed = Future<>::Make();
std::function<Future<TestStr>(const TestInt&)> mapper = [&](const TestInt& in) {
map_tasks_running.fetch_add(1);
return can_proceed.Then([in](...) { return TestStr(std::to_string(in.value)); });
};
auto mapped = MakeMappedGenerator(std::move(source), mapper);
// Nothing is read from the source until the mapped generator is pulled.
EXPECT_EQ(0, tracker.num_read());
auto one = mapped();
auto two = mapped();
// Wait for both map tasks to have been entered (they run via the thread pool).
BusyWait(10, [&] { return map_tasks_running.load() == 2; });
EXPECT_EQ(2, map_tasks_running.load());
EXPECT_EQ(2, tracker.num_read());
// Two further pulls go past the end of the two-item source.
auto end_one = mapped();
auto end_two = mapped();
// Unblock the mapper futures; everything can now complete.
can_proceed.MarkFinished();
ASSERT_FINISHES_OK_AND_ASSIGN(auto oneval, one);
EXPECT_EQ("1", oneval.value);
ASSERT_FINISHES_OK_AND_ASSIGN(auto twoval, two);
EXPECT_EQ("2", twoval.value);
ASSERT_FINISHES_OK_AND_ASSIGN(auto end, end_one);
ASSERT_EQ(IterationTraits<TestStr>::End(), end);
ASSERT_FINISHES_OK_AND_ASSIGN(end, end_two);
ASSERT_EQ(IterationTraits<TestStr>::End(), end);
}
// Repeatedly maps a slowed-down source behind a readahead generator and checks
// that (a) the source itself is never polled reentrantly and (b) no item is lost.
TEST(TestAsyncUtil, MapParallelStress) {
  constexpr int NTASKS = 10;
  constexpr int NITEMS = 10;
  for (int trial = 0; trial < NTASKS; ++trial) {
    auto source = SlowdownABit(MakeVectorGenerator(RangeVector(NITEMS)));
    // The guard reports a failure when it goes out of scope if `source` was
    // accessed reentrantly.
    auto guard = ExpectNotAccessedReentrantly(&source);
    std::function<TestStr(const TestInt&)> slow_to_str = [](const TestInt& in) {
      SleepABit();
      return std::to_string(in.value);
    };
    auto mapped = MakeMappedGenerator(std::move(source), slow_to_str);
    mapped = MakeReadaheadGenerator(mapped, 8);
    ASSERT_FINISHES_OK_AND_ASSIGN(auto collected, CollectAsyncGenerator(mapped));
    ASSERT_EQ(NITEMS, collected.size());
  }
}
// An error returned by the mapping function (here for the item with value 2)
// surfaces as a failure of the collected future.
TEST(TestAsyncUtil, MapTaskFail) {
  const std::vector<TestInt> numbers = {1, 2, 3};
  std::function<Result<TestStr>(const TestInt&)> fail_on_two =
      [](const TestInt& in) -> Result<TestStr> {
    if (in.value != 2) {
      return TestStr(std::to_string(in.value));
    }
    return Status::Invalid("XYZ");
  };
  auto mapped = MakeMappedGenerator(AsyncVectorIt(numbers), fail_on_two);
  ASSERT_FINISHES_AND_RAISES(Invalid, CollectAsyncGenerator(mapped));
}
// An error coming from the source generator (from the second poll onwards, via
// FailsAt) surfaces as a failure of the collected future even though the mapper
// itself never fails.
TEST(TestAsyncUtil, MapSourceFail) {
  const std::vector<TestInt> numbers = {1, 2, 3};
  auto failing_source = FailsAt(AsyncVectorIt(numbers), 1);
  std::function<Result<TestStr>(const TestInt&)> to_str =
      [](const TestInt& in) -> Result<TestStr> {
    return TestStr(std::to_string(in.value));
  };
  auto mapped = MakeMappedGenerator(std::move(failing_source), to_str);
  ASSERT_FINISHES_AND_RAISES(Invalid, CollectAsyncGenerator(mapped));
}
// Concatenating a generator of generators yields the inner items in order.
TEST(TestAsyncUtil, Concatenated) {
  const std::vector<TestInt> first_half{1, 2, 3};
  const std::vector<TestInt> second_half{4, 5, 6};
  std::vector<AsyncGenerator<TestInt>> parts;
  parts.push_back(AsyncVectorIt<TestInt>(first_half));
  parts.push_back(AsyncVectorIt<TestInt>(second_half));
  auto outer = AsyncVectorIt<AsyncGenerator<TestInt>>(std::move(parts));
  auto concatenated = MakeConcatenatedGenerator(outer);
  const std::vector<TestInt> expected{1, 2, 3, 4, 5, 6};
  AssertAsyncGeneratorMatch(expected, concatenated);
}
// Fixture for MakeFromFuture tests; inherits the fast/slow parameter.
class FromFutureFixture : public GeneratorTestFixture {};

// A generator delivered through a future (already finished in the fast case,
// finished after a short sleep in the slow case) yields the same items as the
// generator it resolves to.
TEST_P(FromFutureFixture, Basic) {
  const bool slow = IsSlow();
  auto items_future = Future<std::vector<TestInt>>::MakeFinished(RangeVector(3));
  if (slow) {
    items_future = SleepABitAsync().Then(
        [](...) -> Result<std::vector<TestInt>> { return RangeVector(3); });
  }
  auto generator_future = items_future.Then([slow](const std::vector<TestInt>& vec) {
    auto vec_gen = MakeVectorGenerator(vec);
    if (!slow) {
      return vec_gen;
    }
    return SlowdownABit(std::move(vec_gen));
  });
  auto from_future = MakeFromFuture(std::move(generator_future));
  auto collected = CollectAsyncGenerator(std::move(from_future));
  ASSERT_FINISHES_OK_AND_EQ(RangeVector(3), collected);
}
// Run every FromFutureFixture test with the parameter set to false and to true;
// the fixture's IsSlow() returns that parameter.
INSTANTIATE_TEST_SUITE_P(FromFutureTests, FromFutureFixture,
::testing::Values(false, true));
class MergedGeneratorTestFixture : public GeneratorTestFixture {};
TEST_P(MergedGeneratorTestFixture, Merged) {
auto gen = AsyncVectorIt<AsyncGenerator<TestInt>>(
{MakeSource({1, 2, 3}), MakeSource({4, 5, 6})});
auto concat_gen = MakeMergedGenerator(gen, 10);
ASSERT_FINISHES_OK_AND_ASSIGN(auto concat, CollectAsyncGenerator(concat_gen));
auto concat_ints =
internal::MapVector([](const TestInt& val) { return val.value; }, concat);
std::set<int> concat_set(concat_ints.begin(), concat_ints.end());
std::set<int> expected{1, 2, 4, 3, 5, 6};
ASSERT_EQ(expected, concat_set);
}
// A failure raised by one of the inner generators fails the merged collection.
TEST_P(MergedGeneratorTestFixture, MergedInnerFail) {
  auto outer = AsyncVectorIt<AsyncGenerator<TestInt>>(
      {MakeSource({1, 2, 3}), MakeFailingSource()});
  auto merged = MakeMergedGenerator(outer, 10);
  auto collected = CollectAsyncGenerator(merged);
  ASSERT_FINISHES_AND_RAISES(Invalid, collected);
}
// A failure raised by the outer generator (from its second poll onwards, via
// FailsAt) fails the merged collection.
TEST_P(MergedGeneratorTestFixture, MergedOuterFail) {
  auto three_sources = AsyncVectorIt<AsyncGenerator<TestInt>>(
      {MakeSource({1, 2, 3}), MakeSource({1, 2, 3}), MakeSource({1, 2, 3})});
  auto failing_outer = FailsAt(three_sources, 1);
  auto merged = MakeMergedGenerator(failing_outer, 10);
  auto collected = CollectAsyncGenerator(merged);
  ASSERT_FINISHES_AND_RAISES(Invalid, collected);
}
// Merges four sources with MakeMergedGenerator's second argument set to 2
// (presumably the maximum number of inner sources pulled at once — confirm
// against MakeMergedGenerator) and uses a TrackingGenerator on the outer
// generator to check how many inner sources have been requested at each stage.
TEST_P(MergedGeneratorTestFixture, MergedLimitedSubscriptions) {
auto gen = AsyncVectorIt<AsyncGenerator<TestInt>>(
{MakeSource({1, 2}), MakeSource({3, 4}), MakeSource({5, 6, 7, 8}),
MakeSource({9, 10, 11, 12})});
// `tracker` shares its read counter with the copy handed to the merged generator.
TrackingGenerator<AsyncGenerator<TestInt>> tracker(std::move(gen));
auto merged = MakeMergedGenerator(AsyncGenerator<AsyncGenerator<TestInt>>(tracker), 2);
SleepABit();
// Lazy pull, should not start until first pull
ASSERT_EQ(0, tracker.num_read());
ASSERT_FINISHES_OK_AND_ASSIGN(auto next, merged());
// 1 and 3 are the first items of the first two sources.
ASSERT_TRUE(next.value == 1 || next.value == 3);
// First 2 values have to come from one of the first 2 sources
ASSERT_EQ(2, tracker.num_read());
ASSERT_FINISHES_OK_AND_ASSIGN(next, merged());
ASSERT_LT(next.value, 5);
ASSERT_GT(next.value, 0);
// By the time five values have been read we should have exhausted at
// least one source
for (int i = 0; i < 3; i++) {
ASSERT_FINISHES_OK_AND_ASSIGN(next, merged());
// 9 is possible if we read 1,2,3,4 and then grab 9 while 5 is running slow
ASSERT_LT(next.value, 10);
ASSERT_GT(next.value, 0);
}
// More than the initial two outer reads, but fewer than five.
ASSERT_GT(tracker.num_read(), 2);
ASSERT_LT(tracker.num_read(), 5);
// Read remaining values
// (12 items in total, 5 of which were consumed above)
for (int i = 0; i < 7; i++) {
ASSERT_FINISHES_OK_AND_ASSIGN(next, merged());
ASSERT_LT(next.value, 13);
ASSERT_GT(next.value, 0);
}
AssertGeneratorExhausted(merged);
}
// Repeatedly merges ten guarded sources and checks that every item arrives and
// that no individual source is ever polled reentrantly.
TEST_P(MergedGeneratorTestFixture, MergedStress) {
  constexpr int NGENERATORS = 10;
  constexpr int NITEMS = 10;
  const int num_trials = GetNumItersForStress();
  for (int trial = 0; trial < num_trials; ++trial) {
    std::vector<AsyncGenerator<TestInt>> sources;
    // Guards report reentrant access when they are destroyed at the end of each
    // trial.
    std::vector<ReentrantCheckerGuard<TestInt>> guards;
    for (int g = 0; g < NGENERATORS; ++g) {
      AsyncGenerator<TestInt> checked = MakeSource(RangeVector(NITEMS));
      guards.push_back(ExpectNotAccessedReentrantly(&checked));
      sources.push_back(checked);
    }
    AsyncGenerator<AsyncGenerator<TestInt>> outer = AsyncVectorIt(sources);
    auto merged = MakeMergedGenerator(outer, 4);
    ASSERT_FINISHES_OK_AND_ASSIGN(auto items, CollectAsyncGenerator(merged));
    ASSERT_EQ(NITEMS * NGENERATORS, items.size());
  }
}
// Same as MergedStress but with a readahead generator on top of the merged
// generator (so the merged generator itself is pulled ahead); only the total
// item count is checked.
TEST_P(MergedGeneratorTestFixture, MergedParallelStress) {
  constexpr int NGENERATORS = 10;
  constexpr int NITEMS = 10;
  const int num_trials = GetNumItersForStress();
  for (int trial = 0; trial < num_trials; ++trial) {
    std::vector<AsyncGenerator<TestInt>> sources;
    for (int g = 0; g < NGENERATORS; ++g) {
      sources.push_back(MakeSource(RangeVector(NITEMS)));
    }
    auto merged = MakeMergedGenerator(AsyncVectorIt(sources), 4);
    merged = MakeReadaheadGenerator(merged, 4);
    ASSERT_FINISHES_OK_AND_ASSIGN(auto items, CollectAsyncGenerator(merged));
    ASSERT_EQ(NITEMS * NGENERATORS, items.size());
  }
}
// Run every MergedGeneratorTestFixture test with the fast (false) and the slow
// (true) source variant.  The suite has to name MergedGeneratorTestFixture, the
// fixture the Merged* TEST_P cases are declared on: instantiating the base
// GeneratorTestFixture (which has no TEST_P cases of its own) leaves those cases
// uninstantiated, so they would never run.
INSTANTIATE_TEST_SUITE_P(MergedGeneratorTests, MergedGeneratorTestFixture,
                         ::testing::Values(false, true));
// A vector generator keeps working after the vector it was created from has
// gone out of scope.
TEST(TestAsyncUtil, FromVector) {
  AsyncGenerator<TestInt> gen;
  {
    // The input vector's lifetime ends with this scope; only `gen` survives.
    std::vector<TestInt> short_lived{1, 2, 3};
    gen = MakeVectorGenerator(std::move(short_lived));
  }
  const std::vector<TestInt> expected{1, 2, 3};
  AssertAsyncGeneratorMatch(expected, gen);
}
// A transformed generator over a source that is immediately (synchronously) at
// its end collects to an empty vector.
TEST(TestAsyncUtil, SynchronousFinish) {
  AsyncGenerator<TestInt> already_ended = []() {
    return Future<TestInt>::MakeFinished(IterationTraits<TestInt>::End());
  };
  Transformer<TestInt, TestStr> skip_everything = [](TestInt) { return TransformSkip(); };
  auto transformed = MakeTransformedGenerator(already_ended, skip_everything);
  ASSERT_FINISHES_OK_AND_ASSIGN(auto actual, CollectAsyncGenerator(transformed));
  ASSERT_EQ(std::vector<TestStr>(), actual);
}
// Wrapping a generator in a synchronous iterator yields the items in order and
// then keeps reporting exhaustion on repeated calls.
TEST(TestAsyncUtil, GeneratorIterator) {
  auto source = BackgroundAsyncVectorIt({1, 2, 3});
  ASSERT_OK_AND_ASSIGN(auto iterator, MakeGeneratorIterator(std::move(source)));
  for (int expected = 1; expected <= 3; ++expected) {
    ASSERT_OK_AND_EQ(TestInt(expected), iterator.Next());
  }
  // Exhaustion must be reported consistently, not just once.
  AssertIteratorExhausted(iterator);
  AssertIteratorExhausted(iterator);
}
// Checks that a continuation attached to a future from a transferred generator
// does not run on the thread that attached it.  The source task is held back
// with a condition variable until the continuation has been attached.
TEST(TestAsyncUtil, MakeTransferredGenerator) {
std::mutex mutex;
std::condition_variable cv;
std::atomic<bool> finished(false);
ASSERT_OK_AND_ASSIGN(auto thread_pool, internal::ThreadPool::Make(1));
// Needs to be a slow source to ensure we don't call Then on a completed future
AsyncGenerator<TestInt> slow_generator = [&]() {
return thread_pool
->Submit([&] {
// Block (for at most 30 seconds) until the test thread sets `finished`.
std::unique_lock<std::mutex> lock(mutex);
cv.wait_for(lock, std::chrono::duration<double>(30),
[&] { return finished.load(); });
return IterationTraits<TestInt>::End();
})
.ValueOrDie();
};
auto transferred =
MakeTransferredGenerator<TestInt>(std::move(slow_generator), thread_pool.get());
auto current_thread_id = std::this_thread::get_id();
auto fut = transferred().Then([&current_thread_id](const Result<TestInt>& result) {
ASSERT_NE(current_thread_id, std::this_thread::get_id());
});
// Release the source task now that the continuation is attached.
{
std::lock_guard<std::mutex> lg(mutex);
finished.store(true);
}
cv.notify_one();
ASSERT_FINISHES_OK(fut);
}
// This test is too slow for valgrind
// (the guard below also excludes builds that define ADDRESS_SANITIZER)
#if !(defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER))
// Feeds 10000 already-finished futures through a transformer that skips every
// item and expects collection to finish with an empty result.
TEST(TestAsyncUtil, StackOverflow) {
  int produced = 0;
  AsyncGenerator<TestInt> finished_source = [&produced]() {
    if (produced >= 10000) {
      return Future<TestInt>::MakeFinished(IterationTraits<TestInt>::End());
    }
    return Future<TestInt>::MakeFinished(produced++);
  };
  Transformer<TestInt, TestStr> skip_all =
      [](TestInt) -> Result<TransformFlow<TestStr>> { return TransformSkip(); };
  auto transformed = MakeTransformedGenerator(finished_source, skip_all);
  ASSERT_FINISHES_OK_AND_ASSIGN(auto collected, CollectAsyncGenerator(transformed));
  ASSERT_EQ(0, collected.size());
}
#endif
// Fixture for background generator tests.  The boolean test parameter is passed
// to BackgroundAsyncVectorIt as its `sleep` argument.
class BackgroundGeneratorTestFixture : public GeneratorTestFixture {
 protected:
  // Background generator over `it` with the given queue settings.
  AsyncGenerator<TestInt> Make(const std::vector<TestInt>& it,
                               int max_q = kDefaultBackgroundMaxQ,
                               int q_restart = kDefaultBackgroundQRestart) {
    return BackgroundAsyncVectorIt(it, /*sleep=*/GetParam(), max_q, q_restart);
  }
};
// A background generator over an empty vector is exhausted on the first poll.
TEST_P(BackgroundGeneratorTestFixture, Empty) {
  AsyncGenerator<TestInt> empty_gen = Make({});
  AssertGeneratorExhausted(empty_gen);
}
// A background generator delivers all items of the source, in order.
TEST_P(BackgroundGeneratorTestFixture, Basic) {
  const std::vector<TestInt> expected{1, 2, 3};
  auto background = Make(expected);
  ASSERT_FINISHES_OK_AND_ASSIGN(auto collected, CollectAsyncGenerator(background));
  ASSERT_EQ(expected, collected);
}
// Pushes five valid items followed by an error into a manually controlled
// iterator and checks how a background generator (max_q = 4, q_restart = 2)
// surfaces that error: the first two items must be valid, the error must show up
// within the next four polls, and afterwards the generator reports exhaustion.
TEST_P(BackgroundGeneratorTestFixture, BadResult) {
std::shared_ptr<ManualIteratorControl<TestInt>> iterator_control;
auto iterator = MakePushIterator<TestInt>(&iterator_control);
// Enough valid items to fill the queue and then some
for (int i = 0; i < 5; i++) {
iterator_control->Push(i);
}
// Next fail
iterator_control->Push(Status::Invalid("XYZ"));
ASSERT_OK_AND_ASSIGN(
auto generator,
MakeBackgroundGenerator(std::move(iterator), internal::GetCpuThreadPool(), 4, 2));
ASSERT_FINISHES_OK_AND_EQ(TestInt(0), generator());
// Have not yet restarted so next results should always be valid
ASSERT_FINISHES_OK_AND_EQ(TestInt(1), generator());
// Next three results may or may not be valid.
// The typical case is the call for TestInt(2) restarts a full queue and then maybe
// TestInt(3) and TestInt(4) arrive quickly enough to not get pre-empted or maybe
// they don't.
//
// A more bizarre, but possible, case is the checking thread falls behind the producer
// thread just so and TestInt(1) arrives and is delivered but before the call for
// TestInt(2) happens the background reader reads 2, 3, 4, and 5[err] into the queue so
// the queue never fills up and even TestInt(2) is preempted.
bool invalid_encountered = false;
for (int i = 0; i < 3; i++) {
auto next_fut = generator();
// NOTE(review): result() is used without an explicit wait macro here;
// presumably it blocks until the future finishes — confirm.
auto next_result = next_fut.result();
if (next_result.ok()) {
ASSERT_EQ(TestInt(i + 2), next_result.ValueUnsafe());
} else {
invalid_encountered = true;
break;
}
}
// If all three of the results above were valid then this one will surely be invalid
if (!invalid_encountered) {
ASSERT_FINISHES_AND_RAISES(Invalid, generator());
}
// Once the error has been delivered the generator reports end-of-iteration.
AssertGeneratorExhausted(generator);
}
// Checks that a background generator reports an Invalid error when its executor
// has been shut down, both when the executor is already down at the first poll
// and when it is shut down while the generator is in use.
TEST_P(BackgroundGeneratorTestFixture, InvalidExecutor) {
std::vector<TestInt> expected = {1, 2, 3, 4, 5, 6, 7, 8};
// Case 1: waiting future
// The executor is shut down before the generator is ever polled.
auto slow = GetParam();
auto it = PossiblySlowVectorIt(expected, slow);
ASSERT_OK_AND_ASSIGN(auto invalid_executor, internal::ThreadPool::Make(1));
ASSERT_OK(invalid_executor->Shutdown());
ASSERT_OK_AND_ASSIGN(auto background, MakeBackgroundGenerator(
std::move(it), invalid_executor.get(), 4, 2));
ASSERT_FINISHES_AND_RAISES(Invalid, background());
// Case 2: Queue bad result
// The executor is shut down only after the first item has been delivered.
it = PossiblySlowVectorIt(expected, slow);
ASSERT_OK_AND_ASSIGN(invalid_executor, internal::ThreadPool::Make(1));
ASSERT_OK_AND_ASSIGN(
background, MakeBackgroundGenerator(std::move(it), invalid_executor.get(), 4, 2));
ASSERT_FINISHES_OK_AND_EQ(TestInt(1), background());
ASSERT_OK(invalid_executor->Shutdown());
// Next two are ok because queue is shutdown
// (presumably these two items were already buffered before the shutdown and the
// queue only tries to restart once it drains to q_restart = 2 — TODO confirm)
ASSERT_FINISHES_OK_AND_EQ(TestInt(2), background());
ASSERT_FINISHES_OK_AND_EQ(TestInt(3), background());
// Now the queue should have tried (and failed) to start back up
ASSERT_FINISHES_AND_RAISES(Invalid, background());
}
// Uses a manually fed iterator to observe exactly how many items a background
// generator (max_q = 4, q_restart = 2) reads ahead: it must not read before the
// first poll, must pause once its queue is full, and must resume reading once
// the queue has drained to the restart threshold.
TEST_P(BackgroundGeneratorTestFixture, StopAndRestart) {
std::shared_ptr<ManualIteratorControl<TestInt>> iterator_control;
auto iterator = MakePushIterator<TestInt>(&iterator_control);
// Start with 6 items in the source
for (int i = 0; i < 6; i++) {
iterator_control->Push(i);
}
// followed by the end-of-iteration marker
iterator_control->Push(IterationEnd<TestInt>());
ASSERT_OK_AND_ASSIGN(
auto generator,
MakeBackgroundGenerator(std::move(iterator), internal::GetCpuThreadPool(), 4, 2));
SleepABit();
// Lazy, should not start until polled once
ASSERT_EQ(iterator_control->times_polled(), 0);
// First poll should trigger 5 reads (1 for the polled value, 4 for the queue)
auto next = generator();
BusyWait(10, [&] { return iterator_control->times_polled() >= 5; });
// And then stop and not read any more
SleepABit();
ASSERT_EQ(iterator_control->times_polled(), 5);
ASSERT_FINISHES_OK_AND_EQ(TestInt(0), next);
// One more read should bring q down to 3 and should not restart
ASSERT_FINISHES_OK_AND_EQ(TestInt(1), generator());
SleepABit();
ASSERT_EQ(iterator_control->times_polled(), 5);
// One more read should bring q down to 2 and that should restart
// but it will only read up to 6 because we hit end of stream
ASSERT_FINISHES_OK_AND_EQ(TestInt(2), generator());
// 7 polls in total: the six items plus the end marker.
BusyWait(10, [&] { return iterator_control->times_polled() >= 7; });
ASSERT_EQ(iterator_control->times_polled(), 7);
for (int i = 3; i < 6; i++) {
ASSERT_FINISHES_OK_AND_EQ(TestInt(i), generator());
}
AssertGeneratorExhausted(generator);
}
// Iterator that reports end-of-iteration after a 0.1 second sleep and returns an
// error if it is polled again after that first call.
struct SlowEmptyIterator {
  // First call: sleep, then return the end marker.
  // Any later call: return Status::Invalid.
  Result<TestInt> Next() {
    if (called_) {
      return Status::Invalid("Should not have been called twice");
    }
    // Record the call; without this the guard above could never fire.
    called_ = true;
    SleepFor(0.1);
    return IterationTraits<TestInt>::End();
  }

 private:
  bool called_ = false;
};
// Ensure that the background generator properly fulfills the asyncgenerator contract
// and can be called after it ends.
TEST_P(BackgroundGeneratorTestFixture, BackgroundRepeatEnd) {
  ASSERT_OK_AND_ASSIGN(auto io_pool, internal::ThreadPool::Make(1));
  // Slow variant: an iterator that sleeps before ending; fast variant: a plain
  // empty iterator.
  Iterator<TestInt> empty_source;
  if (!GetParam()) {
    empty_source = MakeEmptyIterator<TestInt>();
  } else {
    empty_source = Iterator<TestInt>(SlowEmptyIterator());
  }
  ASSERT_OK_AND_ASSIGN(auto generator,
                       MakeBackgroundGenerator(std::move(empty_source), io_pool.get()));
  generator =
      MakeTransferredGenerator(std::move(generator), internal::GetCpuThreadPool());
  // Both the first poll and a poll after the end must yield the end marker.
  for (int poll = 0; poll < 2; ++poll) {
    ASSERT_FINISHES_OK_AND_ASSIGN(auto end_marker, generator());
    ASSERT_TRUE(IsIterationEnd(end_marker));
  }
}
// Starts twenty background generators with a small queue (max_q = 4,
// q_restart = 2) at once and checks that each one delivers the full sequence.
TEST_P(BackgroundGeneratorTestFixture, Stress) {
  constexpr int NTASKS = 20;
  constexpr int NITEMS = 20;
  const auto expected = RangeVector(NITEMS);
  std::vector<Future<std::vector<TestInt>>> collections;
  for (int task = 0; task < NTASKS; ++task) {
    auto background = Make(expected, /*max_q=*/4, /*q_restart=*/2);
    collections.push_back(CollectAsyncGenerator(background));
  }
  ASSERT_FINISHES_OK_AND_ASSIGN(auto per_task_results, All(collections));
  for (auto& maybe_items : per_task_results) {
    ASSERT_OK_AND_ASSIGN(auto items, maybe_items);
    ASSERT_EQ(items, expected);
  }
}
// Run every BackgroundGeneratorTestFixture test with the parameter set to false
// and to true; the fixture passes it to BackgroundAsyncVectorIt as `sleep`.
INSTANTIATE_TEST_SUITE_P(BackgroundGeneratorTests, BackgroundGeneratorTestFixture,
::testing::Values(false, true));
// Serial readahead over a background source delivers every item in order and
// never polls the source reentrantly.
TEST(TestAsyncUtil, SerialReadaheadSlowProducer) {
  AsyncGenerator<TestInt> source = BackgroundAsyncVectorIt({1, 2, 3, 4, 5});
  // Reports a failure at scope exit if `source` was accessed reentrantly.
  auto guard = ExpectNotAccessedReentrantly(&source);
  SerialReadaheadGenerator<TestInt> readahead(source, 2);
  const std::vector<TestInt> expected{1, 2, 3, 4, 5};
  AssertAsyncGeneratorMatch(expected, static_cast<AsyncGenerator<TestInt>>(readahead));
}
// With a synchronous source, checks how far the serial readahead generator runs
// ahead of the consumer: nothing before the first poll, then one delivered item
// plus the configured number of readahead slots.
TEST(TestAsyncUtil, SerialReadaheadSlowConsumer) {
  int delivered = 0;
  // Yields 0..4 and then the end marker, counting each delivered item.
  auto counting_source = [&delivered]() {
    if (delivered >= 5) {
      return Future<TestInt>::MakeFinished(IterationTraits<TestInt>::End());
    }
    return Future<TestInt>::MakeFinished(delivered++);
  };
  AsyncGenerator<TestInt> readahead =
      SerialReadaheadGenerator<TestInt>(counting_source, 3);
  SleepABit();
  // Nothing is pulled from the source before the first poll.
  ASSERT_EQ(0, delivered);
  ASSERT_FINISHES_OK_AND_ASSIGN(auto first, readahead());
  ASSERT_EQ(0, first.value);
  // One item handed out plus three read ahead.
  ASSERT_EQ(4, delivered);
  AssertAsyncGeneratorMatch({1, 2, 3, 4}, readahead);
  // Ensure still reads ahead with just 1 slot
  delivered = 0;
  readahead = SerialReadaheadGenerator<TestInt>(counting_source, 1);
  ASSERT_FINISHES_OK_AND_ASSIGN(first, readahead());
  ASSERT_EQ(0, first.value);
  ASSERT_EQ(2, delivered);
  AssertAsyncGeneratorMatch({1, 2, 3, 4}, readahead);
}
// Stress test: a slow background producer feeding a serial readahead generator
// that is drained by an equally slow visitor; the source must never be polled
// reentrantly.
TEST(TestAsyncUtil, SerialReadaheadStress) {
  constexpr int NTASKS = 20;
  constexpr int NITEMS = 50;
  for (int trial = 0; trial < NTASKS; ++trial) {
    AsyncGenerator<TestInt> source = BackgroundAsyncVectorIt(RangeVector(NITEMS));
    auto guard = ExpectNotAccessedReentrantly(&source);
    SerialReadaheadGenerator<TestInt> readahead(source, 2);
    auto slow_visitor = [](TestInt) -> Status {
      // Normally sleeping in a visit function would be a faux-pas but we want to slow
      // the reader down to match the producer to maximize the stress
      SleepABit();
      return Status::OK();
    };
    auto visit_fut = VisitAsyncGenerator<TestInt>(readahead, slow_visitor);
    ASSERT_FINISHES_OK(visit_fut);
  }
}
// Same as SerialReadaheadStress but with `sleep` disabled on the background
// source and a visitor that does no work.
TEST(TestAsyncUtil, SerialReadaheadStressFast) {
  constexpr int NTASKS = 20;
  constexpr int NITEMS = 50;
  for (int trial = 0; trial < NTASKS; ++trial) {
    AsyncGenerator<TestInt> source =
        BackgroundAsyncVectorIt(RangeVector(NITEMS), /*sleep=*/false);
    auto guard = ExpectNotAccessedReentrantly(&source);
    SerialReadaheadGenerator<TestInt> readahead(source, 2);
    auto noop_visitor = [](TestInt) -> Status { return Status::OK(); };
    auto visit_fut = VisitAsyncGenerator<TestInt>(readahead, noop_visitor);
    ASSERT_FINISHES_OK(visit_fut);
  }
}
// Stress test for error propagation through the serial readahead generator: the
// source starts failing once it reaches an item with value >= 10, and the
// visitor must have seen exactly the items before that point.
TEST(TestAsyncUtil, SerialReadaheadStressFailing) {
constexpr int NTASKS = 20;
constexpr int NITEMS = 50;
// 45 == 0 + 1 + ... + 9 (assumes RangeVector(n) yields 0..n-1 — TODO confirm)
constexpr int EXPECTED_SUM = 45;
for (int i = 0; i < NTASKS; i++) {
AsyncGenerator<TestInt> it = BackgroundAsyncVectorIt(RangeVector(NITEMS));
// NOTE(review): `it` is captured by reference, so copies of this callable must
// not be invoked after `it` goes out of scope at the end of the loop body.
AsyncGenerator<TestInt> fails_at_ten = [&it]() {
auto next = it();
return next.Then([](const Result<TestInt>& item) -> Result<TestInt> {
// NOTE(review): `item` is dereferenced without checking ok(); this relies on
// the source itself never producing an error.
if (item->value >= 10) {
return Status::Invalid("XYZ");
} else {
return item;
}
});
};
SerialReadaheadGenerator<TestInt> serial_readahead(fails_at_ten, 2);
unsigned int sum = 0;
auto visit_fut = VisitAsyncGenerator<TestInt>(serial_readahead,
[&sum](TestInt test_int) -> Status {
sum += test_int.value;
// Sleep to maximize stress
SleepABit();
return Status::OK();
});
ASSERT_FINISHES_AND_RAISES(Invalid, visit_fut);
ASSERT_EQ(EXPECTED_SUM, sum);
}
}
// With a synchronous five-item source and a readahead of 10, the first poll
// pulls the whole source; the items are then delivered in order, followed by the
// end marker.
TEST(TestAsyncUtil, Readahead) {
  int delivered = 0;
  // Yields 0..4 and then the end marker, counting each delivered item.
  auto counting_source = [&delivered]() {
    if (delivered >= 5) {
      return Future<TestInt>::MakeFinished(IterationTraits<TestInt>::End());
    }
    return Future<TestInt>::MakeFinished(delivered++);
  };
  auto readahead = MakeReadaheadGenerator<TestInt>(counting_source, 10);
  // Should not pump until first item requested
  ASSERT_EQ(0, delivered);
  auto first = readahead();
  // At this point the pumping should have happened
  ASSERT_EQ(5, delivered);
  ASSERT_FINISHES_OK_AND_ASSIGN(auto first_val, first);
  ASSERT_EQ(TestInt(0), first_val);
  // Read the rest
  for (int expected = 1; expected <= 4; ++expected) {
    ASSERT_FINISHES_OK_AND_ASSIGN(auto next_val, readahead());
    ASSERT_EQ(TestInt(expected), next_val);
  }
  // Next should be end
  ASSERT_FINISHES_OK_AND_ASSIGN(auto last_val, readahead());
  ASSERT_TRUE(IsIterationEnd(last_val));
}
// Checks readahead behaviour when the first source task fails: the error is
// delivered first, the tasks that had already been spawned still deliver their
// values, and the generator then ends.
TEST(TestAsyncUtil, ReadaheadFailed) {
ASSERT_OK_AND_ASSIGN(auto thread_pool, internal::ThreadPool::Make(4));
std::atomic<int32_t> counter(0);
// All tasks are a little slow. The first task fails.
// The readahead will have spawned 9 more tasks and they
// should all pass
// NOTE(review): the submitted task contains no sleep; "a little slow" presumably
// refers to the thread-pool round trip — confirm.
auto source = [thread_pool, &counter]() -> Future<TestInt> {
auto count = counter++;
return *thread_pool->Submit([count]() -> Result<TestInt> {
if (count == 0) {
return Status::Invalid("X");
}
return TestInt(count);
});
};
auto readahead = MakeReadaheadGenerator<TestInt>(source, 10);
ASSERT_FINISHES_AND_RAISES(Invalid, readahead());
SleepABit();
// The nine tasks spawned alongside the failing one deliver 1..9 in order.
for (int i = 0; i < 9; i++) {
ASSERT_FINISHES_OK_AND_ASSIGN(auto next_val, readahead());
ASSERT_EQ(TestInt(i + 1), next_val);
}
ASSERT_FINISHES_OK_AND_ASSIGN(auto after, readahead());
// It's possible that finished was set quickly and there
// are only 10 elements
if (IsIterationEnd(after)) {
return;
}
// It's also possible that finished was too slow and there
// ended up being 11 elements
ASSERT_EQ(TestInt(10), after);
// There can't be 12 elements because SleepABit will prevent it
ASSERT_FINISHES_OK_AND_ASSIGN(auto definitely_last, readahead());
ASSERT_TRUE(IsIterationEnd(definitely_last));
}
class SequencerTestFixture : public GeneratorTestFixture {
protected:
void RandomShuffle(std::vector<TestInt>& values) {
std::default_random_engine gen(seed_++);
std::shuffle(values.begin(), values.end(), gen);
}
int seed_ = 42;
std::function<bool(const TestInt&, const TestInt&)> cmp_ =
[](const TestInt& left, const TestInt& right) { return left.value > right.value; };
// Let's increment by 2's to make it interesting
std::function<bool(const TestInt&, const TestInt&)> is_next_ =
[](const TestInt& left, const TestInt& right) {
return left.value + 2 == right.value;
};
};
// The sequenced output must be {2, 4, 6} whether the source delivers the items in
// reverse or already in order.
TEST_P(SequencerTestFixture, SequenceBasic) {
  {
    // Source delivers the items back to front
    auto reversed = MakeSource({6, 4, 2});
    auto sequenced = MakeSequencingGenerator(reversed, cmp_, is_next_, TestInt(0));
    AssertAsyncGeneratorMatch({2, 4, 6}, sequenced);
  }
  {
    // Source already delivers the items in order
    auto ordered = MakeSource({2, 4, 6});
    auto sequenced = MakeSequencingGenerator(ordered, cmp_, is_next_, TestInt(0));
    AssertAsyncGeneratorMatch({2, 4, 6}, sequenced);
  }
}
// Same scenario as the basic reversed case, but the predicates are passed as plain
// lambdas instead of the fixture's std::function members.
TEST_P(SequencerTestFixture, SequenceLambda) {
  auto value_greater = [](const TestInt& lhs, const TestInt& rhs) {
    return lhs.value > rhs.value;
  };
  auto follows_by_two = [](const TestInt& lhs, const TestInt& rhs) {
    return lhs.value + 2 == rhs.value;
  };

  auto reversed = MakeSource({6, 4, 2});
  auto sequenced =
      MakeSequencingGenerator(reversed, value_greater, follows_by_two, TestInt(0));
  AssertAsyncGeneratorMatch({2, 4, 6}, sequenced);
}
// Error handling of the sequencing generator: a source failure reaches the consumer,
// releases anything already queued, and stops further pulling from the source.
TEST_P(SequencerTestFixture, SequenceError) {
  {
    // A source that fails from index 1 onwards makes collection fail with Invalid
    auto source = MakeSource({6, 4, 2});
    source = FailsAt(source, 1);
    auto sequenced = MakeSequencingGenerator(source, cmp_, is_next_, TestInt(0));
    auto all_items = CollectAsyncGenerator(sequenced);
    ASSERT_FINISHES_AND_RAISES(Invalid, all_items);
  }
  {
    // Failure should clear old items out of the queue immediately.  The fixture
    // predicates are wrapped to operate on shared_ptr<TestInt> so that the release of a
    // queued item can be observed through a weak_ptr.
    using IntPtr = std::shared_ptr<TestInt>;
    using PtrPredicate = std::function<bool(const IntPtr&, const IntPtr&)>;
    auto value_cmp = cmp_;
    auto value_is_next = is_next_;
    PtrPredicate ptr_cmp = [value_cmp](const IntPtr& lhs, const IntPtr& rhs) {
      return value_cmp(*lhs, *rhs);
    };
    PtrPredicate ptr_is_next = [value_is_next](const IntPtr& lhs, const IntPtr& rhs) {
      return value_is_next(*lhs, *rhs);
    };

    PushGenerator<IntPtr> source;
    auto sequenced =
        MakeSequencingGenerator(static_cast<AsyncGenerator<IntPtr>>(source), ptr_cmp,
                                ptr_is_next, std::make_shared<TestInt>(0));
    auto tracked = std::make_shared<TestInt>(4);
    std::weak_ptr<TestInt> watcher = tracked;
    auto producer = source.producer();
    auto pending = sequenced();
    producer.Push(std::move(tracked));
    producer.Push(Status::Invalid("XYZ"));
    // Nothing may still own the pushed item once the error has been pushed
    ASSERT_TRUE(watcher.expired());
    ASSERT_FINISHES_AND_RAISES(Invalid, pending);
  }
  {
    // Failure should interrupt pumping
    PushGenerator<TestInt> source;
    auto sequenced = MakeSequencingGenerator(
        static_cast<AsyncGenerator<TestInt>>(source), cmp_, is_next_, TestInt(0));
    auto producer = source.producer();
    auto pending = sequenced();
    producer.Push(TestInt(4));
    producer.Push(Status::Invalid("XYZ"));
    producer.Push(TestInt(2));
    ASSERT_FINISHES_AND_RAISES(Invalid, pending);
    // The 2 must still be sitting in the source: the sequencer should have stopped
    // pumping on error instead of pulling it out
    ASSERT_FINISHES_OK_AND_EQ(TestInt(2), source());
  }
}
// Repeatedly shuffles RangeVector(100, 2) and checks that sequencing the shuffled source
// reproduces the unshuffled RangeVector output.
TEST_P(SequencerTestFixture, SequenceStress) {
  constexpr int kNumItems = 100;
  for (int iteration = 0; iteration < GetNumItersForStress(); ++iteration) {
    auto shuffled = RangeVector(kNumItems, 2);
    RandomShuffle(shuffled);
    auto source = MakeSource(shuffled);
    auto sequenced = MakeSequencingGenerator(source, cmp_, is_next_, TestInt(-2));
    AssertAsyncGeneratorMatch(RangeVector(kNumItems, 2), sequenced);
  }
}
// Instantiate every SequencerTestFixture test once for each boolean parameter value.
// NOTE(review): the meaning of the flag is defined by GeneratorTestFixture, which is
// outside this excerpt -- confirm there.
INSTANTIATE_TEST_SUITE_P(SequencerTests, SequencerTestFixture,
::testing::Values(false, true));
// A filter built with MakeFilter drops the item whose value is 2; the transformed
// generator is expected to produce "1" and "3".
TEST(TestAsyncIteratorTransform, SkipSome) {
  auto source = AsyncVectorIt<TestInt>({1, 2, 3});
  auto drop_two = MakeFilter([](TestInt& item) { return item.value != 2; });
  auto survivors = MakeTransformedGenerator(std::move(source), drop_two);
  AssertAsyncGeneratorMatch({"1", "3"}, std::move(survivors));
}
// A generator that is closed without any pushed value yields only end-of-iteration
// markers, and closing it a second time changes nothing.
TEST(PushGenerator, Empty) {
  const TestInt end_marker = IterationTraits<TestInt>::End();
  PushGenerator<TestInt> generator;
  auto producer = generator.producer();

  // A poll issued before Close() stays pending until the producer closes
  auto pending = generator();
  AssertNotFinished(pending);
  producer.Close();
  ASSERT_FINISHES_OK_AND_EQ(end_marker, pending);
  ASSERT_FINISHES_OK_AND_EQ(end_marker, generator());

  // Close idempotent
  pending = generator();
  producer.Close();
  ASSERT_FINISHES_OK_AND_EQ(end_marker, pending);
  ASSERT_FINISHES_OK_AND_EQ(end_marker, generator());
  ASSERT_FINISHES_OK_AND_EQ(end_marker, generator());
}
// Values are delivered in push order, whether they were pushed before or after the
// matching poll, and end markers follow once the producer is closed.
TEST(PushGenerator, Success) {
  PushGenerator<TestInt> generator;
  auto producer = generator.producer();
  std::vector<Future<TestInt>> polled;

  // Two values are queued up front; the third of three polls has nothing to take yet
  producer.Push(TestInt{1});
  producer.Push(TestInt{2});
  for (int n = 0; n < 3; ++n) {
    polled.push_back(generator());
  }
  ASSERT_FINISHES_OK_AND_EQ(TestInt{1}, polled[0]);
  ASSERT_FINISHES_OK_AND_EQ(TestInt{2}, polled[1]);
  AssertNotFinished(polled[2]);

  // A push completes the poll that was left waiting
  producer.Push(TestInt{3});
  ASSERT_FINISHES_OK_AND_EQ(TestInt{3}, polled[2]);

  // Push first, poll afterwards
  producer.Push(TestInt{4});
  polled.push_back(generator());
  ASSERT_FINISHES_OK_AND_EQ(TestInt{4}, polled[3]);

  // One last value, then close: of the next four polls the first gets the value and
  // the other three get the end marker
  producer.Push(TestInt{5});
  producer.Close();
  for (int n = 0; n < 4; ++n) {
    polled.push_back(generator());
  }
  ASSERT_FINISHES_OK_AND_EQ(TestInt{5}, polled[4]);
  for (int idx = 5; idx < 8; ++idx) {
    ASSERT_FINISHES_OK_AND_EQ(IterationTraits<TestInt>::End(), polled[idx]);
  }
  ASSERT_FINISHES_OK_AND_EQ(IterationTraits<TestInt>::End(), generator());
}
// Pushed error statuses are delivered to the poll at their position in the stream, and
// values pushed after an error are still delivered.
TEST(PushGenerator, Errors) {
  PushGenerator<TestInt> generator;
  auto producer = generator.producer();
  std::vector<Future<TestInt>> polled;

  // A value followed by an error; the third poll has nothing to take yet
  producer.Push(TestInt{1});
  producer.Push(Status::Invalid("2"));
  for (int n = 0; n < 3; ++n) {
    polled.push_back(generator());
  }
  ASSERT_FINISHES_OK_AND_EQ(TestInt{1}, polled[0]);
  ASSERT_FINISHES_AND_RAISES(Invalid, polled[1]);
  AssertNotFinished(polled[2]);

  // The waiting poll receives the next pushed error; the value pushed behind it goes
  // to the following poll
  producer.Push(Status::IOError("3"));
  producer.Push(TestInt{4});
  ASSERT_FINISHES_AND_RAISES(IOError, polled[2]);
  polled.push_back(generator());
  ASSERT_FINISHES_OK_AND_EQ(TestInt{4}, polled[3]);

  producer.Close();
  ASSERT_FINISHES_OK_AND_EQ(IterationTraits<TestInt>::End(), generator());
}
// A value pushed after Close() is not delivered: polls beyond the values queued before
// the close receive the end marker.
TEST(PushGenerator, CloseEarly) {
  PushGenerator<TestInt> generator;
  auto producer = generator.producer();
  std::vector<Future<TestInt>> polled;

  producer.Push(TestInt{1});
  producer.Push(TestInt{2});
  for (int n = 0; n < 3; ++n) {
    polled.push_back(generator());
  }

  // Close while the third poll is still waiting, then try to push once more
  producer.Close();
  producer.Push(TestInt{3});

  ASSERT_FINISHES_OK_AND_EQ(TestInt{1}, polled[0]);
  ASSERT_FINISHES_OK_AND_EQ(TestInt{2}, polled[1]);
  ASSERT_FINISHES_OK_AND_EQ(IterationTraits<TestInt>::End(), polled[2]);
  ASSERT_FINISHES_OK_AND_EQ(IterationTraits<TestInt>::End(), generator());
}
// Pushes NVALUES distinct values into one PushGenerator from NTHREADS producer threads
// while the test thread consumes NFUTURES results.  The first NVALUES results must be
// successful and pairwise distinct; everything after them must be the end marker.
TEST(PushGenerator, Stress) {
  const int NTHREADS = 20;
  const int NVALUES = 2000;
  const int NFUTURES = NVALUES + 100;
  PushGenerator<TestInt> gen;
  auto producer = gen.producer();
  std::atomic<int> next_value{0};
  // Each worker claims the next unclaimed value and pushes it, until all are claimed
  auto producer_worker = [&]() {
    while (true) {
      int v = next_value.fetch_add(1);
      if (v >= NVALUES) {
        break;
      }
      producer.Push(v);
    }
  };
  // Runs all workers to completion and only then closes the producer, so Close() can
  // never race ahead of a pending Push()
  auto producer_main = [&]() {
    std::vector<std::thread> threads;
    for (int i = 0; i < NTHREADS; ++i) {
      threads.emplace_back(producer_worker);
    }
    for (auto& thread : threads) {
      thread.join();
    }
    producer.Close();
  };
  std::vector<Result<TestInt>> results;
  // The number of results is known up front; avoid reallocating while consuming
  results.reserve(NFUTURES);
  std::thread thread(producer_main);
  for (int i = 0; i < NFUTURES; ++i) {
    results.push_back(gen().result());
  }
  thread.join();
  std::unordered_set<int> seen_values;
  for (int i = 0; i < NVALUES; ++i) {
    ASSERT_OK_AND_ASSIGN(auto v, results[i]);
    // insert() reports whether the value was new, so a single lookup covers both the
    // duplicate check and the bookkeeping
    ASSERT_TRUE(seen_values.insert(v.value).second)
        << "value delivered more than once: " << v.value;
  }
  for (int i = NVALUES; i < NFUTURES; ++i) {
    ASSERT_OK_AND_EQ(IterationTraits<TestInt>::End(), results[i]);
  }
}
// A single-future generator yields the future's value exactly once: collection stays
// pending until the future completes, and a second collection comes back empty.
TEST(SingleFutureGenerator, Basics) {
  auto source_future = Future<TestInt>::Make();
  auto gen = MakeSingleFutureGenerator(source_future);

  auto first_pass = CollectAsyncGenerator(gen);
  AssertNotFinished(first_pass);
  source_future.MarkFinished(TestInt{42});
  ASSERT_FINISHES_OK_AND_ASSIGN(auto items, first_pass);
  ASSERT_EQ(items, std::vector<TestInt>{42});

  // Generator exhausted
  auto second_pass = CollectAsyncGenerator(gen);
  ASSERT_FINISHES_OK_AND_EQ(std::vector<TestInt>{}, second_pass);
}
// A failing generator reports its error on the first collection; collecting from it
// again finishes successfully with no items.
TEST(FailingGenerator, Basics) {
  auto gen = MakeFailingGenerator<TestInt>(Status::IOError("zzz"));

  auto first_pass = CollectAsyncGenerator(gen);
  ASSERT_FINISHES_AND_RAISES(IOError, first_pass);

  // Generator exhausted
  auto second_pass = CollectAsyncGenerator(gen);
  ASSERT_FINISHES_OK_AND_EQ(std::vector<TestInt>{}, second_pass);
}
} // namespace arrow