blob: ab62fcb7034b7380c2e8a8e40d58f50a9a1f6ea7 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <ostream>
#include <thread>
#include <unordered_set>
#include <vector>
#include "arrow/testing/gtest_util.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/iterator.h"
#include "arrow/util/test_common.h"
#include "arrow/util/vector.h"
namespace arrow {
template <typename T>
class TracingIterator {
public:
explicit TracingIterator(Iterator<T> it) : it_(std::move(it)), state_(new State) {}
Result<T> Next() {
auto lock = state_->Lock();
state_->thread_ids_.insert(std::this_thread::get_id());
RETURN_NOT_OK(state_->GetNextStatus());
ARROW_ASSIGN_OR_RAISE(auto out, it_.Next());
state_->values_.push_back(out);
state_->cv_.notify_one();
return out;
}
class State {
public:
const std::vector<T>& values() { return values_; }
const std::unordered_set<std::thread::id>& thread_ids() { return thread_ids_; }
void InsertFailure(Status st) {
auto lock = Lock();
next_status_ = std::move(st);
}
// Wait until the iterator has emitted at least `size` values
void WaitForValues(int size) {
auto lock = Lock();
cv_.wait(lock, [&]() { return values_.size() >= static_cast<size_t>(size); });
}
void AssertValuesEqual(const std::vector<T>& expected) {
auto lock = Lock();
ASSERT_EQ(values_, expected);
}
void AssertValuesStartwith(const std::vector<T>& expected) {
auto lock = Lock();
ASSERT_TRUE(std::equal(expected.begin(), expected.end(), values_.begin()));
}
std::unique_lock<std::mutex> Lock() { return std::unique_lock<std::mutex>(mutex_); }
private:
friend TracingIterator;
Status GetNextStatus() {
if (next_status_.ok()) {
return Status::OK();
}
Status st = std::move(next_status_);
next_status_ = Status::OK();
return st;
}
Status next_status_;
std::vector<T> values_;
std::unordered_set<std::thread::id> thread_ids_;
std::mutex mutex_;
std::condition_variable cv_;
};
const std::shared_ptr<State>& state() const { return state_; }
private:
Iterator<T> it_;
std::shared_ptr<State> state_;
};
template <typename T>
inline Iterator<T> EmptyIt() {
return MakeEmptyIterator<T>();
}
// Non-templated version of VectorIt<T> to allow better type deduction
inline Iterator<TestInt> VectorIt(std::vector<TestInt> v) {
return MakeVectorIterator<TestInt>(std::move(v));
}
template <typename Fn, typename T>
inline Iterator<T> FilterIt(Iterator<T> it, Fn&& fn) {
return MakeFilterIterator(std::forward<Fn>(fn), std::move(it));
}
template <typename T>
inline Iterator<T> FlattenIt(Iterator<Iterator<T>> its) {
return MakeFlattenIterator(std::move(its));
}
template <typename T>
void AssertIteratorMatch(std::vector<T> expected, Iterator<T> actual) {
EXPECT_EQ(expected, IteratorToVector(std::move(actual)));
}
template <typename T>
void AssertIteratorNoMatch(std::vector<T> expected, Iterator<T> actual) {
EXPECT_NE(expected, IteratorToVector(std::move(actual)));
}
template <typename T>
void AssertIteratorNext(T expected, Iterator<T>& it) {
ASSERT_OK_AND_ASSIGN(T actual, it.Next());
ASSERT_EQ(expected, actual);
}
// --------------------------------------------------------------------
// Synchronous iterator tests
TEST(TestEmptyIterator, Basic) { AssertIteratorMatch({}, EmptyIt<TestInt>()); }
TEST(TestVectorIterator, Basic) {
AssertIteratorMatch({}, VectorIt({}));
AssertIteratorMatch({1, 2, 3}, VectorIt({1, 2, 3}));
AssertIteratorNoMatch({1}, VectorIt({}));
AssertIteratorNoMatch({}, VectorIt({1, 2, 3}));
AssertIteratorNoMatch({1, 2, 2}, VectorIt({1, 2, 3}));
AssertIteratorNoMatch({1, 2, 3, 1}, VectorIt({1, 2, 3}));
// int does not have specialized IterationTraits
std::vector<int> elements = {0, 1, 2, 3, 4, 5};
std::vector<int*> expected;
for (int& element : elements) {
expected.push_back(&element);
}
AssertIteratorMatch(expected, MakeVectorPointingIterator(std::move(elements)));
}
TEST(TestVectorIterator, RangeForLoop) {
std::vector<TestInt> ints = {1, 2, 3, 4};
auto ints_it = ints.begin();
for (auto maybe_i : VectorIt(ints)) {
ASSERT_OK_AND_ASSIGN(TestInt i, maybe_i);
ASSERT_EQ(i, *ints_it++);
}
ASSERT_EQ(ints_it, ints.end()) << *ints_it << "@" << (ints_it - ints.begin());
std::vector<std::unique_ptr<TestInt>> intptrs;
for (TestInt i : ints) {
intptrs.emplace_back(new TestInt(i));
}
// also works with move only types
ints_it = ints.begin();
for (auto maybe_i_ptr : MakeVectorIterator(std::move(intptrs))) {
ASSERT_OK_AND_ASSIGN(std::unique_ptr<TestInt> i_ptr, maybe_i_ptr);
ASSERT_EQ(*i_ptr, *ints_it++);
}
ASSERT_EQ(ints_it, ints.end());
}
Transformer<TestInt, TestStr> MakeFirstN(int n) {
int remaining = n;
return [remaining](TestInt next) mutable -> Result<TransformFlow<TestStr>> {
if (remaining > 0) {
remaining--;
return TransformYield(TestStr(next));
}
return TransformFinish();
};
}
template <typename T>
Transformer<T, T> MakeFirstNGeneric(int n) {
int remaining = n;
return [remaining](T next) mutable -> Result<TransformFlow<T>> {
if (remaining > 0) {
remaining--;
return TransformYield(next);
}
return TransformFinish();
};
}
TEST(TestIteratorTransform, Truncating) {
auto original = VectorIt({1, 2, 3});
auto truncated = MakeTransformedIterator(std::move(original), MakeFirstN(2));
AssertIteratorMatch({"1", "2"}, std::move(truncated));
}
TEST(TestIteratorTransform, TestPointer) {
auto original = VectorIt<std::shared_ptr<int>>(
{std::make_shared<int>(1), std::make_shared<int>(2), std::make_shared<int>(3)});
auto truncated = MakeTransformedIterator(std::move(original),
MakeFirstNGeneric<std::shared_ptr<int>>(2));
ASSERT_OK_AND_ASSIGN(auto result, truncated.ToVector());
ASSERT_EQ(2, result.size());
}
TEST(TestIteratorTransform, TruncatingShort) {
// Tests the failsafe case where we never call Finish
auto original = VectorIt({1});
auto truncated =
MakeTransformedIterator<TestInt, TestStr>(std::move(original), MakeFirstN(2));
AssertIteratorMatch({"1"}, std::move(truncated));
}
TEST(TestIteratorTransform, SkipSome) {
// Exercises TransformSkip
auto original = VectorIt({1, 2, 3});
auto filter = MakeFilter([](TestInt& t) { return t.value != 2; });
auto filtered = MakeTransformedIterator(std::move(original), filter);
AssertIteratorMatch({"1", "3"}, std::move(filtered));
}
TEST(TestIteratorTransform, SkipAll) {
// Exercises TransformSkip
auto original = VectorIt({1, 2, 3});
auto filter = MakeFilter([](TestInt& t) { return false; });
auto filtered = MakeTransformedIterator(std::move(original), filter);
AssertIteratorMatch({}, std::move(filtered));
}
Transformer<TestInt, TestStr> MakeAbortOnSecond() {
int counter = 0;
return [counter](TestInt next) mutable -> Result<TransformFlow<TestStr>> {
if (counter++ == 1) {
return Status::Invalid("X");
}
return TransformYield(TestStr(next));
};
}
TEST(TestIteratorTransform, Abort) {
auto original = VectorIt({1, 2, 3});
auto transformed = MakeTransformedIterator(std::move(original), MakeAbortOnSecond());
ASSERT_OK(transformed.Next());
ASSERT_RAISES(Invalid, transformed.Next());
ASSERT_OK_AND_ASSIGN(auto third, transformed.Next());
ASSERT_TRUE(IsIterationEnd(third));
}
template <typename T>
Transformer<T, T> MakeRepeatN(int repeat_count) {
int current_repeat = 0;
return [repeat_count, current_repeat](T next) mutable -> Result<TransformFlow<T>> {
current_repeat++;
bool ready_for_next = false;
if (current_repeat == repeat_count) {
current_repeat = 0;
ready_for_next = true;
}
return TransformYield(next, ready_for_next);
};
}
TEST(TestIteratorTransform, Repeating) {
auto original = VectorIt({1, 2, 3});
auto repeated = MakeTransformedIterator<TestInt, TestInt>(std::move(original),
MakeRepeatN<TestInt>(2));
AssertIteratorMatch({1, 1, 2, 2, 3, 3}, std::move(repeated));
}
TEST(TestFunctionIterator, RangeForLoop) {
int i = 0;
auto fails_at_3 = MakeFunctionIterator([&]() -> Result<TestInt> {
if (i >= 3) {
return Status::IndexError("fails at 3");
}
return i++;
});
int expected_i = 0;
for (auto maybe_i : fails_at_3) {
if (expected_i < 3) {
ASSERT_OK(maybe_i.status());
ASSERT_EQ(*maybe_i, expected_i);
} else if (expected_i == 3) {
ASSERT_RAISES(IndexError, maybe_i.status());
}
ASSERT_LE(expected_i, 3) << "iteration stops after an error is encountered";
++expected_i;
}
}
TEST(FilterIterator, Basic) {
AssertIteratorMatch({1, 2, 3, 4}, FilterIt(VectorIt({1, 2, 3, 4}), [](TestInt i) {
return FilterIterator::Accept(std::move(i));
}));
AssertIteratorMatch({}, FilterIt(VectorIt({1, 2, 3, 4}), [](TestInt i) {
return FilterIterator::Reject<TestInt>();
}));
AssertIteratorMatch({2, 4}, FilterIt(VectorIt({1, 2, 3, 4}), [](TestInt i) {
return i.value % 2 == 0 ? FilterIterator::Accept(std::move(i))
: FilterIterator::Reject<TestInt>();
}));
}
TEST(FlattenVectorIterator, Basic) {
// Flatten expects to consume Iterator<Iterator<T>>
AssertIteratorMatch({}, FlattenIt(EmptyIt<Iterator<TestInt>>()));
std::vector<Iterator<TestInt>> ok;
ok.push_back(VectorIt({1}));
ok.push_back(VectorIt({2}));
ok.push_back(VectorIt({3}));
AssertIteratorMatch({1, 2, 3}, FlattenIt(VectorIt(std::move(ok))));
std::vector<Iterator<TestInt>> not_enough;
not_enough.push_back(VectorIt({1}));
not_enough.push_back(VectorIt({2}));
AssertIteratorNoMatch({1, 2, 3}, FlattenIt(VectorIt(std::move(not_enough))));
std::vector<Iterator<TestInt>> too_much;
too_much.push_back(VectorIt({1}));
too_much.push_back(VectorIt({2}));
too_much.push_back(VectorIt({3}));
too_much.push_back(VectorIt({2}));
AssertIteratorNoMatch({1, 2, 3}, FlattenIt(VectorIt(std::move(too_much))));
}
Iterator<TestInt> Join(TestInt a, TestInt b) {
std::vector<Iterator<TestInt>> joined{2};
joined[0] = VectorIt({a});
joined[1] = VectorIt({b});
return FlattenIt(VectorIt(std::move(joined)));
}
Iterator<TestInt> Join(TestInt a, Iterator<TestInt> b) {
std::vector<Iterator<TestInt>> joined{2};
joined[0] = VectorIt(std::vector<TestInt>{a});
joined[1] = std::move(b);
return FlattenIt(VectorIt(std::move(joined)));
}
TEST(FlattenVectorIterator, Pyramid) {
auto it = Join(1, Join(2, Join(2, Join(3, Join(3, 3)))));
AssertIteratorMatch({1, 2, 2, 3, 3, 3}, std::move(it));
}
TEST(ReadaheadIterator, Empty) {
ASSERT_OK_AND_ASSIGN(auto it, MakeReadaheadIterator(VectorIt({}), 2));
AssertIteratorMatch({}, std::move(it));
}
TEST(ReadaheadIterator, Basic) {
ASSERT_OK_AND_ASSIGN(auto it, MakeReadaheadIterator(VectorIt({1, 2, 3, 4, 5}), 2));
AssertIteratorMatch({1, 2, 3, 4, 5}, std::move(it));
}
TEST(ReadaheadIterator, NotExhausted) {
ASSERT_OK_AND_ASSIGN(auto it, MakeReadaheadIterator(VectorIt({1, 2, 3, 4, 5}), 2));
AssertIteratorNext({1}, it);
AssertIteratorNext({2}, it);
}
TEST(ReadaheadIterator, Trace) {
TracingIterator<TestInt> tracing_it(VectorIt({1, 2, 3, 4, 5, 6, 7, 8}));
auto tracing = tracing_it.state();
ASSERT_EQ(tracing->values().size(), 0);
ASSERT_OK_AND_ASSIGN(
auto it, MakeReadaheadIterator(Iterator<TestInt>(std::move(tracing_it)), 2));
SleepABit(); // Background iterator won't start pumping until first request comes in
ASSERT_EQ(tracing->values().size(), 0);
AssertIteratorNext({1}, it); // Once we ask for one value we should get that one value
// as well as 2 read ahead
tracing->WaitForValues(3);
tracing->AssertValuesEqual({1, 2, 3});
SleepABit(); // No further values should be fetched
tracing->AssertValuesEqual({1, 2, 3});
AssertIteratorNext({2}, it);
AssertIteratorNext({3}, it);
AssertIteratorNext({4}, it);
tracing->WaitForValues(6);
SleepABit();
tracing->AssertValuesEqual({1, 2, 3, 4, 5, 6});
AssertIteratorNext({5}, it);
AssertIteratorNext({6}, it);
AssertIteratorNext({7}, it);
tracing->WaitForValues(9);
SleepABit();
tracing->AssertValuesEqual({1, 2, 3, 4, 5, 6, 7, 8, {}});
AssertIteratorNext({8}, it);
AssertIteratorExhausted(it);
AssertIteratorExhausted(it); // Again
tracing->WaitForValues(9);
SleepABit();
tracing->AssertValuesStartwith({1, 2, 3, 4, 5, 6, 7, 8, {}});
// A couple more EOF values may have been emitted
const auto& values = tracing->values();
ASSERT_LE(values.size(), 11);
for (size_t i = 9; i < values.size(); ++i) {
ASSERT_EQ(values[i], TestInt());
}
// Values were all emitted from the same thread, and it's not this thread
const auto& thread_ids = tracing->thread_ids();
ASSERT_EQ(thread_ids.size(), 1);
ASSERT_NE(*thread_ids.begin(), std::this_thread::get_id());
}
TEST(ReadaheadIterator, NextError) {
TracingIterator<TestInt> tracing_it((VectorIt({1, 2, 3})));
auto tracing = tracing_it.state();
ASSERT_EQ(tracing->values().size(), 0);
tracing->InsertFailure(Status::IOError("xxx"));
ASSERT_OK_AND_ASSIGN(
auto it, MakeReadaheadIterator(Iterator<TestInt>(std::move(tracing_it)), 2));
ASSERT_RAISES(IOError, it.Next().status());
AssertIteratorExhausted(it);
SleepABit();
tracing->AssertValuesEqual({});
AssertIteratorExhausted(it);
}
} // namespace arrow