blob: a505df5ef04f315588e0e0b35937eab0ccf2c50b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "arrow/util/future.h"
#include "arrow/util/future_iterator.h"
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <ostream>
#include <random>
#include <string>
#include <thread>
#include <vector>
#include <gmock/gmock-matchers.h>
#include <gtest/gtest.h>
#include "arrow/testing/future_util.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/logging.h"
#include "arrow/util/thread_pool.h"
namespace arrow {
using internal::ThreadPool;
int ToInt(int x) { return x; }
// A data type without a default constructor.
struct Foo {
int bar;
std::string baz;
explicit Foo(int value) : bar(value), baz(std::to_string(value)) {}
int ToInt() const { return bar; }
bool operator==(int other) const { return bar == other; }
bool operator==(const Foo& other) const { return bar == other.bar; }
friend bool operator==(int left, const Foo& right) { return right == left; }
friend std::ostream& operator<<(std::ostream& os, const Foo& foo) {
return os << "Foo(" << foo.bar << ")";
}
};
template <>
struct IterationTraits<Foo> {
static Foo End() { return Foo(-1); }
};
template <>
struct IterationTraits<MoveOnlyDataType> {
static MoveOnlyDataType End() { return MoveOnlyDataType(-1); }
};
template <typename T>
struct IteratorResults {
std::vector<T> values;
std::vector<Status> errors;
};
template <typename T>
IteratorResults<T> IteratorToResults(Iterator<T> iterator) {
IteratorResults<T> results;
while (true) {
auto res = iterator.Next();
if (res == IterationTraits<T>::End()) {
break;
}
if (res.ok()) {
results.values.push_back(*std::move(res));
} else {
results.errors.push_back(res.status());
}
}
return results;
}
// So that main thread may wait a bit for a future to be finished
constexpr auto kYieldDuration = std::chrono::microseconds(50);
constexpr double kTinyWait = 1e-5; // seconds
constexpr double kLargeWait = 5.0; // seconds
template <typename T>
class SimpleExecutor {
public:
explicit SimpleExecutor(int nfutures)
: pool_(ThreadPool::Make(/*threads=*/4).ValueOrDie()) {
for (int i = 0; i < nfutures; ++i) {
futures_.push_back(Future<T>::Make());
}
}
std::vector<Future<T>>& futures() { return futures_; }
void SetFinished(const std::vector<std::pair<int, bool>>& pairs) {
for (const auto& pair : pairs) {
const int fut_index = pair.first;
if (pair.second) {
futures_[fut_index].MarkFinished(T(fut_index));
} else {
futures_[fut_index].MarkFinished(Status::UnknownError("xxx"));
}
}
}
void SetFinishedDeferred(std::vector<std::pair<int, bool>> pairs) {
std::this_thread::sleep_for(kYieldDuration);
ABORT_NOT_OK(pool_->Spawn([=]() { SetFinished(pairs); }));
}
// Mark future successful
void SetFinished(int fut_index) { futures_[fut_index].MarkFinished(T(fut_index)); }
void SetFinishedDeferred(int fut_index) {
std::this_thread::sleep_for(kYieldDuration);
ABORT_NOT_OK(pool_->Spawn([=]() { SetFinished(fut_index); }));
}
// Mark all futures in [start, stop) successful
void SetFinished(int start, int stop) {
for (int fut_index = start; fut_index < stop; ++fut_index) {
futures_[fut_index].MarkFinished(T(fut_index));
}
}
void SetFinishedDeferred(int start, int stop) {
std::this_thread::sleep_for(kYieldDuration);
ABORT_NOT_OK(pool_->Spawn([=]() { SetFinished(start, stop); }));
}
protected:
std::vector<Future<T>> futures_;
std::shared_ptr<ThreadPool> pool_;
};
// --------------------------------------------------------------------
// Simple in-thread tests
TEST(FutureSyncTest, Int) {
{
// MarkFinished(int)
auto fut = Future<int>::Make();
AssertNotFinished(fut);
fut.MarkFinished(42);
AssertSuccessful(fut);
auto res = fut.result();
ASSERT_OK(res);
ASSERT_EQ(*res, 42);
res = std::move(fut).result();
ASSERT_OK(res);
ASSERT_EQ(*res, 42);
}
{
// MakeFinished(int)
auto fut = Future<int>::MakeFinished(42);
AssertSuccessful(fut);
auto res = fut.result();
ASSERT_OK(res);
ASSERT_EQ(*res, 42);
res = std::move(fut.result());
ASSERT_OK(res);
ASSERT_EQ(*res, 42);
}
{
// MarkFinished(Result<int>)
auto fut = Future<int>::Make();
AssertNotFinished(fut);
fut.MarkFinished(Result<int>(43));
AssertSuccessful(fut);
ASSERT_OK_AND_ASSIGN(auto value, fut.result());
ASSERT_EQ(value, 43);
}
{
// MarkFinished(failed Result<int>)
auto fut = Future<int>::Make();
AssertNotFinished(fut);
fut.MarkFinished(Result<int>(Status::IOError("xxx")));
AssertFailed(fut);
ASSERT_RAISES(IOError, fut.result());
}
{
// MakeFinished(Status)
auto fut = Future<int>::MakeFinished(Status::IOError("xxx"));
AssertFailed(fut);
ASSERT_RAISES(IOError, fut.result());
}
{
// MarkFinished(Status)
auto fut = Future<int>::Make();
AssertNotFinished(fut);
fut.MarkFinished(Status::IOError("xxx"));
AssertFailed(fut);
ASSERT_RAISES(IOError, fut.result());
}
}
TEST(FutureSyncTest, Foo) {
{
auto fut = Future<Foo>::Make();
AssertNotFinished(fut);
fut.MarkFinished(Foo(42));
AssertSuccessful(fut);
auto res = fut.result();
ASSERT_OK(res);
Foo value = *res;
ASSERT_EQ(value, 42);
ASSERT_OK(fut.status());
res = std::move(fut).result();
ASSERT_OK(res);
value = *res;
ASSERT_EQ(value, 42);
}
{
// MarkFinished(Result<Foo>)
auto fut = Future<Foo>::Make();
AssertNotFinished(fut);
fut.MarkFinished(Result<Foo>(Foo(42)));
AssertSuccessful(fut);
ASSERT_OK_AND_ASSIGN(Foo value, fut.result());
ASSERT_EQ(value, 42);
}
{
// MarkFinished(failed Result<Foo>)
auto fut = Future<Foo>::Make();
AssertNotFinished(fut);
fut.MarkFinished(Result<Foo>(Status::IOError("xxx")));
AssertFailed(fut);
ASSERT_RAISES(IOError, fut.result());
}
}
TEST(FutureSyncTest, Empty) {
{
// MarkFinished()
auto fut = Future<>::Make();
AssertNotFinished(fut);
fut.MarkFinished();
AssertSuccessful(fut);
}
{
// MakeFinished()
auto fut = Future<>::MakeFinished();
AssertSuccessful(fut);
auto res = fut.result();
ASSERT_OK(res);
res = std::move(fut.result());
ASSERT_OK(res);
}
{
// MarkFinished(Status)
auto fut = Future<>::Make();
AssertNotFinished(fut);
fut.MarkFinished(Status::OK());
AssertSuccessful(fut);
}
{
// MakeFinished(Status)
auto fut = Future<>::MakeFinished(Status::OK());
AssertSuccessful(fut);
fut = Future<>::MakeFinished(Status::IOError("xxx"));
AssertFailed(fut);
}
{
// MarkFinished(Status)
auto fut = Future<>::Make();
AssertNotFinished(fut);
fut.MarkFinished(Status::IOError("xxx"));
AssertFailed(fut);
ASSERT_RAISES(IOError, fut.status());
}
}
TEST(FutureSyncTest, GetStatusFuture) {
{
auto fut = Future<MoveOnlyDataType>::Make();
Future<> status_future(fut);
AssertNotFinished(fut);
AssertNotFinished(status_future);
fut.MarkFinished(MoveOnlyDataType(42));
AssertSuccessful(fut);
AssertSuccessful(status_future);
ASSERT_EQ(&fut.status(), &status_future.status());
}
{
auto fut = Future<MoveOnlyDataType>::Make();
Future<> status_future(fut);
AssertNotFinished(fut);
AssertNotFinished(status_future);
fut.MarkFinished(Status::IOError("xxx"));
AssertFailed(fut);
AssertFailed(status_future);
ASSERT_EQ(&fut.status(), &status_future.status());
}
}
// Ensure the implicit convenience constructors behave as desired.
TEST(FutureSyncTest, ImplicitConstructors) {
{
auto fut = ([]() -> Future<MoveOnlyDataType> {
return arrow::Status::Invalid("Invalid");
})();
AssertFailed(fut);
ASSERT_RAISES(Invalid, fut.result());
}
{
auto fut = ([]() -> Future<MoveOnlyDataType> {
return arrow::Result<MoveOnlyDataType>(arrow::Status::Invalid("Invalid"));
})();
AssertFailed(fut);
ASSERT_RAISES(Invalid, fut.result());
}
{
auto fut = ([]() -> Future<MoveOnlyDataType> { return MoveOnlyDataType(42); })();
AssertSuccessful(fut);
}
{
auto fut = ([]() -> Future<MoveOnlyDataType> {
return arrow::Result<MoveOnlyDataType>(MoveOnlyDataType(42));
})();
AssertSuccessful(fut);
}
}
TEST(FutureRefTest, ChainRemoved) {
// Creating a future chain should not prevent the futures from being deleted if the
// entire chain is deleted
std::weak_ptr<FutureImpl> ref;
std::weak_ptr<FutureImpl> ref2;
{
auto fut = Future<>::Make();
auto fut2 =
fut.Then([](const Result<detail::Empty>& status) { return Status::OK(); });
ref = fut.impl_;
ref2 = fut2.impl_;
}
ASSERT_TRUE(ref.expired());
ASSERT_TRUE(ref2.expired());
{
auto fut = Future<>::Make();
auto fut2 = fut.Then([](const Result<detail::Empty>&) { return Future<>::Make(); });
ref = fut.impl_;
ref2 = fut2.impl_;
}
ASSERT_TRUE(ref.expired());
ASSERT_TRUE(ref2.expired());
}
TEST(FutureRefTest, TailRemoved) {
// Keeping the head of the future chain should keep the entire chain alive
std::shared_ptr<Future<>> ref;
std::weak_ptr<FutureImpl> ref2;
bool side_effect_run = false;
{
ref = std::make_shared<Future<>>(Future<>::Make());
auto fut2 = ref->Then([&side_effect_run](const Result<detail::Empty>& status) {
side_effect_run = true;
return Status::OK();
});
ref2 = fut2.impl_;
}
ASSERT_FALSE(ref2.expired());
ref->MarkFinished();
ASSERT_TRUE(side_effect_run);
ASSERT_TRUE(ref2.expired());
}
TEST(FutureRefTest, HeadRemoved) {
// Keeping the tail of the future chain should not keep the entire chain alive. If no
// one has a reference to the head then there is no need to keep it, nothing will finish
// it. In theory the intermediate futures could be finished by some external process
// but that would be highly unusual and bad practice so in reality this would just be a
// reference to a future that will never complete which is ok.
std::weak_ptr<FutureImpl> ref;
std::shared_ptr<Future<>> ref2;
{
auto fut = std::make_shared<Future<>>(Future<>::Make());
ref = fut->impl_;
ref2 = std::make_shared<Future<>>(fut->Then([](...) {}));
}
ASSERT_TRUE(ref.expired());
{
auto fut = Future<>::Make();
ref2 = std::make_shared<Future<>>(fut.Then([&](...) {
auto intermediate = Future<>::Make();
ref = intermediate.impl_;
return intermediate;
}));
fut.MarkFinished();
}
ASSERT_TRUE(ref.expired());
}
TEST(FutureStressTest, Callback) {
#ifdef ARROW_VALGRIND
const int NITERS = 2;
#else
const int NITERS = 1000;
#endif
for (unsigned int n = 0; n < NITERS; n++) {
auto fut = Future<>::Make();
std::atomic<unsigned int> count_finished_immediately(0);
std::atomic<unsigned int> count_finished_deferred(0);
std::atomic<unsigned int> callbacks_added(0);
std::atomic<bool> finished(false);
std::thread callback_adder([&] {
auto test_thread = std::this_thread::get_id();
while (!finished.load()) {
fut.AddCallback([&test_thread, &count_finished_immediately,
&count_finished_deferred](const Result<detail::Empty>& result) {
if (std::this_thread::get_id() == test_thread) {
count_finished_immediately++;
} else {
count_finished_deferred++;
}
});
callbacks_added++;
if (callbacks_added.load() > 10000) {
// If we've added many callbacks already and the main thread hasn't noticed yet,
// help it a bit (this seems especially useful in Valgrind).
SleepABit();
}
}
});
while (callbacks_added.load() == 0) {
// Spin until the callback_adder has started running
}
ASSERT_EQ(0, count_finished_deferred.load());
ASSERT_EQ(0, count_finished_immediately.load());
fut.MarkFinished();
while (count_finished_immediately.load() == 0) {
// Spin until the callback_adder has added at least one post-future
}
finished.store(true);
callback_adder.join();
auto total_added = callbacks_added.load();
auto total_immediate = count_finished_immediately.load();
auto total_deferred = count_finished_deferred.load();
ASSERT_EQ(total_added, total_immediate + total_deferred);
}
}
TEST(FutureStressTest, TryAddCallback) {
for (unsigned int n = 0; n < 1; n++) {
auto fut = Future<>::Make();
std::atomic<unsigned int> callbacks_added(0);
std::atomic<bool> finished(false);
std::mutex mutex;
std::condition_variable cv;
std::thread::id callback_adder_thread_id;
std::thread callback_adder([&] {
callback_adder_thread_id = std::this_thread::get_id();
std::function<void(const Result<detail::Empty>&)> callback =
[&callback_adder_thread_id](const Result<detail::Empty>&) {
if (std::this_thread::get_id() == callback_adder_thread_id) {
FAIL() << "TryAddCallback allowed a callback to be run synchronously";
}
};
std::function<std::function<void(const Result<detail::Empty>&)>()>
callback_factory = [&callback]() { return callback; };
while (true) {
auto callback_added = fut.TryAddCallback(callback_factory);
if (callback_added) {
callbacks_added++;
if (callbacks_added.load() > 10000) {
// If we've added many callbacks already and the main thread hasn't
// noticed yet, help it a bit (this seems especially useful in Valgrind).
SleepABit();
}
} else {
break;
}
}
{
std::lock_guard<std::mutex> lg(mutex);
finished.store(true);
}
cv.notify_one();
});
while (callbacks_added.load() == 0) {
// Spin until the callback_adder has started running
}
fut.MarkFinished();
std::unique_lock<std::mutex> lk(mutex);
cv.wait_for(lk, std::chrono::duration<double>(0.5),
[&finished] { return finished.load(); });
lk.unlock();
ASSERT_TRUE(finished);
callback_adder.join();
}
}
TEST(FutureCompletionTest, Void) {
{
// Simple callback
auto fut = Future<int>::Make();
int passed_in_result = 0;
auto fut2 =
fut.Then([&passed_in_result](const int& result) { passed_in_result = result; });
fut.MarkFinished(42);
AssertSuccessful(fut2);
ASSERT_EQ(passed_in_result, 42);
}
{
// Propagate failure by returning it from on_failure
auto fut = Future<int>::Make();
auto fut2 = fut.Then([](...) {}, [](const Status& s) { return s; });
fut.MarkFinished(Status::IOError("xxx"));
AssertFailed(fut2);
ASSERT_TRUE(fut2.status().IsIOError());
}
{
// From void
auto fut = Future<>::Make();
auto fut2 = fut.Then([](const Result<detail::Empty>&) {});
fut.MarkFinished();
AssertSuccessful(fut2);
}
{
// Propagate failure by not having on_failure
auto fut = Future<>::Make();
auto cb_was_run = false;
auto fut2 = fut.Then([&cb_was_run](const Result<detail::Empty>& res) {
cb_was_run = true;
return res;
});
fut.MarkFinished(Status::IOError("xxx"));
AssertFailed(fut2);
ASSERT_FALSE(cb_was_run);
}
{
// Swallow failure by catching in on_failure
auto fut = Future<>::Make();
Status status_seen = Status::OK();
auto fut2 = fut.Then([](...) {},
[&status_seen](const Status& s) {
status_seen = s;
return Status::OK();
});
ASSERT_TRUE(status_seen.ok());
fut.MarkFinished(Status::IOError("xxx"));
ASSERT_TRUE(status_seen.IsIOError());
AssertSuccessful(fut2);
}
}
TEST(FutureCompletionTest, NonVoid) {
{
// Simple callback
auto fut = Future<int>::Make();
auto fut2 = fut.Then([](int result) {
auto passed_in_result = result;
return passed_in_result * passed_in_result;
});
fut.MarkFinished(42);
AssertSuccessful(fut2);
auto result = *fut2.result();
ASSERT_EQ(result, 42 * 42);
}
{
// Propagate failure by not having on_failure
auto fut = Future<int>::Make();
auto cb_was_run = false;
auto fut2 = fut.Then([&cb_was_run](int result) {
cb_was_run = true;
auto passed_in_result = result;
return passed_in_result * passed_in_result;
});
fut.MarkFinished(Status::IOError("xxx"));
AssertFailed(fut2);
ASSERT_TRUE(fut2.status().IsIOError());
ASSERT_FALSE(cb_was_run);
}
{
// Swallow failure by catching in on_failure
auto fut = Future<int>::Make();
bool was_io_error = false;
auto fut2 = fut.Then([](int) { return 99; },
[&was_io_error](const Status& s) {
was_io_error = s.IsIOError();
return 100;
});
fut.MarkFinished(Status::IOError("xxx"));
AssertSuccessful(fut2);
auto result = *fut2.result();
ASSERT_EQ(result, 100);
ASSERT_TRUE(was_io_error);
}
{
// From void
auto fut = Future<>::Make();
auto fut2 = fut.Then([](...) { return 42; });
fut.MarkFinished();
AssertSuccessful(fut2);
auto result = *fut2.result();
ASSERT_EQ(result, 42);
}
{
// Propagate failure by returning failure
// Cannot do this. Must return Result<int> because
// both callbacks must return the same thing and you can't
// return an int from the second callback if you're trying
// to propagate a failure
}
}
TEST(FutureCompletionTest, FutureNonVoid) {
{
// Simple callback
auto fut = Future<int>::Make();
auto innerFut = Future<std::string>::Make();
int passed_in_result = 0;
auto fut2 = fut.Then([&passed_in_result, innerFut](int result) {
passed_in_result = result;
return innerFut;
});
fut.MarkFinished(42);
ASSERT_EQ(passed_in_result, 42);
AssertNotFinished(fut2);
innerFut.MarkFinished("hello");
AssertSuccessful(fut2);
auto result = *fut2.result();
ASSERT_EQ(result, "hello");
}
{
// Propagate failure by not having on_failure
auto fut = Future<int>::Make();
auto innerFut = Future<std::string>::Make();
auto was_cb_run = false;
auto fut2 = fut.Then([innerFut, &was_cb_run](int) {
was_cb_run = true;
return innerFut;
});
fut.MarkFinished(Status::IOError("xxx"));
AssertFailed(fut2);
ASSERT_TRUE(fut2.status().IsIOError());
ASSERT_FALSE(was_cb_run);
}
{
// Swallow failure by catching in on_failure
auto fut = Future<int>::Make();
auto innerFut = Future<std::string>::Make();
bool was_io_error = false;
auto was_cb_run = false;
auto fut2 = fut.Then(
[innerFut, &was_cb_run](int) {
was_cb_run = true;
return innerFut;
},
[&was_io_error, innerFut](const Status& s) {
was_io_error = s.IsIOError();
return innerFut;
});
fut.MarkFinished(Status::IOError("xxx"));
AssertNotFinished(fut2);
innerFut.MarkFinished("hello");
AssertSuccessful(fut2);
auto result = *fut2.result();
ASSERT_EQ(result, "hello");
ASSERT_TRUE(was_io_error);
ASSERT_FALSE(was_cb_run);
}
{
// From void
auto fut = Future<>::Make();
auto innerFut = Future<std::string>::Make();
auto fut2 = fut.Then([&innerFut](...) { return innerFut; });
fut.MarkFinished();
AssertNotFinished(fut2);
innerFut.MarkFinished("hello");
AssertSuccessful(fut2);
auto result = *fut2.result();
ASSERT_EQ(result, "hello");
}
{
// Propagate failure by returning failure
auto fut = Future<>::Make();
auto innerFut = Future<std::string>::Make();
auto was_cb_run = false;
auto fut2 = fut.Then(
[&innerFut, &was_cb_run](...) {
was_cb_run = true;
return Result<Future<std::string>>(innerFut);
},
[](const Status& status) { return Result<Future<std::string>>(status); });
fut.MarkFinished(Status::IOError("xxx"));
AssertFailed(fut2);
ASSERT_FALSE(was_cb_run);
}
}
TEST(FutureCompletionTest, Status) {
{
// Simple callback
auto fut = Future<int>::Make();
int passed_in_result = 0;
Future<> fut2 = fut.Then([&passed_in_result](int result) {
passed_in_result = result;
return Status::OK();
});
fut.MarkFinished(42);
ASSERT_EQ(passed_in_result, 42);
AssertSuccessful(fut2);
}
{
// Propagate failure by not having on_failure
auto fut = Future<int>::Make();
auto was_cb_run = false;
auto fut2 = fut.Then([&was_cb_run](int) {
was_cb_run = true;
return Status::OK();
});
fut.MarkFinished(Status::IOError("xxx"));
AssertFailed(fut2);
ASSERT_TRUE(fut2.status().IsIOError());
ASSERT_FALSE(was_cb_run);
}
{
// Swallow failure by catching in on_failure
auto fut = Future<int>::Make();
bool was_io_error = false;
auto was_cb_run = false;
auto fut2 = fut.Then(
[&was_cb_run](int i) {
was_cb_run = true;
return Status::OK();
},
[&was_io_error](const Status& s) {
was_io_error = s.IsIOError();
return Status::OK();
});
fut.MarkFinished(Status::IOError("xxx"));
AssertSuccessful(fut2);
ASSERT_TRUE(was_io_error);
ASSERT_FALSE(was_cb_run);
}
{
// From void
auto fut = Future<>::Make();
auto fut2 = fut.Then([](const Result<detail::Empty>& res) { return Status::OK(); });
fut.MarkFinished();
AssertSuccessful(fut2);
}
{
// Propagate failure by returning failure
auto fut = Future<>::Make();
auto was_cb_run = false;
auto fut2 = fut.Then(
[&was_cb_run](const Result<detail::Empty>& res) {
was_cb_run = true;
return Status::OK();
},
[](const Status& s) { return s; });
fut.MarkFinished(Status::IOError("xxx"));
AssertFailed(fut2);
ASSERT_FALSE(was_cb_run);
}
}
TEST(FutureCompletionTest, Result) {
{
// Simple callback
auto fut = Future<int>::Make();
Future<int> fut2 = fut.Then([](const int& i) {
auto passed_in_result = i;
return Result<int>(passed_in_result * passed_in_result);
});
fut.MarkFinished(42);
AssertSuccessful(fut2);
auto result = *fut2.result();
ASSERT_EQ(result, 42 * 42);
}
{
// Propagate failure by not having on_failure
auto fut = Future<int>::Make();
auto was_cb_run = false;
auto fut2 = fut.Then([&was_cb_run](const int& i) {
was_cb_run = true;
auto passed_in_result = i;
return Result<int>(passed_in_result * passed_in_result);
});
fut.MarkFinished(Status::IOError("xxx"));
AssertFailed(fut2);
ASSERT_TRUE(fut2.status().IsIOError());
ASSERT_FALSE(was_cb_run);
}
{
// Swallow failure by catching in on_failure
auto fut = Future<int>::Make();
bool was_io_error = false;
bool was_cb_run = false;
auto fut2 = fut.Then(
[&was_cb_run](const int& i) {
was_cb_run = true;
return Result<int>(100);
},
[&was_io_error](const Status& s) {
was_io_error = s.IsIOError();
return Result<int>(100);
});
fut.MarkFinished(Status::IOError("xxx"));
AssertSuccessful(fut2);
auto result = *fut2.result();
ASSERT_EQ(result, 100);
ASSERT_TRUE(was_io_error);
ASSERT_FALSE(was_cb_run);
}
{
// From void
auto fut = Future<>::Make();
auto fut2 = fut.Then([](...) { return Result<int>(42); });
fut.MarkFinished();
AssertSuccessful(fut2);
auto result = *fut2.result();
ASSERT_EQ(result, 42);
}
{
// Propagate failure by returning failure
auto fut = Future<>::Make();
auto was_cb_run = false;
auto fut2 = fut.Then(
[&was_cb_run](...) {
was_cb_run = true;
return Result<int>(42);
},
[](const Status& s) { return Result<int>(s); });
fut.MarkFinished(Status::IOError("xxx"));
AssertFailed(fut2);
ASSERT_FALSE(was_cb_run);
}
}
TEST(FutureCompletionTest, FutureVoid) {
{
// Simple callback
auto fut = Future<int>::Make();
auto innerFut = Future<>::Make();
int passed_in_result = 0;
auto fut2 = fut.Then([&passed_in_result, innerFut](int i) {
passed_in_result = i;
return innerFut;
});
fut.MarkFinished(42);
AssertNotFinished(fut2);
innerFut.MarkFinished();
AssertSuccessful(fut2);
auto res = fut2.status();
ASSERT_OK(res);
ASSERT_EQ(passed_in_result, 42);
}
{
// Precompleted future
auto fut = Future<int>::Make();
auto innerFut = Future<>::Make();
innerFut.MarkFinished();
int passed_in_result = 0;
auto fut2 = fut.Then([&passed_in_result, innerFut](int i) {
passed_in_result = i;
return innerFut;
});
AssertNotFinished(fut2);
fut.MarkFinished(42);
AssertSuccessful(fut2);
ASSERT_EQ(passed_in_result, 42);
}
{
// Propagate failure by not having on_failure
auto fut = Future<int>::Make();
auto innerFut = Future<>::Make();
auto was_cb_run = false;
auto fut2 = fut.Then([innerFut, &was_cb_run](int) {
was_cb_run = true;
return innerFut;
});
fut.MarkFinished(Status::IOError("xxx"));
AssertFailed(fut2);
if (IsFutureFinished(fut2.state())) {
ASSERT_TRUE(fut2.status().IsIOError());
}
ASSERT_FALSE(was_cb_run);
}
{
// Swallow failure by catching in on_failure
auto fut = Future<int>::Make();
auto innerFut = Future<>::Make();
auto was_cb_run = false;
auto fut2 = fut.Then(
[innerFut, &was_cb_run](int) {
was_cb_run = true;
return innerFut;
},
[innerFut](const Status& s) { return innerFut; });
fut.MarkFinished(Status::IOError("xxx"));
AssertNotFinished(fut2);
innerFut.MarkFinished();
AssertSuccessful(fut2);
ASSERT_FALSE(was_cb_run);
}
{
// From void
auto fut = Future<>::Make();
auto innerFut = Future<>::Make();
auto fut2 = fut.Then([&innerFut](...) { return innerFut; });
fut.MarkFinished();
AssertNotFinished(fut2);
innerFut.MarkFinished();
AssertSuccessful(fut2);
}
{
// Propagate failure by returning failure
auto fut = Future<>::Make();
auto innerFut = Future<>::Make();
auto fut2 = fut.Then([&innerFut](...) { return innerFut; },
[](const Status& s) { return Future<>::MakeFinished(s); });
fut.MarkFinished(Status::IOError("xxx"));
AssertFailed(fut2);
}
}
TEST(FutureAllTest, Empty) {
auto combined = arrow::All(std::vector<Future<int>>{});
auto after_assert = combined.Then(
[](std::vector<Result<int>> results) { ASSERT_EQ(0, results.size()); });
AssertSuccessful(after_assert);
}
TEST(FutureAllTest, Simple) {
auto f1 = Future<int>::Make();
auto f2 = Future<int>::Make();
std::vector<Future<int>> futures = {f1, f2};
auto combined = arrow::All(futures);
auto after_assert = combined.Then([](std::vector<Result<int>> results) {
ASSERT_EQ(2, results.size());
ASSERT_EQ(1, *results[0]);
ASSERT_EQ(2, *results[1]);
});
// Finish in reverse order, results should still be delivered in proper order
AssertNotFinished(after_assert);
f2.MarkFinished(2);
AssertNotFinished(after_assert);
f1.MarkFinished(1);
AssertSuccessful(after_assert);
}
TEST(FutureAllTest, Failure) {
auto f1 = Future<int>::Make();
auto f2 = Future<int>::Make();
auto f3 = Future<int>::Make();
std::vector<Future<int>> futures = {f1, f2, f3};
auto combined = arrow::All(futures);
auto after_assert = combined.Then([](std::vector<Result<int>> results) {
ASSERT_EQ(3, results.size());
ASSERT_EQ(1, *results[0]);
ASSERT_EQ(Status::IOError("XYZ"), results[1].status());
ASSERT_EQ(3, *results[2]);
});
f1.MarkFinished(1);
f2.MarkFinished(Status::IOError("XYZ"));
f3.MarkFinished(3);
AssertFinished(after_assert);
}
TEST(FutureAllCompleteTest, Empty) {
Future<> combined = AllComplete(std::vector<Future<>>{});
AssertSuccessful(combined);
}
TEST(FutureAllCompleteTest, Simple) {
auto f1 = Future<int>::Make();
auto f2 = Future<int>::Make();
std::vector<Future<>> futures = {Future<>(f1), Future<>(f2)};
auto combined = AllComplete(futures);
AssertNotFinished(combined);
f2.MarkFinished(2);
AssertNotFinished(combined);
f1.MarkFinished(1);
AssertSuccessful(combined);
}
TEST(FutureAllCompleteTest, Failure) {
auto f1 = Future<int>::Make();
auto f2 = Future<int>::Make();
auto f3 = Future<int>::Make();
std::vector<Future<>> futures = {Future<>(f1), Future<>(f2), Future<>(f3)};
auto combined = AllComplete(futures);
AssertNotFinished(combined);
f1.MarkFinished(1);
AssertNotFinished(combined);
f2.MarkFinished(Status::IOError("XYZ"));
AssertFinished(combined);
f3.MarkFinished(3);
AssertFinished(combined);
ASSERT_EQ(Status::IOError("XYZ"), combined.status());
}
TEST(FutureLoopTest, Sync) {
struct {
int i = 0;
Future<int> Get() { return Future<int>::MakeFinished(i++); }
} IntSource;
bool do_fail = false;
std::vector<int> ints;
auto loop_body = [&] {
return IntSource.Get().Then([&](int i) -> Result<ControlFlow<int>> {
if (do_fail && i == 3) {
return Status::IOError("xxx");
}
if (i == 5) {
int sum = 0;
for (int i : ints) sum += i;
return Break(sum);
}
ints.push_back(i);
return Continue();
});
};
{
auto sum_fut = Loop(loop_body);
AssertSuccessful(sum_fut);
ASSERT_OK_AND_ASSIGN(auto sum, sum_fut.result());
ASSERT_EQ(sum, 0 + 1 + 2 + 3 + 4);
}
{
do_fail = true;
IntSource.i = 0;
auto sum_fut = Loop(loop_body);
AssertFailed(sum_fut);
ASSERT_RAISES(IOError, sum_fut.result());
}
}
TEST(FutureLoopTest, EmptyBreakValue) {
Future<> none_fut =
Loop([&] { return Future<>::MakeFinished().Then([&](...) { return Break(); }); });
AssertSuccessful(none_fut);
}
TEST(FutureLoopTest, EmptyLoop) {
auto loop_body = []() -> Future<ControlFlow<int>> {
return Future<ControlFlow<int>>::MakeFinished(Break(0));
};
auto loop_fut = Loop(loop_body);
ASSERT_FINISHES_OK_AND_ASSIGN(auto loop_res, loop_fut);
ASSERT_EQ(loop_res, 0);
}
// TODO - Test provided by Ben but I don't understand how it can pass legitimately.
// Any future result will be passed by reference to the callbacks (as there can be
// multiple callbacks). In the Loop construct it takes the break and forwards it
// on to the outer future. Since there is no way to move a reference this can only
// be done by copying.
//
// In theory it should be safe since Loop is guaranteed to be the last callback added
// to the control future and so the value can be safely moved at that point. However,
// I'm unable to reproduce whatever trick you had in ControlFlow to make this work.
// If we want to formalize this "last callback can steal" concept then we could add
// a "last callback" to Future which gets called with an rvalue instead of an lvalue
// reference but that seems overly complicated.
//
// Ben, can you recreate whatever trick you had in place before that allowed this to
// pass? Perhaps some kind of cast. Worst case, I can move back to using
// ControlFlow instead of std::optional
//
// TEST(FutureLoopTest, MoveOnlyBreakValue) {
// Future<MoveOnlyDataType> one_fut = Loop([&] {
// return Future<int>::MakeFinished(1).Then(
// [&](int i) { return Break(MoveOnlyDataType(i)); });
// });
// AssertSuccessful(one_fut);
// ASSERT_OK_AND_ASSIGN(auto one, std::move(one_fut).result());
// ASSERT_EQ(one, 1);
// }
TEST(FutureLoopTest, StackOverflow) {
// Looping over futures is normally a rather recursive task. If the futures complete
// synchronously (because they are already finished) it could lead to a stack overflow
// if care is not taken.
int counter = 0;
auto loop_body = [&counter]() -> Future<ControlFlow<int>> {
while (counter < 1000000) {
counter++;
return Future<ControlFlow<int>>::MakeFinished(Continue());
}
return Future<ControlFlow<int>>::MakeFinished(Break(-1));
};
auto loop_fut = Loop(loop_body);
ASSERT_TRUE(loop_fut.Wait(0.1));
}
TEST(FutureLoopTest, AllowsBreakFutToBeDiscarded) {
int counter = 0;
auto loop_body = [&counter]() -> Future<ControlFlow<int>> {
while (counter < 10) {
counter++;
return Future<ControlFlow<int>>::MakeFinished(Continue());
}
return Future<ControlFlow<int>>::MakeFinished(Break(-1));
};
auto loop_fut = Loop(loop_body).Then([](...) { return Status::OK(); });
ASSERT_TRUE(loop_fut.Wait(0.1));
}
class MoveTrackingCallable {
public:
MoveTrackingCallable() {
// std::cout << "CONSTRUCT" << std::endl;
}
~MoveTrackingCallable() {
valid_ = false;
// std::cout << "DESTRUCT" << std::endl;
}
MoveTrackingCallable(const MoveTrackingCallable& other) {
// std::cout << "COPY CONSTRUCT" << std::endl;
}
MoveTrackingCallable(MoveTrackingCallable&& other) {
other.valid_ = false;
// std::cout << "MOVE CONSTRUCT" << std::endl;
}
MoveTrackingCallable& operator=(const MoveTrackingCallable& other) {
// std::cout << "COPY ASSIGN" << std::endl;
return *this;
}
MoveTrackingCallable& operator=(MoveTrackingCallable&& other) {
other.valid_ = false;
// std::cout << "MOVE ASSIGN" << std::endl;
return *this;
}
Status operator()(...) {
// std::cout << "TRIGGER" << std::endl;
if (valid_) {
return Status::OK();
} else {
return Status::Invalid("Invalid callback triggered");
}
}
private:
bool valid_ = true;
};
TEST(FutureCompletionTest, ReuseCallback) {
auto fut = Future<>::Make();
Future<> continuation;
{
MoveTrackingCallable callback;
continuation = fut.Then(callback);
}
fut.MarkFinished(Status::OK());
ASSERT_TRUE(continuation.is_finished());
if (continuation.is_finished()) {
ASSERT_OK(continuation.status());
}
}
// --------------------------------------------------------------------
// Tests with an executor
template <typename T>
class FutureTestBase : public ::testing::Test {
public:
using ExecutorType = SimpleExecutor<T>;
void MakeExecutor(int nfutures) { executor_.reset(new ExecutorType(nfutures)); }
void MakeExecutor(int nfutures, std::vector<std::pair<int, bool>> immediate) {
MakeExecutor(nfutures);
executor_->SetFinished(std::move(immediate));
}
template <typename U>
void RandomShuffle(std::vector<U>* values) {
std::default_random_engine gen(seed_++);
std::shuffle(values->begin(), values->end(), gen);
}
// Generate a sequence of randomly-sized ordered spans covering exactly [0, size).
// Returns a vector of (start, stop) pairs.
std::vector<std::pair<int, int>> RandomSequenceSpans(int size) {
std::default_random_engine gen(seed_++);
// The distribution of span sizes
std::poisson_distribution<int> dist(5);
std::vector<std::pair<int, int>> spans;
int start = 0;
while (start < size) {
int stop = std::min(start + dist(gen), size);
spans.emplace_back(start, stop);
start = stop;
}
return spans;
}
void AssertAllNotFinished(const std::vector<int>& future_indices) {
const auto& futures = executor_->futures();
for (const auto fut_index : future_indices) {
AssertNotFinished(futures[fut_index]);
}
}
// Assert the given futures are *eventually* successful
void AssertAllSuccessful(const std::vector<int>& future_indices) {
const auto& futures = executor_->futures();
for (const auto fut_index : future_indices) {
ASSERT_OK(futures[fut_index].status());
ASSERT_EQ(*futures[fut_index].result(), fut_index);
}
}
// Assert the given futures are *eventually* failed
void AssertAllFailed(const std::vector<int>& future_indices) {
const auto& futures = executor_->futures();
for (const auto fut_index : future_indices) {
ASSERT_RAISES(UnknownError, futures[fut_index].status());
}
}
// Assert the given futures are *eventually* successful
void AssertSpanSuccessful(int start, int stop) {
const auto& futures = executor_->futures();
for (int fut_index = start; fut_index < stop; ++fut_index) {
ASSERT_OK(futures[fut_index].status());
ASSERT_EQ(*futures[fut_index].result(), fut_index);
}
}
void AssertAllSuccessful() {
AssertSpanSuccessful(0, static_cast<int>(executor_->futures().size()));
}
// Assert the given futures are successful *now*
void AssertSpanSuccessfulNow(int start, int stop) {
const auto& futures = executor_->futures();
for (int fut_index = start; fut_index < stop; ++fut_index) {
ASSERT_TRUE(IsFutureFinished(futures[fut_index].state()));
}
}
void AssertAllSuccessfulNow() {
AssertSpanSuccessfulNow(0, static_cast<int>(executor_->futures().size()));
}
void TestBasicWait() {
MakeExecutor(4, {{1, true}, {2, false}});
AssertAllNotFinished({0, 3});
AssertAllSuccessful({1});
AssertAllFailed({2});
AssertAllNotFinished({0, 3});
executor_->SetFinishedDeferred({{0, true}, {3, true}});
AssertAllSuccessful({0, 1, 3});
}
void TestTimedWait() {
MakeExecutor(2);
const auto& futures = executor_->futures();
ASSERT_FALSE(futures[0].Wait(kTinyWait));
ASSERT_FALSE(futures[1].Wait(kTinyWait));
AssertAllNotFinished({0, 1});
executor_->SetFinishedDeferred({{0, true}, {1, true}});
ASSERT_TRUE(futures[0].Wait(kLargeWait));
ASSERT_TRUE(futures[1].Wait(kLargeWait));
AssertAllSuccessfulNow();
}
void TestStressWait() {
#ifdef ARROW_VALGRIND
const int N = 20;
#else
const int N = 2000;
#endif
MakeExecutor(N);
const auto& futures = executor_->futures();
const auto spans = RandomSequenceSpans(N);
for (const auto& span : spans) {
int start = span.first, stop = span.second;
executor_->SetFinishedDeferred(start, stop);
AssertSpanSuccessful(start, stop);
if (stop < N) {
AssertNotFinished(futures[stop]);
}
}
AssertAllSuccessful();
}
void TestBasicWaitForAny() {
MakeExecutor(4, {{1, true}, {2, false}});
auto& futures = executor_->futures();
std::vector<Future<T>*> wait_on = {&futures[0], &futures[1]};
auto finished = WaitForAny(wait_on);
ASSERT_THAT(finished, testing::ElementsAre(1));
wait_on = {&futures[1], &futures[2], &futures[3]};
while (finished.size() < 2) {
finished = WaitForAny(wait_on);
}
ASSERT_THAT(finished, testing::UnorderedElementsAre(0, 1));
executor_->SetFinished(3);
finished = WaitForAny(futures);
ASSERT_THAT(finished, testing::UnorderedElementsAre(1, 2, 3));
executor_->SetFinishedDeferred(0);
// Busy wait until the state change is done
while (finished.size() < 4) {
finished = WaitForAny(futures);
}
ASSERT_THAT(finished, testing::UnorderedElementsAre(0, 1, 2, 3));
}
void TestTimedWaitForAny() {
MakeExecutor(4, {{1, true}, {2, false}});
auto& futures = executor_->futures();
std::vector<int> finished;
std::vector<Future<T>*> wait_on = {&futures[0], &futures[3]};
finished = WaitForAny(wait_on, kTinyWait);
ASSERT_EQ(finished.size(), 0);
executor_->SetFinished(3);
finished = WaitForAny(wait_on, kLargeWait);
ASSERT_THAT(finished, testing::ElementsAre(1));
executor_->SetFinished(0);
while (finished.size() < 2) {
finished = WaitForAny(wait_on, kTinyWait);
}
ASSERT_THAT(finished, testing::UnorderedElementsAre(0, 1));
while (finished.size() < 4) {
finished = WaitForAny(futures, kTinyWait);
}
ASSERT_THAT(finished, testing::UnorderedElementsAre(0, 1, 2, 3));
}
void TestBasicWaitForAll() {
MakeExecutor(4, {{1, true}, {2, false}});
auto& futures = executor_->futures();
std::vector<Future<T>*> wait_on = {&futures[1], &futures[2]};
WaitForAll(wait_on);
AssertSpanSuccessfulNow(1, 3);
executor_->SetFinishedDeferred({{0, true}, {3, false}});
WaitForAll(futures);
AssertAllSuccessfulNow();
WaitForAll(futures);
}
void TestTimedWaitForAll() {
MakeExecutor(4, {{1, true}, {2, false}});
auto& futures = executor_->futures();
ASSERT_FALSE(WaitForAll(futures, kTinyWait));
executor_->SetFinishedDeferred({{0, true}, {3, false}});
ASSERT_TRUE(WaitForAll(futures, kLargeWait));
AssertAllSuccessfulNow();
}
void TestStressWaitForAny() {
#ifdef ARROW_VALGRIND
const int N = 5;
#else
const int N = 300;
#endif
MakeExecutor(N);
const auto& futures = executor_->futures();
const auto spans = RandomSequenceSpans(N);
std::vector<int> finished;
// Note this loop is potentially O(N**2), because we're copying
// O(N)-sized vector when waiting.
for (const auto& span : spans) {
int start = span.first, stop = span.second;
executor_->SetFinishedDeferred(start, stop);
size_t last_finished_size = finished.size();
finished = WaitForAny(futures);
ASSERT_GE(finished.size(), last_finished_size);
// The spans are contiguous and ordered, so `stop` is also the number
// of futures for which SetFinishedDeferred() was called.
ASSERT_LE(finished.size(), static_cast<size_t>(stop));
}
// Semi-busy wait for all futures to be finished
while (finished.size() < static_cast<size_t>(N)) {
finished = WaitForAny(futures);
}
AssertAllSuccessfulNow();
}
void TestStressWaitForAll() {
#ifdef ARROW_VALGRIND
const int N = 5;
#else
const int N = 300;
#endif
MakeExecutor(N);
const auto& futures = executor_->futures();
const auto spans = RandomSequenceSpans(N);
// Note this loop is potentially O(N**2), because we're copying
// O(N)-sized vector when waiting.
for (const auto& span : spans) {
int start = span.first, stop = span.second;
executor_->SetFinishedDeferred(start, stop);
bool finished = WaitForAll(futures, kTinyWait);
if (stop < N) {
ASSERT_FALSE(finished);
}
}
ASSERT_TRUE(WaitForAll(futures, kLargeWait));
AssertAllSuccessfulNow();
}
void TestBasicAsCompleted() {
{
MakeExecutor(4, {{1, true}, {2, true}});
executor_->SetFinishedDeferred({{0, true}, {3, true}});
auto it = MakeAsCompletedIterator(executor_->futures());
std::vector<T> values = IteratorToVector(std::move(it));
ASSERT_THAT(values, testing::UnorderedElementsAre(0, 1, 2, 3));
}
{
// Check that AsCompleted is opportunistic, it yields elements in order
// of completion.
MakeExecutor(4, {{2, true}});
auto it = MakeAsCompletedIterator(executor_->futures());
ASSERT_OK_AND_EQ(2, it.Next());
executor_->SetFinishedDeferred({{3, true}});
ASSERT_OK_AND_EQ(3, it.Next());
executor_->SetFinishedDeferred({{0, true}});
ASSERT_OK_AND_EQ(0, it.Next());
executor_->SetFinishedDeferred({{1, true}});
ASSERT_OK_AND_EQ(1, it.Next());
ASSERT_OK_AND_EQ(IterationTraits<T>::End(), it.Next());
ASSERT_OK_AND_EQ(IterationTraits<T>::End(), it.Next()); // idempotent
}
}
void TestErrorsAsCompleted() {
MakeExecutor(4, {{1, true}, {2, false}});
executor_->SetFinishedDeferred({{0, true}, {3, false}});
auto it = MakeAsCompletedIterator(executor_->futures());
auto results = IteratorToResults(std::move(it));
ASSERT_THAT(results.values, testing::UnorderedElementsAre(0, 1));
ASSERT_EQ(results.errors.size(), 2);
ASSERT_RAISES(UnknownError, results.errors[0]);
ASSERT_RAISES(UnknownError, results.errors[1]);
}
void TestStressAsCompleted() {
#ifdef ARROW_VALGRIND
const int N = 10;
#else
const int N = 1000;
#endif
MakeExecutor(N);
// Launch a worker thread that will finish random spans of futures,
// in random order.
auto spans = RandomSequenceSpans(N);
RandomShuffle(&spans);
auto feed_iterator = [&]() {
for (const auto& span : spans) {
int start = span.first, stop = span.second;
executor_->SetFinishedDeferred(start, stop); // will sleep a bit
}
};
auto worker = std::thread(std::move(feed_iterator));
auto it = MakeAsCompletedIterator(executor_->futures());
auto results = IteratorToResults(std::move(it));
worker.join();
ASSERT_EQ(results.values.size(), static_cast<size_t>(N));
ASSERT_EQ(results.errors.size(), 0);
std::vector<int> expected(N);
std::iota(expected.begin(), expected.end(), 0);
std::vector<int> actual(N);
std::transform(results.values.begin(), results.values.end(), actual.begin(),
[](const T& value) { return value.ToInt(); });
std::sort(actual.begin(), actual.end());
ASSERT_EQ(expected, actual);
}
protected:
std::unique_ptr<ExecutorType> executor_;
int seed_ = 42;
};
template <typename T>
class FutureWaitTest : public FutureTestBase<T> {};
using FutureWaitTestTypes = ::testing::Types<int, Foo, MoveOnlyDataType>;
TYPED_TEST_SUITE(FutureWaitTest, FutureWaitTestTypes);
TYPED_TEST(FutureWaitTest, BasicWait) { this->TestBasicWait(); }
TYPED_TEST(FutureWaitTest, TimedWait) { this->TestTimedWait(); }
TYPED_TEST(FutureWaitTest, StressWait) { this->TestStressWait(); }
TYPED_TEST(FutureWaitTest, BasicWaitForAny) { this->TestBasicWaitForAny(); }
TYPED_TEST(FutureWaitTest, TimedWaitForAny) { this->TestTimedWaitForAny(); }
TYPED_TEST(FutureWaitTest, StressWaitForAny) { this->TestStressWaitForAny(); }
TYPED_TEST(FutureWaitTest, BasicWaitForAll) { this->TestBasicWaitForAll(); }
TYPED_TEST(FutureWaitTest, TimedWaitForAll) { this->TestTimedWaitForAll(); }
TYPED_TEST(FutureWaitTest, StressWaitForAll) { this->TestStressWaitForAll(); }
template <typename T>
class FutureIteratorTest : public FutureTestBase<T> {};
using FutureIteratorTestTypes = ::testing::Types<Foo>;
TYPED_TEST_SUITE(FutureIteratorTest, FutureIteratorTestTypes);
TYPED_TEST(FutureIteratorTest, BasicAsCompleted) { this->TestBasicAsCompleted(); }
TYPED_TEST(FutureIteratorTest, ErrorsAsCompleted) { this->TestErrorsAsCompleted(); }
TYPED_TEST(FutureIteratorTest, StressAsCompleted) { this->TestStressAsCompleted(); }
namespace internal {
TEST(FnOnceTest, MoveOnlyDataType) {
// ensuring this is valid guarantees we are making no unnecessary copies
FnOnce<int(const MoveOnlyDataType&, MoveOnlyDataType, std::string)> fn =
[](const MoveOnlyDataType& i0, MoveOnlyDataType i1, std::string copyable) {
return *i0.data + *i1.data + (i0.moves * 1000) + (i1.moves * 100);
};
using arg0 = call_traits::argument_type<0, decltype(fn)>;
using arg1 = call_traits::argument_type<1, decltype(fn)>;
using arg2 = call_traits::argument_type<2, decltype(fn)>;
static_assert(std::is_same<arg0, const MoveOnlyDataType&>::value, "");
static_assert(std::is_same<arg1, MoveOnlyDataType>::value, "");
static_assert(std::is_same<arg2, std::string>::value,
"should not add a && to the call type (demanding rvalue unnecessarily)");
MoveOnlyDataType i0{1}, i1{41};
std::string copyable = "";
ASSERT_EQ(std::move(fn)(i0, std::move(i1), copyable), 242);
ASSERT_EQ(i0.moves, 0);
ASSERT_EQ(i1.moves, 0);
}
} // namespace internal
} // namespace arrow