ARROW-11924: [C++] Add streaming version of FileSystem::GetFileInfo

Closes #9995 from pitrou/ARROW-11924-get-file-info-generator

Authored-by: Antoine Pitrou <antoine@python.org>
Signed-off-by: Antoine Pitrou <antoine@python.org>
diff --git a/cpp/src/arrow/filesystem/filesystem.cc b/cpp/src/arrow/filesystem/filesystem.cc
index 7cfe266..98dc057 100644
--- a/cpp/src/arrow/filesystem/filesystem.cc
+++ b/cpp/src/arrow/filesystem/filesystem.cc
@@ -32,19 +32,24 @@
 #include "arrow/filesystem/path_util.h"
 #include "arrow/filesystem/util_internal.h"
 #include "arrow/io/slow.h"
+#include "arrow/io/util_internal.h"
 #include "arrow/result.h"
 #include "arrow/status.h"
+#include "arrow/util/async_generator.h"
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/macros.h"
 #include "arrow/util/parallel.h"
 #include "arrow/util/uri.h"
+#include "arrow/util/vector.h"
 #include "arrow/util/windows_fixup.h"
 
 namespace arrow {
 
+using internal::checked_pointer_cast;
 using internal::TaskHints;
 using internal::Uri;
+using io::internal::SubmitIO;
 
 namespace fs {
 
@@ -143,11 +148,8 @@
   if (synchronous) {
     return std::forward<DeferredFunc>(func)(std::move(self));
   }
-  TaskHints hints;
-  hints.external_id = fs->io_context().external_id();
-  // TODO pass StopToken
-  return DeferNotOk(fs->io_context().executor()->Submit(
-      hints, std::forward<DeferredFunc>(func), std::move(self)));
+  return DeferNotOk(io::internal::SubmitIO(
+      fs->io_context(), std::forward<DeferredFunc>(func), std::move(self)));
 }
 
 }  // namespace
@@ -159,10 +161,11 @@
       [paths](std::shared_ptr<FileSystem> self) { return self->GetFileInfo(paths); });
 }
 
