ARROW-11924: [C++] Add streaming version of FileSystem::GetFileInfo
Closes #9995 from pitrou/ARROW-11924-get-file-info-generator
Authored-by: Antoine Pitrou <antoine@python.org>
Signed-off-by: Antoine Pitrou <antoine@python.org>
diff --git a/cpp/src/arrow/filesystem/filesystem.cc b/cpp/src/arrow/filesystem/filesystem.cc
index 7cfe266..98dc057 100644
--- a/cpp/src/arrow/filesystem/filesystem.cc
+++ b/cpp/src/arrow/filesystem/filesystem.cc
@@ -32,19 +32,24 @@
#include "arrow/filesystem/path_util.h"
#include "arrow/filesystem/util_internal.h"
#include "arrow/io/slow.h"
+#include "arrow/io/util_internal.h"
#include "arrow/result.h"
#include "arrow/status.h"
+#include "arrow/util/async_generator.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"
#include "arrow/util/parallel.h"
#include "arrow/util/uri.h"
+#include "arrow/util/vector.h"
#include "arrow/util/windows_fixup.h"
namespace arrow {
+using internal::checked_pointer_cast;
using internal::TaskHints;
using internal::Uri;
+using io::internal::SubmitIO;
namespace fs {
@@ -143,11 +148,8 @@
if (synchronous) {
return std::forward<DeferredFunc>(func)(std::move(self));
}
- TaskHints hints;
- hints.external_id = fs->io_context().external_id();
- // TODO pass StopToken
- return DeferNotOk(fs->io_context().executor()->Submit(
- hints, std::forward<DeferredFunc>(func), std::move(self)));
+ return DeferNotOk(io::internal::SubmitIO(
+ fs->io_context(), std::forward<DeferredFunc>(func), std::move(self)));
}
} // namespace
@@ -159,10 +161,11 @@
[paths](std::shared_ptr<FileSystem> self) { return self->GetFileInfo(paths); });
}
-Future<std::vector<FileInfo>> FileSystem::GetFileInfoAsync(const FileSelector& select) {
- return FileSystemDefer(
+FileInfoGenerator FileSystem::GetFileInfoGenerator(const FileSelector& select) {
+ auto fut = FileSystemDefer(
this, default_async_is_sync_,
[select](std::shared_ptr<FileSystem> self) { return self->GetFileInfo(select); });
+ return MakeSingleFutureGenerator(std::move(fut));
}
Status FileSystem::DeleteFiles(const std::vector<std::string>& paths) {
@@ -312,6 +315,23 @@
return infos;
}
+FileInfoGenerator SubTreeFileSystem::GetFileInfoGenerator(const FileSelector& select) {
+ auto selector = select;
+ selector.base_dir = PrependBase(selector.base_dir);
+ auto gen = base_fs_->GetFileInfoGenerator(selector);
+
+ auto self = checked_pointer_cast<SubTreeFileSystem>(shared_from_this());
+
+ std::function<Result<std::vector<FileInfo>>(const std::vector<FileInfo>& infos)>
+ fix_infos = [self](std::vector<FileInfo> infos) -> Result<std::vector<FileInfo>> {
+ for (auto& info : infos) {
+ RETURN_NOT_OK(self->FixInfo(&info));
+ }
+ return infos;
+ };
+ return MakeMappedGenerator(gen, fix_infos);
+}
+
Status SubTreeFileSystem::CreateDir(const std::string& path, bool recursive) {
auto s = path;
RETURN_NOT_OK(PrependBaseNonEmpty(&s));
@@ -378,6 +398,22 @@
return base_fs_->OpenInputStream(new_info);
}
+Future<std::shared_ptr<io::InputStream>> SubTreeFileSystem::OpenInputStreamAsync(
+ const std::string& path) {
+ auto s = path;
+ RETURN_NOT_OK(PrependBaseNonEmpty(&s));
+ return base_fs_->OpenInputStreamAsync(s);
+}
+
+Future<std::shared_ptr<io::InputStream>> SubTreeFileSystem::OpenInputStreamAsync(
+ const FileInfo& info) {
+ auto s = info.path();
+ RETURN_NOT_OK(PrependBaseNonEmpty(&s));
+ FileInfo new_info(info);
+ new_info.set_path(std::move(s));
+ return base_fs_->OpenInputStreamAsync(new_info);
+}
+
Result<std::shared_ptr<io::RandomAccessFile>> SubTreeFileSystem::OpenInputFile(
const std::string& path) {
auto s = path;
@@ -394,6 +430,22 @@
return base_fs_->OpenInputFile(new_info);
}
+Future<std::shared_ptr<io::RandomAccessFile>> SubTreeFileSystem::OpenInputFileAsync(
+ const std::string& path) {
+ auto s = path;
+ RETURN_NOT_OK(PrependBaseNonEmpty(&s));
+ return base_fs_->OpenInputFileAsync(s);
+}
+
+Future<std::shared_ptr<io::RandomAccessFile>> SubTreeFileSystem::OpenInputFileAsync(
+ const FileInfo& info) {
+ auto s = info.path();
+ RETURN_NOT_OK(PrependBaseNonEmpty(&s));
+ FileInfo new_info(info);
+ new_info.set_path(std::move(s));
+ return base_fs_->OpenInputFileAsync(new_info);
+}
+
Result<std::shared_ptr<io::OutputStream>> SubTreeFileSystem::OpenOutputStream(
const std::string& path) {
auto s = path;
diff --git a/cpp/src/arrow/filesystem/filesystem.h b/cpp/src/arrow/filesystem/filesystem.h
index f779dd8..2fc5836 100644
--- a/cpp/src/arrow/filesystem/filesystem.h
+++ b/cpp/src/arrow/filesystem/filesystem.h
@@ -19,6 +19,7 @@
#include <chrono>
#include <cstdint>
+#include <functional>
#include <iosfwd>
#include <memory>
#include <string>
@@ -141,6 +142,19 @@
std::string path;
};
+using FileInfoVector = std::vector<FileInfo>;
+using FileInfoGenerator = std::function<Future<FileInfoVector>()>;
+
+} // namespace fs
+
+template <>
+struct IterationTraits<fs::FileInfoVector> {
+ static fs::FileInfoVector End() { return {}; }
+ static bool IsEnd(const fs::FileInfoVector& val) { return val.empty(); }
+};
+
+namespace fs {
+
/// \brief Abstract file system API
class ARROW_EXPORT FileSystem : public std::enable_shared_from_this<FileSystem> {
public:
@@ -171,20 +185,22 @@
/// a truly exceptional condition (low-level I/O error, etc.).
virtual Result<FileInfo> GetFileInfo(const std::string& path) = 0;
/// Same, for many targets at once.
- virtual Result<std::vector<FileInfo>> GetFileInfo(
- const std::vector<std::string>& paths);
+ virtual Result<FileInfoVector> GetFileInfo(const std::vector<std::string>& paths);
/// Same, according to a selector.
///
/// The selector's base directory will not be part of the results, even if
/// it exists.
/// If it doesn't exist, see `FileSelector::allow_not_found`.
- virtual Result<std::vector<FileInfo>> GetFileInfo(const FileSelector& select) = 0;
+ virtual Result<FileInfoVector> GetFileInfo(const FileSelector& select) = 0;
/// EXPERIMENTAL: async version of GetFileInfo
- virtual Future<std::vector<FileInfo>> GetFileInfoAsync(
- const std::vector<std::string>& paths);
- /// EXPERIMENTAL: async version of GetFileInfo
- virtual Future<std::vector<FileInfo>> GetFileInfoAsync(const FileSelector& select);
+ virtual Future<FileInfoVector> GetFileInfoAsync(const std::vector<std::string>& paths);
+
+ /// EXPERIMENTAL: streaming async version of GetFileInfo
+ ///
+ /// The returned generator is not async-reentrant, i.e. you need to wait for
+ /// the returned future to complete before calling the generator again.
+ virtual FileInfoGenerator GetFileInfoGenerator(const FileSelector& select);
/// Create a directory and subdirectories.
///
@@ -314,7 +330,9 @@
using FileSystem::GetFileInfo;
/// \endcond
Result<FileInfo> GetFileInfo(const std::string& path) override;
- Result<std::vector<FileInfo>> GetFileInfo(const FileSelector& select) override;
+ Result<FileInfoVector> GetFileInfo(const FileSelector& select) override;
+
+ FileInfoGenerator GetFileInfoGenerator(const FileSelector& select) override;
Status CreateDir(const std::string& path, bool recursive = true) override;
@@ -335,6 +353,16 @@
const std::string& path) override;
Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile(
const FileInfo& info) override;
+
+ Future<std::shared_ptr<io::InputStream>> OpenInputStreamAsync(
+ const std::string& path) override;
+ Future<std::shared_ptr<io::InputStream>> OpenInputStreamAsync(
+ const FileInfo& info) override;
+ Future<std::shared_ptr<io::RandomAccessFile>> OpenInputFileAsync(
+ const std::string& path) override;
+ Future<std::shared_ptr<io::RandomAccessFile>> OpenInputFileAsync(
+ const FileInfo& info) override;
+
Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
const std::string& path) override;
Result<std::shared_ptr<io::OutputStream>> OpenAppendStream(
@@ -370,7 +398,7 @@
using FileSystem::GetFileInfo;
Result<FileInfo> GetFileInfo(const std::string& path) override;
- Result<std::vector<FileInfo>> GetFileInfo(const FileSelector& select) override;
+ Result<FileInfoVector> GetFileInfo(const FileSelector& select) override;
Status CreateDir(const std::string& path, bool recursive = true) override;
diff --git a/cpp/src/arrow/filesystem/filesystem_test.cc b/cpp/src/arrow/filesystem/filesystem_test.cc
index f3b561f..8df84ff 100644
--- a/cpp/src/arrow/filesystem/filesystem_test.cc
+++ b/cpp/src/arrow/filesystem/filesystem_test.cc
@@ -267,21 +267,26 @@
////////////////////////////////////////////////////////////////////////////
// Generic MockFileSystem tests
+template <typename MockFileSystemType>
class TestMockFSGeneric : public ::testing::Test, public GenericFileSystemTest {
public:
void SetUp() override {
time_ = TimePoint(TimePoint::duration(42));
- fs_ = std::make_shared<MockFileSystem>(time_);
+ fs_ = std::make_shared<MockFileSystemType>(time_);
}
protected:
std::shared_ptr<FileSystem> GetEmptyFileSystem() override { return fs_; }
TimePoint time_;
- std::shared_ptr<MockFileSystem> fs_;
+ std::shared_ptr<FileSystem> fs_;
};
-GENERIC_FS_TEST_FUNCTIONS(TestMockFSGeneric);
+using MockFileSystemTypes = ::testing::Types<MockFileSystem, MockAsyncFileSystem>;
+
+TYPED_TEST_SUITE(TestMockFSGeneric, MockFileSystemTypes);
+
+GENERIC_FS_TYPED_TEST_FUNCTIONS(TestMockFSGeneric);
////////////////////////////////////////////////////////////////////////////
// Concrete MockFileSystem tests
diff --git a/cpp/src/arrow/filesystem/mockfs.cc b/cpp/src/arrow/filesystem/mockfs.cc
index 294cc85..e1ac05c 100644
--- a/cpp/src/arrow/filesystem/mockfs.cc
+++ b/cpp/src/arrow/filesystem/mockfs.cc
@@ -31,6 +31,8 @@
#include "arrow/filesystem/util_internal.h"
#include "arrow/io/interfaces.h"
#include "arrow/io/memory.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/future.h"
#include "arrow/util/logging.h"
#include "arrow/util/string_view.h"
#include "arrow/util/variant.h"
@@ -536,13 +538,13 @@
return info;
}
-Result<std::vector<FileInfo>> MockFileSystem::GetFileInfo(const FileSelector& selector) {
+Result<FileInfoVector> MockFileSystem::GetFileInfo(const FileSelector& selector) {
auto parts = SplitAbstractPath(selector.base_dir);
RETURN_NOT_OK(ValidateAbstractPathParts(parts));
auto guard = impl_->lock_guard();
- std::vector<FileInfo> results;
+ FileInfoVector results;
Entry* base_dir = impl_->FindEntry(parts);
if (base_dir == nullptr) {
@@ -746,6 +748,20 @@
return fs;
}
+FileInfoGenerator MockAsyncFileSystem::GetFileInfoGenerator(const FileSelector& select) {
+ auto maybe_infos = GetFileInfo(select);
+ if (maybe_infos.ok()) {
+ // Return the FileInfo entries one by one
+ const auto& infos = *maybe_infos;
+ std::vector<FileInfoVector> chunks(infos.size());
+ std::transform(infos.begin(), infos.end(), chunks.begin(),
+ [](const FileInfo& info) { return FileInfoVector{info}; });
+ return MakeVectorGenerator(std::move(chunks));
+ } else {
+ return MakeFailingGenerator(maybe_infos);
+ }
+}
+
} // namespace internal
} // namespace fs
} // namespace arrow
diff --git a/cpp/src/arrow/filesystem/mockfs.h b/cpp/src/arrow/filesystem/mockfs.h
index 212caf6..af0a327 100644
--- a/cpp/src/arrow/filesystem/mockfs.h
+++ b/cpp/src/arrow/filesystem/mockfs.h
@@ -114,6 +114,17 @@
std::unique_ptr<Impl> impl_;
};
+class ARROW_EXPORT MockAsyncFileSystem : public MockFileSystem {
+ public:
+ explicit MockAsyncFileSystem(TimePoint current_time,
+ const io::IOContext& io_context = io::default_io_context())
+ : MockFileSystem(current_time, io_context) {
+ default_async_is_sync_ = false;
+ }
+
+ FileInfoGenerator GetFileInfoGenerator(const FileSelector& select) override;
+};
+
} // namespace internal
} // namespace fs
} // namespace arrow
diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index 400442d..75b1e71 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -74,6 +74,7 @@
#include "arrow/io/util_internal.h"
#include "arrow/result.h"
#include "arrow/status.h"
+#include "arrow/util/async_generator.h"
#include "arrow/util/atomic_shared_ptr.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/future.h"
@@ -87,6 +88,7 @@
using internal::TaskGroup;
using internal::Uri;
+using io::internal::SubmitIO;
namespace fs {
@@ -994,10 +996,9 @@
++upload_state_->parts_in_progress;
}
auto client = client_;
- ARROW_ASSIGN_OR_RAISE(auto fut, io_context_.executor()->Submit(
- io_context_.stop_token(), [client, req]() {
- return client->UploadPart(req);
- }));
+ ARROW_ASSIGN_OR_RAISE(auto fut, SubmitIO(io_context_, [client, req]() {
+ return client->UploadPart(req);
+ }));
// The closure keeps the buffer and the upload state alive
auto state = upload_state_;
auto part_number = part_number_;
@@ -1126,6 +1127,11 @@
template <typename... Args>
static Status Walk(Args&&... args) {
+ return WalkAsync(std::forward<Args>(args)...).status();
+ }
+
+ template <typename... Args>
+ static Future<> WalkAsync(Args&&... args) {
auto self = std::make_shared<TreeWalker>(std::forward<Args>(args)...);
return self->DoWalk();
}
@@ -1147,12 +1153,12 @@
std::shared_ptr<TaskGroup> task_group_;
std::mutex mutex_;
- Status DoWalk() {
+ Future<> DoWalk() {
task_group_ =
TaskGroup::MakeThreaded(io_context_.executor(), io_context_.stop_token());
WalkChild(base_dir_, /*nesting_depth=*/0);
// When this returns, ListObjectsV2 tasks either have finished or will exit early
- return task_group_->Finish();
+ return task_group_->FinishAsync();
}
bool ok() const { return task_group_->ok(); }
@@ -1249,7 +1255,7 @@
// -----------------------------------------------------------------------
// S3 filesystem implementation
-class S3FileSystem::Impl {
+class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Impl> {
public:
ClientBuilder builder_;
io::IOContext io_context_;
@@ -1404,32 +1410,20 @@
return Status::OK();
}
- // Workhorse for GetTargetStats(FileSelector...)
- Status Walk(const FileSelector& select, const std::string& bucket,
- const std::string& key, std::vector<FileInfo>* out) {
- bool is_empty = true;
+ // A helper class for Walk and WalkAsync
+ struct FileInfoCollector {
+ FileInfoCollector(std::string bucket, std::string key, const FileSelector& select)
+ : bucket(std::move(bucket)),
+ key(std::move(key)),
+ allow_not_found(select.allow_not_found) {}
- auto handle_error = [&](const AWSError<S3Errors>& error) -> Status {
- if (select.allow_not_found && IsNotFound(error)) {
- return Status::OK();
- }
- return ErrorToStatus(std::forward_as_tuple("When listing objects under key '", key,
- "' in bucket '", bucket, "': "),
- error);
- };
-
- auto handle_recursion = [&](int32_t nesting_depth) -> Result<bool> {
- RETURN_NOT_OK(CheckNestingDepth(nesting_depth));
- return select.recursive && nesting_depth <= select.max_recursion;
- };
-
- auto handle_results = [&](const std::string& prefix,
- const S3Model::ListObjectsV2Result& result) -> Status {
+ Status Collect(const std::string& prefix, const S3Model::ListObjectsV2Result& result,
+ std::vector<FileInfo>* out) {
// Walk "directories"
- for (const auto& prefix : result.GetCommonPrefixes()) {
+ for (const auto& child_prefix : result.GetCommonPrefixes()) {
is_empty = false;
const auto child_key =
- internal::RemoveTrailingSlash(FromAwsString(prefix.GetPrefix()));
+ internal::RemoveTrailingSlash(FromAwsString(child_prefix.GetPrefix()));
std::stringstream child_path;
child_path << bucket << kSep << child_key;
FileInfo info;
@@ -1453,6 +1447,49 @@
out->push_back(std::move(info));
}
return Status::OK();
+ }
+
+ Status Finish(Impl* impl) {
+ // If no contents were found, perhaps it's an empty "directory",
+ // or perhaps it's a nonexistent entry. Check.
+ if (is_empty && !allow_not_found) {
+ bool is_actually_empty;
+ RETURN_NOT_OK(impl->IsEmptyDirectory(bucket, key, &is_actually_empty));
+ if (!is_actually_empty) {
+ return PathNotFound(bucket, key);
+ }
+ }
+ return Status::OK();
+ }
+
+ std::string bucket;
+ std::string key;
+ bool allow_not_found;
+ bool is_empty = true;
+ };
+
+ // Workhorse for GetFileInfo(FileSelector...)
+ Status Walk(const FileSelector& select, const std::string& bucket,
+ const std::string& key, std::vector<FileInfo>* out) {
+ FileInfoCollector collector(bucket, key, select);
+
+ auto handle_error = [&](const AWSError<S3Errors>& error) -> Status {
+ if (select.allow_not_found && IsNotFound(error)) {
+ return Status::OK();
+ }
+ return ErrorToStatus(std::forward_as_tuple("When listing objects under key '", key,
+ "' in bucket '", bucket, "': "),
+ error);
+ };
+
+ auto handle_recursion = [&](int32_t nesting_depth) -> Result<bool> {
+ RETURN_NOT_OK(CheckNestingDepth(nesting_depth));
+ return select.recursive && nesting_depth <= select.max_recursion;
+ };
+
+ auto handle_results = [&](const std::string& prefix,
+ const S3Model::ListObjectsV2Result& result) -> Status {
+ return collector.Collect(prefix, result, out);
};
RETURN_NOT_OK(TreeWalker::Walk(client_, io_context_, bucket, key, kListObjectsMaxKeys,
@@ -1460,17 +1497,59 @@
// If no contents were found, perhaps it's an empty "directory",
// or perhaps it's a nonexistent entry. Check.
- if (is_empty && !select.allow_not_found) {
- RETURN_NOT_OK(IsEmptyDirectory(bucket, key, &is_empty));
- if (!is_empty) {
- return PathNotFound(bucket, key);
- }
- }
+ RETURN_NOT_OK(collector.Finish(this));
// Sort results for convenience, since they can come massively out of order
std::sort(out->begin(), out->end(), FileInfo::ByPath{});
return Status::OK();
}
+ // Workhorse for GetFileInfoGenerator(FileSelector...)
+ FileInfoGenerator WalkAsync(const FileSelector& select, const std::string& bucket,
+ const std::string& key) {
+ PushGenerator<std::vector<FileInfo>> gen;
+ auto producer = gen.producer();
+ auto collector = std::make_shared<FileInfoCollector>(bucket, key, select);
+ auto self = shared_from_this();
+
+ auto handle_error = [select, bucket, key](const AWSError<S3Errors>& error) -> Status {
+ if (select.allow_not_found && IsNotFound(error)) {
+ return Status::OK();
+ }
+ return ErrorToStatus(std::forward_as_tuple("When listing objects under key '", key,
+ "' in bucket '", bucket, "': "),
+ error);
+ };
+
+ auto handle_recursion = [select, self](int32_t nesting_depth) -> Result<bool> {
+ RETURN_NOT_OK(self->CheckNestingDepth(nesting_depth));
+ return select.recursive && nesting_depth <= select.max_recursion;
+ };
+
+ auto handle_results =
+ [collector, producer](
+ const std::string& prefix,
+ const S3Model::ListObjectsV2Result& result) mutable -> Status {
+ std::vector<FileInfo> out;
+ RETURN_NOT_OK(collector->Collect(prefix, result, &out));
+ if (!out.empty()) {
+ producer.Push(std::move(out));
+ }
+ return Status::OK();
+ };
+
+ TreeWalker::WalkAsync(client_, io_context_, bucket, key, kListObjectsMaxKeys,
+ handle_results, handle_error, handle_recursion)
+ .AddCallback([collector, producer,
+ self](const Result<::arrow::detail::Empty>& res) mutable {
+ auto st = collector->Finish(self.get());
+ if (!st.ok()) {
+ producer.Push(st);
+ }
+ producer.Close();
+ });
+ return gen;
+ }
+
Status WalkForDeleteDir(const std::string& bucket, const std::string& key,
std::vector<std::string>* file_keys,
std::vector<std::string>* dir_keys) {
@@ -1550,10 +1629,9 @@
}
req.SetBucket(ToAwsString(bucket));
req.SetDelete(std::move(del));
- ARROW_ASSIGN_OR_RAISE(auto fut, io_context_.executor()->Submit(
- io_context_.stop_token(), [client, req]() {
- return client->DeleteObjects(req);
- }));
+ ARROW_ASSIGN_OR_RAISE(auto fut, SubmitIO(io_context_, [client, req]() {
+ return client->DeleteObjects(req);
+ }));
futures.push_back(std::move(fut).Then(delete_cb));
}
@@ -1598,17 +1676,29 @@
return Status::OK();
}
- Status ListBuckets(std::vector<std::string>* out) {
- out->clear();
- auto outcome = client_->ListBuckets();
+ static Result<std::vector<std::string>> ProcessListBuckets(
+ const Aws::S3::Model::ListBucketsOutcome& outcome) {
if (!outcome.IsSuccess()) {
return ErrorToStatus(std::forward_as_tuple("When listing buckets: "),
outcome.GetError());
}
+ std::vector<std::string> buckets;
+ buckets.reserve(outcome.GetResult().GetBuckets().size());
for (const auto& bucket : outcome.GetResult().GetBuckets()) {
- out->emplace_back(FromAwsString(bucket.GetName()));
+ buckets.emplace_back(FromAwsString(bucket.GetName()));
}
- return Status::OK();
+ return buckets;
+ }
+
+ Result<std::vector<std::string>> ListBuckets() {
+ auto outcome = client_->ListBuckets();
+ return ProcessListBuckets(outcome);
+ }
+
+ Future<std::vector<std::string>> ListBucketsAsync(io::IOContext ctx) {
+ auto self = shared_from_this();
+ return DeferNotOk(SubmitIO(ctx, [self]() { return self->client_->ListBuckets(); }))
+ .Then(Impl::ProcessListBuckets);
}
Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const std::string& s,
@@ -1641,7 +1731,7 @@
};
S3FileSystem::S3FileSystem(const S3Options& options, const io::IOContext& io_context)
- : FileSystem(io_context), impl_(new Impl{options, io_context}) {
+ : FileSystem(io_context), impl_(std::make_shared<Impl>(options, io_context)) {
default_async_is_sync_ = false;
}
@@ -1736,15 +1826,14 @@
}
}
-Result<std::vector<FileInfo>> S3FileSystem::GetFileInfo(const FileSelector& select) {
+Result<FileInfoVector> S3FileSystem::GetFileInfo(const FileSelector& select) {
ARROW_ASSIGN_OR_RAISE(auto base_path, S3Path::FromString(select.base_dir));
- std::vector<FileInfo> results;
+ FileInfoVector results;
if (base_path.empty()) {
// List all buckets
- std::vector<std::string> buckets;
- RETURN_NOT_OK(impl_->ListBuckets(&buckets));
+ ARROW_ASSIGN_OR_RAISE(auto buckets, impl_->ListBuckets());
for (const auto& bucket : buckets) {
FileInfo info;
info.set_path(bucket);
@@ -1762,6 +1851,51 @@
return results;
}
+FileInfoGenerator S3FileSystem::GetFileInfoGenerator(const FileSelector& select) {
+ auto maybe_base_path = S3Path::FromString(select.base_dir);
+ if (!maybe_base_path.ok()) {
+ return MakeFailingGenerator<FileInfoVector>(maybe_base_path.status());
+ }
+ auto base_path = *std::move(maybe_base_path);
+
+ if (base_path.empty()) {
+ // List all buckets, then possibly recurse
+ PushGenerator<AsyncGenerator<FileInfoVector>> gen;
+ auto producer = gen.producer();
+
+ auto fut = impl_->ListBucketsAsync(io_context());
+ auto impl = impl_->shared_from_this();
+ fut.AddCallback(
+ [producer, select, impl](const Result<std::vector<std::string>>& res) mutable {
+ if (!res.ok()) {
+ producer.Push(res.status());
+ producer.Close();
+ return;
+ }
+ FileInfoVector buckets;
+ for (const auto& bucket : *res) {
+ buckets.push_back(FileInfo{bucket, FileType::Directory});
+ }
+ // Generate all bucket infos
+ auto buckets_fut = Future<FileInfoVector>::MakeFinished(std::move(buckets));
+ producer.Push(MakeSingleFutureGenerator(buckets_fut));
+ if (select.recursive) {
+ // Generate recursive walk for each bucket in turn
+ for (const auto& bucket : *buckets_fut.result()) {
+ producer.Push(impl->WalkAsync(select, bucket.path(), ""));
+ }
+ }
+ producer.Close();
+ });
+
+ return MakeConcatenatedGenerator(
+ AsyncGenerator<AsyncGenerator<FileInfoVector>>{std::move(gen)});
+ }
+
+ // Nominal case -> walk a single bucket
+ return impl_->WalkAsync(select, base_path.bucket, base_path.key);
+}
+
Status S3FileSystem::CreateDir(const std::string& s, bool recursive) {
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
diff --git a/cpp/src/arrow/filesystem/s3fs.h b/cpp/src/arrow/filesystem/s3fs.h
index ac384fcb..a7f72fb 100644
--- a/cpp/src/arrow/filesystem/s3fs.h
+++ b/cpp/src/arrow/filesystem/s3fs.h
@@ -149,6 +149,8 @@
Result<FileInfo> GetFileInfo(const std::string& path) override;
Result<std::vector<FileInfo>> GetFileInfo(const FileSelector& select) override;
+ FileInfoGenerator GetFileInfoGenerator(const FileSelector& select) override;
+
Status CreateDir(const std::string& path, bool recursive = true) override;
Status DeleteDir(const std::string& path) override;
@@ -206,7 +208,7 @@
explicit S3FileSystem(const S3Options& options, const io::IOContext&);
class Impl;
- std::unique_ptr<Impl> impl_;
+ std::shared_ptr<Impl> impl_;
};
enum class S3LogLevel : int8_t { Off, Fatal, Error, Warn, Info, Debug, Trace };
diff --git a/cpp/src/arrow/filesystem/s3fs_test.cc b/cpp/src/arrow/filesystem/s3fs_test.cc
index c79d9f7..f5efcda 100644
--- a/cpp/src/arrow/filesystem/s3fs_test.cc
+++ b/cpp/src/arrow/filesystem/s3fs_test.cc
@@ -70,9 +70,12 @@
#include "arrow/filesystem/test_util.h"
#include "arrow/result.h"
#include "arrow/status.h"
+#include "arrow/testing/future_util.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/util.h"
+#include "arrow/util/async_generator.h"
#include "arrow/util/checked_cast.h"
+#include "arrow/util/future.h"
#include "arrow/util/io_util.h"
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"
@@ -641,6 +644,34 @@
AssertFileInfo(infos[1], "bucket/somedir/subdir/subfile", FileType::File, 8);
}
+TEST_F(TestS3FS, GetFileInfoGenerator) {
+ FileSelector select;
+ FileInfoVector infos;
+
+ // Root dir
+ select.base_dir = "";
+ CollectFileInfoGenerator(fs_->GetFileInfoGenerator(select), &infos);
+ ASSERT_EQ(infos.size(), 2);
+ SortInfos(&infos);
+ AssertFileInfo(infos[0], "bucket", FileType::Directory);
+ AssertFileInfo(infos[1], "empty-bucket", FileType::Directory);
+
+ // Root dir, recursive
+ select.recursive = true;
+ CollectFileInfoGenerator(fs_->GetFileInfoGenerator(select), &infos);
+ ASSERT_EQ(infos.size(), 7);
+ SortInfos(&infos);
+ AssertFileInfo(infos[0], "bucket", FileType::Directory);
+ AssertFileInfo(infos[1], "bucket/emptydir", FileType::Directory);
+ AssertFileInfo(infos[2], "bucket/somedir", FileType::Directory);
+ AssertFileInfo(infos[3], "bucket/somedir/subdir", FileType::Directory);
+ AssertFileInfo(infos[4], "bucket/somedir/subdir/subfile", FileType::File, 8);
+ AssertFileInfo(infos[5], "bucket/somefile", FileType::File, 9);
+ AssertFileInfo(infos[6], "empty-bucket", FileType::Directory);
+
+ // Non-root dir case is tested by generic tests
+}
+
TEST_F(TestS3FS, CreateDir) {
FileInfo st;
diff --git a/cpp/src/arrow/filesystem/test_util.cc b/cpp/src/arrow/filesystem/test_util.cc
index 93d84c0..466b882 100644
--- a/cpp/src/arrow/filesystem/test_util.cc
+++ b/cpp/src/arrow/filesystem/test_util.cc
@@ -30,7 +30,9 @@
#include "arrow/status.h"
#include "arrow/testing/future_util.h"
#include "arrow/testing/gtest_util.h"
+#include "arrow/util/async_generator.h"
#include "arrow/util/future.h"
+#include "arrow/util/vector.h"
using ::testing::ElementsAre;
@@ -111,6 +113,12 @@
std::sort(infos->begin(), infos->end(), FileInfo::ByPath{});
}
+void CollectFileInfoGenerator(FileInfoGenerator gen, FileInfoVector* out_infos) {
+ auto fut = CollectAsyncGenerator(gen);
+ ASSERT_FINISHES_OK_AND_ASSIGN(auto nested_infos, fut);
+ *out_infos = ::arrow::internal::FlattenVectors(nested_infos);
+}
+
void AssertFileInfo(const FileInfo& info, const std::string& path, FileType type) {
ASSERT_EQ(info.path(), path);
ASSERT_EQ(info.type(), type) << "For path '" << info.path() << "'";
@@ -681,7 +689,7 @@
ASSERT_RAISES(IOError, fs->GetFileInfo(s));
}
-void GenericFileSystemTest::TestGetFileInfoSelectorAsync(FileSystem* fs) {
+void GenericFileSystemTest::TestGetFileInfoGenerator(FileSystem* fs) {
ASSERT_OK(fs->CreateDir("AB/CD"));
CreateFile(fs, "abc", "data");
CreateFile(fs, "AB/def", "some data");
@@ -691,9 +699,11 @@
FileSelector s;
s.base_dir = "";
std::vector<FileInfo> infos;
+ std::vector<std::vector<FileInfo>> nested_infos;
// Non-recursive
- ASSERT_FINISHES_OK_AND_ASSIGN(infos, fs->GetFileInfoAsync(s));
+ auto gen = fs->GetFileInfoGenerator(s);
+ CollectFileInfoGenerator(std::move(gen), &infos);
SortInfos(&infos);
ASSERT_EQ(infos.size(), 2);
AssertFileInfo(infos[0], "AB", FileType::Directory);
@@ -702,7 +712,7 @@
// Recursive
s.base_dir = "AB";
s.recursive = true;
- ASSERT_FINISHES_OK_AND_ASSIGN(infos, fs->GetFileInfoAsync(s));
+ CollectFileInfoGenerator(fs->GetFileInfoGenerator(s), &infos);
SortInfos(&infos);
ASSERT_EQ(infos.size(), 4);
AssertFileInfo(infos[0], "AB/CD", FileType::Directory);
@@ -712,9 +722,10 @@
// Doesn't exist
s.base_dir = "XX";
- ASSERT_RAISES(IOError, fs->GetFileInfoAsync(s).result());
+ auto fut = CollectAsyncGenerator(fs->GetFileInfoGenerator(s));
+ ASSERT_FINISHES_AND_RAISES(IOError, fut);
s.allow_not_found = true;
- ASSERT_FINISHES_OK_AND_ASSIGN(infos, fs->GetFileInfoAsync(s));
+ CollectFileInfoGenerator(fs->GetFileInfoGenerator(s), &infos);
ASSERT_EQ(infos.size(), 0);
}
@@ -1025,7 +1036,7 @@
GENERIC_FS_TEST_DEFINE(TestGetFileInfoSelector)
GENERIC_FS_TEST_DEFINE(TestGetFileInfoSelectorWithRecursion)
GENERIC_FS_TEST_DEFINE(TestGetFileInfoAsync)
-GENERIC_FS_TEST_DEFINE(TestGetFileInfoSelectorAsync)
+GENERIC_FS_TEST_DEFINE(TestGetFileInfoGenerator)
GENERIC_FS_TEST_DEFINE(TestOpenOutputStream)
GENERIC_FS_TEST_DEFINE(TestOpenAppendStream)
GENERIC_FS_TEST_DEFINE(TestOpenInputStream)
diff --git a/cpp/src/arrow/filesystem/test_util.h b/cpp/src/arrow/filesystem/test_util.h
index 232d06f..7941756 100644
--- a/cpp/src/arrow/filesystem/test_util.h
+++ b/cpp/src/arrow/filesystem/test_util.h
@@ -43,7 +43,10 @@
// Sort a vector of FileInfo by lexicographic path order
ARROW_TESTING_EXPORT
-void SortInfos(std::vector<FileInfo>* infos);
+void SortInfos(FileInfoVector* infos);
+
+ARROW_TESTING_EXPORT
+void CollectFileInfoGenerator(FileInfoGenerator gen, FileInfoVector* out_infos);
ARROW_TESTING_EXPORT
void AssertFileInfo(const FileInfo& info, const std::string& path, FileType type);
@@ -109,7 +112,7 @@
void TestGetFileInfoSelector();
void TestGetFileInfoSelectorWithRecursion();
void TestGetFileInfoAsync();
- void TestGetFileInfoSelectorAsync();
+ void TestGetFileInfoGenerator();
void TestOpenOutputStream();
void TestOpenAppendStream();
void TestOpenInputStream();
@@ -154,7 +157,7 @@
void TestGetFileInfoSelector(FileSystem* fs);
void TestGetFileInfoSelectorWithRecursion(FileSystem* fs);
void TestGetFileInfoAsync(FileSystem* fs);
- void TestGetFileInfoSelectorAsync(FileSystem* fs);
+ void TestGetFileInfoGenerator(FileSystem* fs);
void TestOpenOutputStream(FileSystem* fs);
void TestOpenAppendStream(FileSystem* fs);
void TestOpenInputStream(FileSystem* fs);
@@ -185,7 +188,7 @@
GENERIC_FS_TEST_FUNCTION(TEST_MACRO, TEST_CLASS, GetFileInfoSelector) \
GENERIC_FS_TEST_FUNCTION(TEST_MACRO, TEST_CLASS, GetFileInfoSelectorWithRecursion) \
GENERIC_FS_TEST_FUNCTION(TEST_MACRO, TEST_CLASS, GetFileInfoAsync) \
- GENERIC_FS_TEST_FUNCTION(TEST_MACRO, TEST_CLASS, GetFileInfoSelectorAsync) \
+ GENERIC_FS_TEST_FUNCTION(TEST_MACRO, TEST_CLASS, GetFileInfoGenerator) \
GENERIC_FS_TEST_FUNCTION(TEST_MACRO, TEST_CLASS, OpenOutputStream) \
GENERIC_FS_TEST_FUNCTION(TEST_MACRO, TEST_CLASS, OpenAppendStream) \
GENERIC_FS_TEST_FUNCTION(TEST_MACRO, TEST_CLASS, OpenInputStream) \
diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc
index dc2112e..d052c01 100644
--- a/cpp/src/arrow/io/interfaces.cc
+++ b/cpp/src/arrow/io/interfaces.cc
@@ -136,9 +136,8 @@
TaskHints hints;
hints.io_size = nbytes;
hints.external_id = ctx.external_id();
- return DeferNotOk(ctx.executor()->Submit(std::move(hints), [self, position, nbytes] {
- return self->ReadAt(position, nbytes);
- }));
+ return DeferNotOk(internal::SubmitIO(
+ ctx, [self, position, nbytes] { return self->ReadAt(position, nbytes); }));
}
Future<std::shared_ptr<Buffer>> RandomAccessFile::ReadAsync(int64_t position,
diff --git a/cpp/src/arrow/io/util_internal.h b/cpp/src/arrow/io/util_internal.h
index f711227..b1d75d1 100644
--- a/cpp/src/arrow/io/util_internal.h
+++ b/cpp/src/arrow/io/util_internal.h
@@ -18,9 +18,11 @@
#pragma once
#include <memory>
+#include <utility>
#include <vector>
#include "arrow/io/interfaces.h"
+#include "arrow/util/thread_pool.h"
#include "arrow/util/type_fwd.h"
#include "arrow/util/visibility.h"
@@ -50,6 +52,15 @@
ARROW_EXPORT
::arrow::internal::ThreadPool* GetIOThreadPool();
+template <typename... SubmitArgs>
+auto SubmitIO(IOContext io_context, SubmitArgs&&... submit_args)
+ -> decltype(std::declval<::arrow::internal::Executor*>()->Submit(submit_args...)) {
+ ::arrow::internal::TaskHints hints;
+ hints.external_id = io_context.external_id();
+ return io_context.executor()->Submit(hints, io_context.stop_token(),
+ std::forward<SubmitArgs>(submit_args)...);
+}
+
} // namespace internal
} // namespace io
} // namespace arrow
diff --git a/cpp/src/arrow/testing/future_util.h b/cpp/src/arrow/testing/future_util.h
index 44fa78c..a61a989 100644
--- a/cpp/src/arrow/testing/future_util.h
+++ b/cpp/src/arrow/testing/future_util.h
@@ -47,15 +47,15 @@
#define ASSERT_FINISHES_AND_RAISES(ENUM, expr) \
do { \
- auto&& fut = (expr); \
- ASSERT_FINISHES_IMPL(fut); \
- ASSERT_RAISES(ENUM, fut.status()); \
+ auto&& _fut = (expr); \
+ ASSERT_FINISHES_IMPL(_fut); \
+ ASSERT_RAISES(ENUM, _fut.status()); \
} while (false)
-#define ASSERT_FINISHES_OK_AND_ASSIGN_IMPL(lhs, rexpr, future_name) \
- auto future_name = (rexpr); \
- ASSERT_FINISHES_IMPL(future_name); \
- ASSERT_OK_AND_ASSIGN(lhs, future_name.result());
+#define ASSERT_FINISHES_OK_AND_ASSIGN_IMPL(lhs, rexpr, _future_name) \
+ auto _future_name = (rexpr); \
+ ASSERT_FINISHES_IMPL(_future_name); \
+ ASSERT_OK_AND_ASSIGN(lhs, _future_name.result());
#define ASSERT_FINISHES_OK_AND_ASSIGN(lhs, rexpr) \
ASSERT_FINISHES_OK_AND_ASSIGN_IMPL(lhs, rexpr, \
diff --git a/cpp/src/arrow/type_fwd.h b/cpp/src/arrow/type_fwd.h
index 46018ef..7eb318c 100644
--- a/cpp/src/arrow/type_fwd.h
+++ b/cpp/src/arrow/type_fwd.h
@@ -29,6 +29,8 @@
template <typename T>
class Iterator;
+template <typename T>
+struct IterationTraits;
template <typename T>
class Result;
diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h
index f034cea..06e823a 100644
--- a/cpp/src/arrow/util/async_generator.h
+++ b/cpp/src/arrow/util/async_generator.h
@@ -44,7 +44,7 @@
// the utilities Visit/Collect/Await take care to do this).
//
// Asynchronous reentrancy on the other hand means the function is called again before the
-// future returned by the function is marekd finished (but after the call to get the
+// future returned by the function is marked finished (but after the call to get the
// future returns). Some of these generators are async-reentrant while others (e.g.
// those that depend on ordered processing like decompression) are not. Read the MakeXYZ
// function comments to determine which generators support async reentrancy.
@@ -1332,4 +1332,46 @@
return MakeGeneratorIterator(std::move(owned_bg_generator));
}
+/// \brief Make a generator that returns a single pre-generated future
+///
+/// This generator is async-reentrant.
+template <typename T>
+std::function<Future<T>()> MakeSingleFutureGenerator(Future<T> future) {
+ assert(future.is_valid());
+ auto state = std::make_shared<Future<T>>(std::move(future));
+ return [state]() -> Future<T> {
+ auto fut = std::move(*state);
+ if (fut.is_valid()) {
+ return fut;
+ } else {
+ return AsyncGeneratorEnd<T>();
+ }
+ };
+}
+
+/// \brief Make a generator that always fails with a given error
+///
+/// This generator is async-reentrant.
+template <typename T>
+AsyncGenerator<T> MakeFailingGenerator(Status st) {
+ assert(!st.ok());
+ auto state = std::make_shared<Status>(std::move(st));
+ return [state]() -> Future<T> {
+ auto st = std::move(*state);
+ if (!st.ok()) {
+ return std::move(st);
+ } else {
+ return AsyncGeneratorEnd<T>();
+ }
+ };
+}
+
+/// \brief Make a generator that always fails with a given error
+///
+/// This overload allows inferring the return type from the argument.
+template <typename T>
+AsyncGenerator<T> MakeFailingGenerator(const Result<T>& result) {
+ return MakeFailingGenerator<T>(result.status());
+}
+
} // namespace arrow
diff --git a/cpp/src/arrow/util/async_generator_test.cc b/cpp/src/arrow/util/async_generator_test.cc
index 51e4f94..36d0629 100644
--- a/cpp/src/arrow/util/async_generator_test.cc
+++ b/cpp/src/arrow/util/async_generator_test.cc
@@ -1299,4 +1299,26 @@
}
}
+TEST(SingleFutureGenerator, Basics) {
+ auto fut = Future<TestInt>::Make();
+ auto gen = MakeSingleFutureGenerator(fut);
+ auto collect_fut = CollectAsyncGenerator(gen);
+ AssertNotFinished(collect_fut);
+ fut.MarkFinished(TestInt{42});
+ ASSERT_FINISHES_OK_AND_ASSIGN(auto collected, collect_fut);
+ ASSERT_EQ(collected, std::vector<TestInt>{42});
+ // Generator exhausted
+ collect_fut = CollectAsyncGenerator(gen);
+ ASSERT_FINISHES_OK_AND_EQ(std::vector<TestInt>{}, collect_fut);
+}
+
+TEST(FailingGenerator, Basics) {
+ auto gen = MakeFailingGenerator<TestInt>(Status::IOError("zzz"));
+ auto collect_fut = CollectAsyncGenerator(gen);
+ ASSERT_FINISHES_AND_RAISES(IOError, collect_fut);
+ // Generator exhausted
+ collect_fut = CollectAsyncGenerator(gen);
+ ASSERT_FINISHES_OK_AND_EQ(std::vector<TestInt>{}, collect_fut);
+}
+
} // namespace arrow