-Future<std::vector<FileInfo>> FileSystem::GetFileInfoAsync(const FileSelector& select) {
-  return FileSystemDefer(
+FileInfoGenerator FileSystem::GetFileInfoGenerator(const FileSelector& select) {
+  auto fut = FileSystemDefer(
       this, default_async_is_sync_,
       [select](std::shared_ptr<FileSystem> self) { return self->GetFileInfo(select); });
+  return MakeSingleFutureGenerator(std::move(fut));
 }
 
 Status FileSystem::DeleteFiles(const std::vector<std::string>& paths) {
@@ -312,6 +315,23 @@
   return infos;
 }
 
+FileInfoGenerator SubTreeFileSystem::GetFileInfoGenerator(const FileSelector& select) {
+  auto selector = select;
+  selector.base_dir = PrependBase(selector.base_dir);
+  auto gen = base_fs_->GetFileInfoGenerator(selector);
+
+  auto self = checked_pointer_cast<SubTreeFileSystem>(shared_from_this());
+
+  std::function<Result<std::vector<FileInfo>>(const std::vector<FileInfo>& infos)>
+      fix_infos = [self](std::vector<FileInfo> infos) -> Result<std::vector<FileInfo>> {
+    for (auto& info : infos) {
+      RETURN_NOT_OK(self->FixInfo(&info));
+    }
+    return infos;
+  };
+  return MakeMappedGenerator(gen, fix_infos);
+}
+
 Status SubTreeFileSystem::CreateDir(const std::string& path, bool recursive) {
   auto s = path;
   RETURN_NOT_OK(PrependBaseNonEmpty(&s));
@@ -378,6 +398,22 @@
   return base_fs_->OpenInputStream(new_info);
 }
 
+Future<std::shared_ptr<io::InputStream>> SubTreeFileSystem::OpenInputStreamAsync(
+    const std::string& path) {
+  auto s = path;
+  RETURN_NOT_OK(PrependBaseNonEmpty(&s));
+  return base_fs_->OpenInputStreamAsync(s);
+}
+
+Future<std::shared_ptr<io::InputStream>> SubTreeFileSystem::OpenInputStreamAsync(
+    const FileInfo& info) {
+  auto s = info.path();
+  RETURN_NOT_OK(PrependBaseNonEmpty(&s));
+  FileInfo new_info(info);
+  new_info.set_path(std::move(s));
+  return base_fs_->OpenInputStreamAsync(new_info);
+}
+
 Result<std::shared_ptr<io::RandomAccessFile>> SubTreeFileSystem::OpenInputFile(
     const std::string& path) {
   auto s = path;
@@ -394,6 +430,22 @@
   return base_fs_->OpenInputFile(new_info);
 }
 
+Future<std::shared_ptr<io::RandomAccessFile>> SubTreeFileSystem::OpenInputFileAsync(
+    const std::string& path) {
+  auto s = path;
+  RETURN_NOT_OK(PrependBaseNonEmpty(&s));
+  return base_fs_->OpenInputFileAsync(s);
+}
+
+Future<std::shared_ptr<io::RandomAccessFile>> SubTreeFileSystem::OpenInputFileAsync(
+    const FileInfo& info) {
+  auto s = info.path();
+  RETURN_NOT_OK(PrependBaseNonEmpty(&s));
+  FileInfo new_info(info);
+  new_info.set_path(std::move(s));
+  return base_fs_->OpenInputFileAsync(new_info);
+}
+
 Result<std::shared_ptr<io::OutputStream>> SubTreeFileSystem::OpenOutputStream(
     const std::string& path) {
   auto s = path;
diff --git a/cpp/src/arrow/filesystem/filesystem.h b/cpp/src/arrow/filesystem/filesystem.h
index f779dd8..2fc5836 100644
--- a/cpp/src/arrow/filesystem/filesystem.h
+++ b/cpp/src/arrow/filesystem/filesystem.h
@@ -19,6 +19,7 @@
 
 #include <chrono>
 #include <cstdint>
+#include <functional>
 #include <iosfwd>
 #include <memory>
 #include <string>
@@ -141,6 +142,19 @@
   std::string path;
 };
 
+using FileInfoVector = std::vector<FileInfo>;
+using FileInfoGenerator = std::function<Future<FileInfoVector>()>;
+
+}  // namespace fs
+
+template <>
+struct IterationTraits<fs::FileInfoVector> {
+  static fs::FileInfoVector End() { return {}; }
+  static bool IsEnd(const fs::FileInfoVector& val) { return val.empty(); }
+};
+
+namespace fs {
+
 /// \brief Abstract file system API
 class ARROW_EXPORT FileSystem : public std::enable_shared_from_this<FileSystem> {
  public:
@@ -171,20 +185,22 @@
   /// a truly exceptional condition (low-level I/O error, etc.).
   virtual Result<FileInfo> GetFileInfo(const std::string& path) = 0;
   /// Same, for many targets at once.
-  virtual Result<std::vector<FileInfo>> GetFileInfo(
-      const std::vector<std::string>& paths);
+  virtual Result<FileInfoVector> GetFileInfo(const std::vector<std::string>& paths);
   /// Same, according to a selector.
   ///
   /// The selector's base directory will not be part of the results, even if
   /// it exists.
   /// If it doesn't exist, see `FileSelector::allow_not_found`.
-  virtual Result<std::vector<FileInfo>> GetFileInfo(const FileSelector& select) = 0;
+  virtual Result<FileInfoVector> GetFileInfo(const FileSelector& select) = 0;
 
   /// EXPERIMENTAL: async version of GetFileInfo
-  virtual Future<std::vector<FileInfo>> GetFileInfoAsync(
-      const std::vector<std::string>& paths);
-  /// EXPERIMENTAL: async version of GetFileInfo
-  virtual Future<std::vector<FileInfo>> GetFileInfoAsync(const FileSelector& select);
+  virtual Future<FileInfoVector> GetFileInfoAsync(const std::vector<std::string>& paths);
+
+  /// EXPERIMENTAL: streaming async version of GetFileInfo
+  ///
+  /// The returned generator is not async-reentrant, i.e. you need to wait for
+  /// the returned future to complete before calling the generator again.
+  virtual FileInfoGenerator GetFileInfoGenerator(const FileSelector& select);
 
   /// Create a directory and subdirectories.
   ///
@@ -314,7 +330,9 @@
   using FileSystem::GetFileInfo;
   /// \endcond
   Result<FileInfo> GetFileInfo(const std::string& path) override;
-  Result<std::vector<FileInfo>> GetFileInfo(const FileSelector& select) override;
+  Result<FileInfoVector> GetFileInfo(const FileSelector& select) override;
+
+  FileInfoGenerator GetFileInfoGenerator(const FileSelector& select) override;
 
   Status CreateDir(const std::string& path, bool recursive = true) override;
 
@@ -335,6 +353,16 @@
       const std::string& path) override;
   Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile(
       const FileInfo& info) override;
+
+  Future<std::shared_ptr<io::InputStream>> OpenInputStreamAsync(
+      const std::string& path) override;
+  Future<std::shared_ptr<io::InputStream>> OpenInputStreamAsync(
+      const FileInfo& info) override;
+  Future<std::shared_ptr<io::RandomAccessFile>> OpenInputFileAsync(
+      const std::string& path) override;
+  Future<std::shared_ptr<io::RandomAccessFile>> OpenInputFileAsync(
+      const FileInfo& info) override;
+
   Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
       const std::string& path) override;
   Result<std::shared_ptr<io::OutputStream>> OpenAppendStream(
@@ -370,7 +398,7 @@
 
   using FileSystem::GetFileInfo;
   Result<FileInfo> GetFileInfo(const std::string& path) override;
-  Result<std::vector<FileInfo>> GetFileInfo(const FileSelector& select) override;
+  Result<FileInfoVector> GetFileInfo(const FileSelector& select) override;
 
   Status CreateDir(const std::string& path, bool recursive = true) override;
 
diff --git a/cpp/src/arrow/filesystem/filesystem_test.cc b/cpp/src/arrow/filesystem/filesystem_test.cc
index f3b561f..8df84ff 100644
--- a/cpp/src/arrow/filesystem/filesystem_test.cc
+++ b/cpp/src/arrow/filesystem/filesystem_test.cc
@@ -267,21 +267,26 @@
 ////////////////////////////////////////////////////////////////////////////
 // Generic MockFileSystem tests
 
+template <typename MockFileSystemType>
 class TestMockFSGeneric : public ::testing::Test, public GenericFileSystemTest {
  public:
   void SetUp() override {
     time_ = TimePoint(TimePoint::duration(42));
-    fs_ = std::make_shared<MockFileSystem>(time_);
+    fs_ = std::make_shared<MockFileSystemType>(time_);
   }
 
  protected:
   std::shared_ptr<FileSystem> GetEmptyFileSystem() override { return fs_; }
 
   TimePoint time_;
-  std::shared_ptr<MockFileSystem> fs_;
+  std::shared_ptr<FileSystem> fs_;
 };
 
-GENERIC_FS_TEST_FUNCTIONS(TestMockFSGeneric);
+using MockFileSystemTypes = ::testing::Types<MockFileSystem, MockAsyncFileSystem>;
+
+TYPED_TEST_SUITE(TestMockFSGeneric, MockFileSystemTypes);
+
+GENERIC_FS_TYPED_TEST_FUNCTIONS(TestMockFSGeneric);
 
 ////////////////////////////////////////////////////////////////////////////
 // Concrete MockFileSystem tests
diff --git a/cpp/src/arrow/filesystem/mockfs.cc b/cpp/src/arrow/filesystem/mockfs.cc
index 294cc85..e1ac05c 100644
--- a/cpp/src/arrow/filesystem/mockfs.cc
+++ b/cpp/src/arrow/filesystem/mockfs.cc
@@ -31,6 +31,8 @@
 #include "arrow/filesystem/util_internal.h"
 #include "arrow/io/interfaces.h"
 #include "arrow/io/memory.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/future.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/string_view.h"
 #include "arrow/util/variant.h"
@@ -536,13 +538,13 @@
   return info;
 }
 
-Result<std::vector<FileInfo>> MockFileSystem::GetFileInfo(const FileSelector& selector) {
+Result<FileInfoVector> MockFileSystem::GetFileInfo(const FileSelector& selector) {
   auto parts = SplitAbstractPath(selector.base_dir);
   RETURN_NOT_OK(ValidateAbstractPathParts(parts));
 
   auto guard = impl_->lock_guard();
 
-  std::vector<FileInfo> results;
+  FileInfoVector results;
 
   Entry* base_dir = impl_->FindEntry(parts);
   if (base_dir == nullptr) {
@@ -746,6 +748,20 @@
   return fs;
 }
 
+FileInfoGenerator MockAsyncFileSystem::GetFileInfoGenerator(const FileSelector& select) {
+  auto maybe_infos = GetFileInfo(select);
+  if (maybe_infos.ok()) {
+    // Return the FileInfo entries one by one
+    const auto& infos = *maybe_infos;
+    std::vector<FileInfoVector> chunks(infos.size());
+    std::transform(infos.begin(), infos.end(), chunks.begin(),
+                   [](const FileInfo& info) { return FileInfoVector{info}; });
+    return MakeVectorGenerator(std::move(chunks));
+  } else {
+    return MakeFailingGenerator(maybe_infos);
+  }
+}
+
 }  // namespace internal
 }  // namespace fs
 }  // namespace arrow
diff --git a/cpp/src/arrow/filesystem/mockfs.h b/cpp/src/arrow/filesystem/mockfs.h
index 212caf6..af0a327 100644
--- a/cpp/src/arrow/filesystem/mockfs.h
+++ b/cpp/src/arrow/filesystem/mockfs.h
@@ -114,6 +114,17 @@
   std::unique_ptr<Impl> impl_;
 };
 
+class ARROW_EXPORT MockAsyncFileSystem : public MockFileSystem {
+ public:
+  explicit MockAsyncFileSystem(TimePoint current_time,
+                               const io::IOContext& io_context = io::default_io_context())
+      : MockFileSystem(current_time, io_context) {
+    default_async_is_sync_ = false;
+  }
+
+  FileInfoGenerator GetFileInfoGenerator(const FileSelector& select) override;
+};
+
 }  // namespace internal
 }  // namespace fs
 }  // namespace arrow
diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index 400442d..75b1e71 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -74,6 +74,7 @@
 #include "arrow/io/util_internal.h"
 #include "arrow/result.h"
 #include "arrow/status.h"
+#include "arrow/util/async_generator.h"
 #include "arrow/util/atomic_shared_ptr.h"
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/future.h"
@@ -87,6 +88,7 @@
 
 using internal::TaskGroup;
 using internal::Uri;
+using io::internal::SubmitIO;
 
 namespace fs {
 
@@ -994,10 +996,9 @@
         ++upload_state_->parts_in_progress;
       }
       auto client = client_;
-      ARROW_ASSIGN_OR_RAISE(auto fut, io_context_.executor()->Submit(
-                                          io_context_.stop_token(), [client, req]() {
-                                            return client->UploadPart(req);
-                                          }));
+      ARROW_ASSIGN_OR_RAISE(auto fut, SubmitIO(io_context_, [client, req]() {
+                              return client->UploadPart(req);
+                            }));
       // The closure keeps the buffer and the upload state alive
       auto state = upload_state_;
       auto part_number = part_number_;
@@ -1126,6 +1127,11 @@
 
   template <typename... Args>
   static Status Walk(Args&&... args) {
+    return WalkAsync(std::forward<Args>(args)...).status();
+  }
+
+  template <typename... Args>
+  static Future<> WalkAsync(Args&&... args) {
     auto self = std::make_shared<TreeWalker>(std::forward<Args>(args)...);
     return self->DoWalk();
   }
@@ -1147,12 +1153,12 @@
   std::shared_ptr<TaskGroup> task_group_;
   std::mutex mutex_;
 
-  Status DoWalk() {
+  Future<> DoWalk() {
     task_group_ =
         TaskGroup::MakeThreaded(io_context_.executor(), io_context_.stop_token());
     WalkChild(base_dir_, /*nesting_depth=*/0);
     // When this returns, ListObjectsV2 tasks either have finished or will exit early
-    return task_group_->Finish();
+    return task_group_->FinishAsync();
   }
 
   bool ok() const { return task_group_->ok(); }
@@ -1249,7 +1255,7 @@
 // -----------------------------------------------------------------------
 // S3 filesystem implementation
 
-class S3FileSystem::Impl {
+class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Impl> {
  public:
   ClientBuilder builder_;
   io::IOContext io_context_;
@@ -1404,32 +1410,20 @@
     return Status::OK();
   }
 
-  // Workhorse for GetTargetStats(FileSelector...)
-  Status Walk(const FileSelector& select, const std::string& bucket,
-              const std::string& key, std::vector<FileInfo>* out) {
-    bool is_empty = true;
+  // A helper class for Walk and WalkAsync
+  struct FileInfoCollector {
+    FileInfoCollector(std::string bucket, std::string key, const FileSelector& select)
+        : bucket(std::move(bucket)),
+          key(std::move(key)),
+          allow_not_found(select.allow_not_found) {}
 
-    auto handle_error = [&](const AWSError<S3Errors>& error) -> Status {
-      if (select.allow_not_found && IsNotFound(error)) {
-        return Status::OK();
-      }
-      return ErrorToStatus(std::forward_as_tuple("When listing objects under key '", key,
-                                                 "' in bucket '", bucket, "': "),
-                           error);
-    };
-
-    auto handle_recursion = [&](int32_t nesting_depth) -> Result<bool> {
-      RETURN_NOT_OK(CheckNestingDepth(nesting_depth));
-      return select.recursive && nesting_depth <= select.max_recursion;
-    };
-
-    auto handle_results = [&](const std::string& prefix,
-                              const S3Model::ListObjectsV2Result& result) -> Status {
+    Status Collect(const std::string& prefix, const S3Model::ListObjectsV2Result& result,
+                   std::vector<FileInfo>* out) {
       // Walk "directories"
-      for (const auto& prefix : result.GetCommonPrefixes()) {
+      for (const auto& child_prefix : result.GetCommonPrefixes()) {
         is_empty = false;
         const auto child_key =
-            internal::RemoveTrailingSlash(FromAwsString(prefix.GetPrefix()));
+            internal::RemoveTrailingSlash(FromAwsString(child_prefix.GetPrefix()));
         std::stringstream child_path;
         child_path << bucket << kSep << child_key;
         FileInfo info;
@@ -1453,6 +1447,49 @@
         out->push_back(std::move(info));
       }
       return Status::OK();
+    }
+
+    Status Finish(Impl* impl) {
+      // If no contents were found, perhaps it's an empty "directory",
+      // or perhaps it's a nonexistent entry.  Check.
+      if (is_empty && !allow_not_found) {
+        bool is_actually_empty;
+        RETURN_NOT_OK(impl->IsEmptyDirectory(bucket, key, &is_actually_empty));
+        if (!is_actually_empty) {
+          return PathNotFound(bucket, key);
+        }
+      }
+      return Status::OK();
+    }
+
+    std::string bucket;
+    std::string key;
+    bool allow_not_found;
+    bool is_empty = true;
+  };
+
+  // Workhorse for GetFileInfo(FileSelector...)
+  Status Walk(const FileSelector& select, const std::string& bucket,
+              const std::string& key, std::vector<FileInfo>* out) {
+    FileInfoCollector collector(bucket, key, select);
+
+    auto handle_error = [&](const AWSError<S3Errors>& error) -> Status {
+      if (select.allow_not_found && IsNotFound(error)) {
+        return Status::OK();
+      }
+      return ErrorToStatus(std::forward_as_tuple("When listing objects under key '", key,
+                                                 "' in bucket '", bucket, "': "),
+                           error);
+    };
+
+    auto handle_recursion = [&](int32_t nesting_depth) -> Result<bool> {
+      RETURN_NOT_OK(CheckNestingDepth(nesting_depth));
+      return select.recursive && nesting_depth <= select.max_recursion;
+    };
+
+    auto handle_results = [&](const std::string& prefix,
+                              const S3Model::ListObjectsV2Result& result) -> Status {
+      return collector.Collect(prefix, result, out);
     };
 
     RETURN_NOT_OK(TreeWalker::Walk(client_, io_context_, bucket, key, kListObjectsMaxKeys,
@@ -1460,17 +1497,59 @@
 
     // If no contents were found, perhaps it's an empty "directory",
     // or perhaps it's a nonexistent entry.  Check.
-    if (is_empty && !select.allow_not_found) {
-      RETURN_NOT_OK(IsEmptyDirectory(bucket, key, &is_empty));
-      if (!is_empty) {
-        return PathNotFound(bucket, key);
-      }
-    }
+    RETURN_NOT_OK(collector.Finish(this));
     // Sort results for convenience, since they can come massively out of order
     std::sort(out->begin(), out->end(), FileInfo::ByPath{});
     return Status::OK();
   }
 
+  // Workhorse for GetFileInfoGenerator(FileSelector...)
+  FileInfoGenerator WalkAsync(const FileSelector& select, const std::string& bucket,
+                              const std::string& key) {
+    PushGenerator<std::vector<FileInfo>> gen;
+    auto producer = gen.producer();
+    auto collector = std::make_shared<FileInfoCollector>(bucket, key, select);
+    auto self = shared_from_this();
+
+    auto handle_error = [select, bucket, key](const AWSError<S3Errors>& error) -> Status {
+      if (select.allow_not_found && IsNotFound(error)) {
+        return Status::OK();
+      }
+      return ErrorToStatus(std::forward_as_tuple("When listing objects under key '", key,
+                                                 "' in bucket '", bucket, "': "),
+                           error);
+    };
+
+    auto handle_recursion = [select, self](int32_t nesting_depth) -> Result<bool> {
+      RETURN_NOT_OK(self->CheckNestingDepth(nesting_depth));
+      return select.recursive && nesting_depth <= select.max_recursion;
+    };
+
+    auto handle_results =
+        [collector, producer](
+            const std::string& prefix,
+            const S3Model::ListObjectsV2Result& result) mutable -> Status {
+      std::vector<FileInfo> out;
+      RETURN_NOT_OK(collector->Collect(prefix, result, &out));
+      if (!out.empty()) {
+        producer.Push(std::move(out));
+      }
+      return Status::OK();
+    };
+
+    TreeWalker::WalkAsync(client_, io_context_, bucket, key, kListObjectsMaxKeys,
+                          handle_results, handle_error, handle_recursion)
+        .AddCallback([collector, producer,
+                      self](const Result<::arrow::detail::Empty>& res) mutable {
+          auto st = collector->Finish(self.get());
+          if (!st.ok()) {
+            producer.Push(st);
+          }
+          producer.Close();
+        });
+    return gen;
+  }
+
   Status WalkForDeleteDir(const std::string& bucket, const std::string& key,
                           std::vector<std::string>* file_keys,
                           std::vector<std::string>* dir_keys) {
@@ -1550,10 +1629,9 @@
       }
       req.SetBucket(ToAwsString(bucket));
       req.SetDelete(std::move(del));
-      ARROW_ASSIGN_OR_RAISE(auto fut, io_context_.executor()->Submit(
-                                          io_context_.stop_token(), [client, req]() {
-                                            return client->DeleteObjects(req);
-                                          }));
+      ARROW_ASSIGN_OR_RAISE(auto fut, SubmitIO(io_context_, [client, req]() {
+                              return client->DeleteObjects(req);
+                            }));
       futures.push_back(std::move(fut).Then(delete_cb));
     }
 
@@ -1598,17 +1676,29 @@
     return Status::OK();
   }
 
-  Status ListBuckets(std::vector<std::string>* out) {
-    out->clear();
-    auto outcome = client_->ListBuckets();
+  static Result<std::vector<std::string>> ProcessListBuckets(
+      const Aws::S3::Model::ListBucketsOutcome& outcome) {
     if (!outcome.IsSuccess()) {
       return ErrorToStatus(std::forward_as_tuple("When listing buckets: "),
                            outcome.GetError());
     }
+    std::vector<std::string> buckets;
+    buckets.reserve(outcome.GetResult().GetBuckets().size());
     for (const auto& bucket : outcome.GetResult().GetBuckets()) {
-      out->emplace_back(FromAwsString(bucket.GetName()));
+      buckets.emplace_back(FromAwsString(bucket.GetName()));
     }
-    return Status::OK();
+    return buckets;
+  }
+
+  Result<std::vector<std::string>> ListBuckets() {
+    auto outcome = client_->ListBuckets();
+    return ProcessListBuckets(outcome);
+  }
+
+  Future<std::vector<std::string>> ListBucketsAsync(io::IOContext ctx) {
+    auto self = shared_from_this();
+    return DeferNotOk(SubmitIO(ctx, [self]() { return self->client_->ListBuckets(); }))
+        .Then(Impl::ProcessListBuckets);
   }
 
   Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const std::string& s,
@@ -1641,7 +1731,7 @@
 };
 
 S3FileSystem::S3FileSystem(const S3Options& options, const io::IOContext& io_context)
-    : FileSystem(io_context), impl_(new Impl{options, io_context}) {
+    : FileSystem(io_context), impl_(std::make_shared<Impl>(options, io_context)) {
   default_async_is_sync_ = false;
 }
 
@@ -1736,15 +1826,14 @@
   }
 }
 
-Result<std::vector<FileInfo>> S3FileSystem::GetFileInfo(const FileSelector& select) {
+Result<FileInfoVector> S3FileSystem::GetFileInfo(const FileSelector& select) {
   ARROW_ASSIGN_OR_RAISE(auto base_path, S3Path::FromString(select.base_dir));
 
-  std::vector<FileInfo> results;
+  FileInfoVector results;
 
   if (base_path.empty()) {
     // List all buckets
-    std::vector<std::string> buckets;
-    RETURN_NOT_OK(impl_->ListBuckets(&buckets));
+    ARROW_ASSIGN_OR_RAISE(auto buckets, impl_->ListBuckets());
     for (const auto& bucket : buckets) {
       FileInfo info;
       info.set_path(bucket);
@@ -1762,6 +1851,51 @@
   return results;
 }
 
+FileInfoGenerator S3FileSystem::GetFileInfoGenerator(const FileSelector& select) {
+  auto maybe_base_path = S3Path::FromString(select.base_dir);
+  if (!maybe_base_path.ok()) {
+    return MakeFailingGenerator<FileInfoVector>(maybe_base_path.status());
+  }
+  auto base_path = *std::move(maybe_base_path);
+
+  if (base_path.empty()) {
+    // List all buckets, then possibly recurse
+    PushGenerator<AsyncGenerator<FileInfoVector>> gen;
+    auto producer = gen.producer();
+
+    auto fut = impl_->ListBucketsAsync(io_context());
+    auto impl = impl_->shared_from_this();
+    fut.AddCallback(
+        [producer, select, impl](const Result<std::vector<std::string>>& res) mutable {
+          if (!res.ok()) {
+            producer.Push(res.status());
+            producer.Close();
+            return;
+          }
+          FileInfoVector buckets;
+          for (const auto& bucket : *res) {
+            buckets.push_back(FileInfo{bucket, FileType::Directory});
+          }
+          // Generate all bucket infos
+          auto buckets_fut = Future<FileInfoVector>::MakeFinished(std::move(buckets));
+          producer.Push(MakeSingleFutureGenerator(buckets_fut));
+          if (select.recursive) {
+            // Generate recursive walk for each bucket in turn
+            for (const auto& bucket : *buckets_fut.result()) {
+              producer.Push(impl->WalkAsync(select, bucket.path(), ""));
+            }
+          }
+          producer.Close();
+        });
+
+    return MakeConcatenatedGenerator(
+        AsyncGenerator<AsyncGenerator<FileInfoVector>>{std::move(gen)});
+  }
+
+  // Nominal case -> walk a single bucket
+  return impl_->WalkAsync(select, base_path.bucket, base_path.key);
+}
+
 Status S3FileSystem::CreateDir(const std::string& s, bool recursive) {
   ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
 
diff --git a/cpp/src/arrow/filesystem/s3fs.h b/cpp/src/arrow/filesystem/s3fs.h
index ac384fcb..a7f72fb 100644
--- a/cpp/src/arrow/filesystem/s3fs.h
+++ b/cpp/src/arrow/filesystem/s3fs.h
@@ -149,6 +149,8 @@
   Result<FileInfo> GetFileInfo(const std::string& path) override;
   Result<std::vector<FileInfo>> GetFileInfo(const FileSelector& select) override;
 
+  FileInfoGenerator GetFileInfoGenerator(const FileSelector& select) override;
+
   Status CreateDir(const std::string& path, bool recursive = true) override;
 
   Status DeleteDir(const std::string& path) override;
@@ -206,7 +208,7 @@
   explicit S3FileSystem(const S3Options& options, const io::IOContext&);
 
   class Impl;
-  std::unique_ptr<Impl> impl_;
+  std::shared_ptr<Impl> impl_;
 };
 
 enum class S3LogLevel : int8_t { Off, Fatal, Error, Warn, Info, Debug, Trace };
diff --git a/cpp/src/arrow/filesystem/s3fs_test.cc b/cpp/src/arrow/filesystem/s3fs_test.cc
index c79d9f7..f5efcda 100644
--- a/cpp/src/arrow/filesystem/s3fs_test.cc
+++ b/cpp/src/arrow/filesystem/s3fs_test.cc
@@ -70,9 +70,12 @@
 #include "arrow/filesystem/test_util.h"
 #include "arrow/result.h"
 #include "arrow/status.h"
+#include "arrow/testing/future_util.h"
 #include "arrow/testing/gtest_util.h"
 #include "arrow/testing/util.h"
+#include "arrow/util/async_generator.h"
 #include "arrow/util/checked_cast.h"
+#include "arrow/util/future.h"
 #include "arrow/util/io_util.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/macros.h"
@@ -641,6 +644,34 @@
   AssertFileInfo(infos[1], "bucket/somedir/subdir/subfile", FileType::File, 8);
 }
 
+TEST_F(TestS3FS, GetFileInfoGenerator) {
+  FileSelector select;
+  FileInfoVector infos;
+
+  // Root dir
+  select.base_dir = "";
+  CollectFileInfoGenerator(fs_->GetFileInfoGenerator(select), &infos);
+  ASSERT_EQ(infos.size(), 2);
+  SortInfos(&infos);
+  AssertFileInfo(infos[0], "bucket", FileType::Directory);
+  AssertFileInfo(infos[1], "empty-bucket", FileType::Directory);
+
+  // Root dir, recursive
+  select.recursive = true;
+  CollectFileInfoGenerator(fs_->GetFileInfoGenerator(select), &infos);
+  ASSERT_EQ(infos.size(), 7);
+  SortInfos(&infos);
+  AssertFileInfo(infos[0], "bucket", FileType::Directory);
+  AssertFileInfo(infos[1], "bucket/emptydir", FileType::Directory);
+  AssertFileInfo(infos[2], "bucket/somedir", FileType::Directory);
+  AssertFileInfo(infos[3], "bucket/somedir/subdir", FileType::Directory);
+  AssertFileInfo(infos[4], "bucket/somedir/subdir/subfile", FileType::File, 8);
+  AssertFileInfo(infos[5], "bucket/somefile", FileType::File, 9);
+  AssertFileInfo(infos[6], "empty-bucket", FileType::Directory);
+
+  // Non-root dir case is tested by generic tests
+}
+
 TEST_F(TestS3FS, CreateDir) {
   FileInfo st;
 
diff --git a/cpp/src/arrow/filesystem/test_util.cc b/cpp/src/arrow/filesystem/test_util.cc
index 93d84c0..466b882 100644
--- a/cpp/src/arrow/filesystem/test_util.cc
+++ b/cpp/src/arrow/filesystem/test_util.cc
@@ -30,7 +30,9 @@
 #include "arrow/status.h"
 #include "arrow/testing/future_util.h"
 #include "arrow/testing/gtest_util.h"
+#include "arrow/util/async_generator.h"
 #include "arrow/util/future.h"
+#include "arrow/util/vector.h"
 
 using ::testing::ElementsAre;
 
@@ -111,6 +113,12 @@
   std::sort(infos->begin(), infos->end(), FileInfo::ByPath{});
 }
 
+void CollectFileInfoGenerator(FileInfoGenerator gen, FileInfoVector* out_infos) {
+  auto fut = CollectAsyncGenerator(gen);
+  ASSERT_FINISHES_OK_AND_ASSIGN(auto nested_infos, fut);
+  *out_infos = ::arrow::internal::FlattenVectors(nested_infos);
+}
+
 void AssertFileInfo(const FileInfo& info, const std::string& path, FileType type) {
   ASSERT_EQ(info.path(), path);
   ASSERT_EQ(info.type(), type) << "For path '" << info.path() << "'";
@@ -681,7 +689,7 @@
   ASSERT_RAISES(IOError, fs->GetFileInfo(s));
 }
 
-void GenericFileSystemTest::TestGetFileInfoSelectorAsync(FileSystem* fs) {
+void GenericFileSystemTest::TestGetFileInfoGenerator(FileSystem* fs) {
   ASSERT_OK(fs->CreateDir("AB/CD"));
   CreateFile(fs, "abc", "data");
   CreateFile(fs, "AB/def", "some data");
@@ -691,9 +699,11 @@
   FileSelector s;
   s.base_dir = "";
   std::vector<FileInfo> infos;
+  std::vector<std::vector<FileInfo>> nested_infos;
 
   // Non-recursive
-  ASSERT_FINISHES_OK_AND_ASSIGN(infos, fs->GetFileInfoAsync(s));
+  auto gen = fs->GetFileInfoGenerator(s);
+  CollectFileInfoGenerator(std::move(gen), &infos);
   SortInfos(&infos);
   ASSERT_EQ(infos.size(), 2);
   AssertFileInfo(infos[0], "AB", FileType::Directory);
@@ -702,7 +712,7 @@
   // Recursive
   s.base_dir = "AB";
   s.recursive = true;
-  ASSERT_FINISHES_OK_AND_ASSIGN(infos, fs->GetFileInfoAsync(s));
+  CollectFileInfoGenerator(fs->GetFileInfoGenerator(s), &infos);
   SortInfos(&infos);
   ASSERT_EQ(infos.size(), 4);
   AssertFileInfo(infos[0], "AB/CD", FileType::Directory);
@@ -712,9 +722,10 @@
 
   // Doesn't exist
   s.base_dir = "XX";
-  ASSERT_RAISES(IOError, fs->GetFileInfoAsync(s).result());
+  auto fut = CollectAsyncGenerator(fs->GetFileInfoGenerator(s));
+  ASSERT_FINISHES_AND_RAISES(IOError, fut);
   s.allow_not_found = true;
-  ASSERT_FINISHES_OK_AND_ASSIGN(infos, fs->GetFileInfoAsync(s));
+  CollectFileInfoGenerator(fs->GetFileInfoGenerator(s), &infos);
   ASSERT_EQ(infos.size(), 0);
 }
 
@@ -1025,7 +1036,7 @@
 GENERIC_FS_TEST_DEFINE(TestGetFileInfoSelector)
 GENERIC_FS_TEST_DEFINE(TestGetFileInfoSelectorWithRecursion)
 GENERIC_FS_TEST_DEFINE(TestGetFileInfoAsync)
-GENERIC_FS_TEST_DEFINE(TestGetFileInfoSelectorAsync)
+GENERIC_FS_TEST_DEFINE(TestGetFileInfoGenerator)
 GENERIC_FS_TEST_DEFINE(TestOpenOutputStream)
 GENERIC_FS_TEST_DEFINE(TestOpenAppendStream)
 GENERIC_FS_TEST_DEFINE(TestOpenInputStream)
diff --git a/cpp/src/arrow/filesystem/test_util.h b/cpp/src/arrow/filesystem/test_util.h
index 232d06f..7941756 100644
--- a/cpp/src/arrow/filesystem/test_util.h
+++ b/cpp/src/arrow/filesystem/test_util.h
@@ -43,7 +43,10 @@
 
 // Sort a vector of FileInfo by lexicographic path order
 ARROW_TESTING_EXPORT
-void SortInfos(std::vector<FileInfo>* infos);
+void SortInfos(FileInfoVector* infos);
+
+ARROW_TESTING_EXPORT
+void CollectFileInfoGenerator(FileInfoGenerator gen, FileInfoVector* out_infos);
 
 ARROW_TESTING_EXPORT
 void AssertFileInfo(const FileInfo& info, const std::string& path, FileType type);
@@ -109,7 +112,7 @@
   void TestGetFileInfoSelector();
   void TestGetFileInfoSelectorWithRecursion();
   void TestGetFileInfoAsync();
-  void TestGetFileInfoSelectorAsync();
+  void TestGetFileInfoGenerator();
   void TestOpenOutputStream();
   void TestOpenAppendStream();
   void TestOpenInputStream();
@@ -154,7 +157,7 @@
   void TestGetFileInfoSelector(FileSystem* fs);
   void TestGetFileInfoSelectorWithRecursion(FileSystem* fs);
   void TestGetFileInfoAsync(FileSystem* fs);
-  void TestGetFileInfoSelectorAsync(FileSystem* fs);
+  void TestGetFileInfoGenerator(FileSystem* fs);
   void TestOpenOutputStream(FileSystem* fs);
   void TestOpenAppendStream(FileSystem* fs);
   void TestOpenInputStream(FileSystem* fs);
@@ -185,7 +188,7 @@
   GENERIC_FS_TEST_FUNCTION(TEST_MACRO, TEST_CLASS, GetFileInfoSelector)              \
   GENERIC_FS_TEST_FUNCTION(TEST_MACRO, TEST_CLASS, GetFileInfoSelectorWithRecursion) \
   GENERIC_FS_TEST_FUNCTION(TEST_MACRO, TEST_CLASS, GetFileInfoAsync)                 \
-  GENERIC_FS_TEST_FUNCTION(TEST_MACRO, TEST_CLASS, GetFileInfoSelectorAsync)         \
+  GENERIC_FS_TEST_FUNCTION(TEST_MACRO, TEST_CLASS, GetFileInfoGenerator)             \
   GENERIC_FS_TEST_FUNCTION(TEST_MACRO, TEST_CLASS, OpenOutputStream)                 \
   GENERIC_FS_TEST_FUNCTION(TEST_MACRO, TEST_CLASS, OpenAppendStream)                 \
   GENERIC_FS_TEST_FUNCTION(TEST_MACRO, TEST_CLASS, OpenInputStream)                  \
diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc
index dc2112e..d052c01 100644
--- a/cpp/src/arrow/io/interfaces.cc
+++ b/cpp/src/arrow/io/interfaces.cc
@@ -136,9 +136,8 @@
   TaskHints hints;
   hints.io_size = nbytes;
   hints.external_id = ctx.external_id();
-  return DeferNotOk(ctx.executor()->Submit(std::move(hints), [self, position, nbytes] {
-    return self->ReadAt(position, nbytes);
-  }));
+  return DeferNotOk(internal::SubmitIO(
+      ctx, [self, position, nbytes] { return self->ReadAt(position, nbytes); }));
 }
 
 Future<std::shared_ptr<Buffer>> RandomAccessFile::ReadAsync(int64_t position,
diff --git a/cpp/src/arrow/io/util_internal.h b/cpp/src/arrow/io/util_internal.h
index f711227..b1d75d1 100644
--- a/cpp/src/arrow/io/util_internal.h
+++ b/cpp/src/arrow/io/util_internal.h
@@ -18,9 +18,11 @@
 #pragma once
 
 #include <memory>
+#include <utility>
 #include <vector>
 
 #include "arrow/io/interfaces.h"
+#include "arrow/util/thread_pool.h"
 #include "arrow/util/type_fwd.h"
 #include "arrow/util/visibility.h"
 
@@ -50,6 +52,15 @@
 ARROW_EXPORT
 ::arrow::internal::ThreadPool* GetIOThreadPool();
 
+template <typename... SubmitArgs>
+auto SubmitIO(IOContext io_context, SubmitArgs&&... submit_args)
+    -> decltype(std::declval<::arrow::internal::Executor*>()->Submit(submit_args...)) {
+  ::arrow::internal::TaskHints hints;
+  hints.external_id = io_context.external_id();
+  return io_context.executor()->Submit(hints, io_context.stop_token(),
+                                       std::forward<SubmitArgs>(submit_args)...);
+}
+
 }  // namespace internal
 }  // namespace io
 }  // namespace arrow
diff --git a/cpp/src/arrow/testing/future_util.h b/cpp/src/arrow/testing/future_util.h
index 44fa78c..a61a989 100644
--- a/cpp/src/arrow/testing/future_util.h
+++ b/cpp/src/arrow/testing/future_util.h
@@ -47,15 +47,15 @@
 
 #define ASSERT_FINISHES_AND_RAISES(ENUM, expr) \
   do {                                         \
-    auto&& fut = (expr);                       \
-    ASSERT_FINISHES_IMPL(fut);                 \
-    ASSERT_RAISES(ENUM, fut.status());         \
+    auto&& _fut = (expr);                      \
+    ASSERT_FINISHES_IMPL(_fut);                \
+    ASSERT_RAISES(ENUM, _fut.status());        \
   } while (false)
 
-#define ASSERT_FINISHES_OK_AND_ASSIGN_IMPL(lhs, rexpr, future_name) \
-  auto future_name = (rexpr);                                       \
-  ASSERT_FINISHES_IMPL(future_name);                                \
-  ASSERT_OK_AND_ASSIGN(lhs, future_name.result());
+#define ASSERT_FINISHES_OK_AND_ASSIGN_IMPL(lhs, rexpr, _future_name) \
+  auto _future_name = (rexpr);                                       \
+  ASSERT_FINISHES_IMPL(_future_name);                                \
+  ASSERT_OK_AND_ASSIGN(lhs, _future_name.result());
 
 #define ASSERT_FINISHES_OK_AND_ASSIGN(lhs, rexpr) \
   ASSERT_FINISHES_OK_AND_ASSIGN_IMPL(lhs, rexpr,  \
diff --git a/cpp/src/arrow/type_fwd.h b/cpp/src/arrow/type_fwd.h
index 46018ef..7eb318c 100644
--- a/cpp/src/arrow/type_fwd.h
+++ b/cpp/src/arrow/type_fwd.h
@@ -29,6 +29,8 @@
 
 template <typename T>
 class Iterator;
+template <typename T>
+struct IterationTraits;
 
 template <typename T>
 class Result;
diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h
index f034cea..06e823a 100644
--- a/cpp/src/arrow/util/async_generator.h
+++ b/cpp/src/arrow/util/async_generator.h
@@ -44,7 +44,7 @@
 // the utilities Visit/Collect/Await take care to do this).
 //
 // Asynchronous reentrancy on the other hand means the function is called again before the
-// future returned by the function is marekd finished (but after the call to get the
+// future returned by the function is marked finished (but after the call to get the
 // future returns).  Some of these generators are async-reentrant while others (e.g.
 // those that depend on ordered processing like decompression) are not.  Read the MakeXYZ
 // function comments to determine which generators support async reentrancy.
@@ -1332,4 +1332,46 @@
   return MakeGeneratorIterator(std::move(owned_bg_generator));
 }
 
+/// \brief Make a generator that returns a single pre-generated future
+///
+/// This generator is async-reentrant.
+template <typename T>
+std::function<Future<T>()> MakeSingleFutureGenerator(Future<T> future) {
+  assert(future.is_valid());
+  auto state = std::make_shared<Future<T>>(std::move(future));
+  return [state]() -> Future<T> {
+    auto fut = std::move(*state);
+    if (fut.is_valid()) {
+      return fut;
+    } else {
+      return AsyncGeneratorEnd<T>();
+    }
+  };
+}
+
+/// \brief Make a generator that always fails with a given error
+///
+/// This generator is async-reentrant.
+template <typename T>
+AsyncGenerator<T> MakeFailingGenerator(Status st) {
+  assert(!st.ok());
+  auto state = std::make_shared<Status>(std::move(st));
+  return [state]() -> Future<T> {
+    auto st = std::move(*state);
+    if (!st.ok()) {
+      return std::move(st);
+    } else {
+      return AsyncGeneratorEnd<T>();
+    }
+  };
+}
+
+/// \brief Make a generator that always fails with a given error
+///
+/// This overload allows inferring the return type from the argument.
+template <typename T>
+AsyncGenerator<T> MakeFailingGenerator(const Result<T>& result) {
+  return MakeFailingGenerator<T>(result.status());
+}
+
 }  // namespace arrow
diff --git a/cpp/src/arrow/util/async_generator_test.cc b/cpp/src/arrow/util/async_generator_test.cc
index 51e4f94..36d0629 100644
--- a/cpp/src/arrow/util/async_generator_test.cc
+++ b/cpp/src/arrow/util/async_generator_test.cc
@@ -1299,4 +1299,26 @@
   }
 }
 
+TEST(SingleFutureGenerator, Basics) {
+  auto fut = Future<TestInt>::Make();
+  auto gen = MakeSingleFutureGenerator(fut);
+  auto collect_fut = CollectAsyncGenerator(gen);
+  AssertNotFinished(collect_fut);
+  fut.MarkFinished(TestInt{42});
+  ASSERT_FINISHES_OK_AND_ASSIGN(auto collected, collect_fut);
+  ASSERT_EQ(collected, std::vector<TestInt>{42});
+  // Generator exhausted
+  collect_fut = CollectAsyncGenerator(gen);
+  ASSERT_FINISHES_OK_AND_EQ(std::vector<TestInt>{}, collect_fut);
+}
+
+TEST(FailingGenerator, Basics) {
+  auto gen = MakeFailingGenerator<TestInt>(Status::IOError("zzz"));
+  auto collect_fut = CollectAsyncGenerator(gen);
+  ASSERT_FINISHES_AND_RAISES(IOError, collect_fut);
+  // Generator exhausted
+  collect_fut = CollectAsyncGenerator(gen);
+  ASSERT_FINISHES_OK_AND_EQ(std::vector<TestInt>{}, collect_fut);
+}
+
 }  // namespace arrow