ARROW-12161: [C++][Dataset] Revert async CSV reader in datasets

Reverts the streaming CSV reader and the async workaround introduced for it.  It will be reintroduced, more cleanly, in ARROW-12355

Closes #10019 from westonpace/feature/revert-arrow-12161

Authored-by: Weston Pace <weston.pace@gmail.com>
Signed-off-by: David Li <li.davidm96@gmail.com>
diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc
index c435236..5b6e11e 100644
--- a/cpp/src/arrow/csv/reader.cc
+++ b/cpp/src/arrow/csv/reader.cc
@@ -199,19 +199,6 @@
     return MakeTransformedIterator(std::move(buffer_iterator), block_reader_fn);
   }
 
-  static AsyncGenerator<CSVBlock> MakeAsyncIterator(
-      AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator,
-      std::unique_ptr<Chunker> chunker, std::shared_ptr<Buffer> first_buffer) {
-    auto block_reader =
-        std::make_shared<SerialBlockReader>(std::move(chunker), first_buffer);
-    // Wrap shared pointer in callable
-    Transformer<std::shared_ptr<Buffer>, CSVBlock> block_reader_fn =
-        [block_reader](std::shared_ptr<Buffer> next) {
-          return (*block_reader)(std::move(next));
-        };
-    return MakeTransformedGenerator(std::move(buffer_generator), block_reader_fn);
-  }
-
   Result<TransformFlow<CSVBlock>> operator()(std::shared_ptr<Buffer> next_buffer) {
     if (buffer_ == nullptr) {
       return TransformFinish();
@@ -585,25 +572,22 @@
 
 class BaseStreamingReader : public ReaderMixin, public csv::StreamingReader {
  public:
-  BaseStreamingReader(io::IOContext io_context, Executor* cpu_executor,
-                      std::shared_ptr<io::InputStream> input,
-                      const ReadOptions& read_options, const ParseOptions& parse_options,
-                      const ConvertOptions& convert_options)
-      : ReaderMixin(io_context, std::move(input), read_options, parse_options,
-                    convert_options),
-        cpu_executor_(cpu_executor) {}
+  using ReaderMixin::ReaderMixin;
 
-  virtual Future<std::shared_ptr<csv::StreamingReader>> Init() = 0;
+  virtual Status Init() = 0;
 
   std::shared_ptr<Schema> schema() const override { return schema_; }
 
   Status ReadNext(std::shared_ptr<RecordBatch>* batch) override {
-    auto next_fut = ReadNextAsync();
-    auto next_result = next_fut.result();
-    return std::move(next_result).Value(batch);
+    do {
+      RETURN_NOT_OK(ReadNext().Value(batch));
+    } while (*batch != nullptr && (*batch)->num_rows() == 0);
+    return Status::OK();
   }
 
  protected:
+  virtual Result<std::shared_ptr<RecordBatch>> ReadNext() = 0;
+
   // Make column decoders from conversion schema
   Status MakeColumnDecoders() {
     for (const auto& column : conversion_schema_.columns) {
@@ -686,141 +670,101 @@
   std::vector<std::shared_ptr<ColumnDecoder>> column_decoders_;
   std::shared_ptr<Schema> schema_;
   std::shared_ptr<RecordBatch> pending_batch_;
-  AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator_;
-  Executor* cpu_executor_;
+  Iterator<std::shared_ptr<Buffer>> buffer_iterator_;
   bool eof_ = false;
 };
 
 /////////////////////////////////////////////////////////////////////////
 // Serial StreamingReader implementation
 
-class SerialStreamingReader : public BaseStreamingReader,
-                              public std::enable_shared_from_this<SerialStreamingReader> {
+class SerialStreamingReader : public BaseStreamingReader {
  public:
   using BaseStreamingReader::BaseStreamingReader;
 
-  Future<std::shared_ptr<csv::StreamingReader>> Init() override {
+  Status Init() override {
     ARROW_ASSIGN_OR_RAISE(auto istream_it,
                           io::MakeInputStreamIterator(input_, read_options_.block_size));
 
-    // TODO Consider exposing readahead as a read option (ARROW-12090)
-    ARROW_ASSIGN_OR_RAISE(auto bg_it, MakeBackgroundGenerator(std::move(istream_it),
-                                                              io_context_.executor()));
-
-    auto transferred_it = MakeTransferredGenerator(bg_it, cpu_executor_);
-
-    buffer_generator_ = CSVBufferIterator::MakeAsync(std::move(transferred_it));
+    // Since we're converting serially, no need to readahead more than one block
+    int32_t block_queue_size = 1;
+    ARROW_ASSIGN_OR_RAISE(auto rh_it,
+                          MakeReadaheadIterator(std::move(istream_it), block_queue_size));
+    buffer_iterator_ = CSVBufferIterator::Make(std::move(rh_it));
     task_group_ = internal::TaskGroup::MakeSerial(io_context_.stop_token());
 
-    auto self = shared_from_this();
     // Read schema from first batch
-    return ReadNextAsync().Then([self](const std::shared_ptr<RecordBatch>& first_batch)
-                                    -> Result<std::shared_ptr<csv::StreamingReader>> {
-      self->pending_batch_ = first_batch;
-      DCHECK_NE(self->schema_, nullptr);
-      return self;
-    });
+    ARROW_ASSIGN_OR_RAISE(pending_batch_, ReadNext());
+    DCHECK_NE(schema_, nullptr);
+    return Status::OK();
   }
 
-  Result<std::shared_ptr<RecordBatch>> DecodeBatchAndUpdateSchema() {
-    auto maybe_batch = DecodeNextBatch();
-    if (schema_ == nullptr && maybe_batch.ok()) {
-      schema_ = (*maybe_batch)->schema();
-    }
-    return maybe_batch;
-  }
-
-  Future<std::shared_ptr<RecordBatch>> DoReadNext(
-      std::shared_ptr<SerialStreamingReader> self) {
-    auto batch = std::move(pending_batch_);
-    if (batch != nullptr) {
-      return Future<std::shared_ptr<RecordBatch>>::MakeFinished(batch);
-    }
-
-    if (!source_eof_) {
-      return block_generator_()
-          .Then([self](const CSVBlock& maybe_block) -> Status {
-            if (!IsIterationEnd(maybe_block)) {
-              self->last_block_index_ = maybe_block.block_index;
-              auto maybe_parsed = self->ParseAndInsert(
-                  maybe_block.partial, maybe_block.completion, maybe_block.buffer,
-                  maybe_block.block_index, maybe_block.is_final);
-              if (!maybe_parsed.ok()) {
-                // Parse error => bail out
-                self->eof_ = true;
-                return maybe_parsed.status();
-              }
-              RETURN_NOT_OK(maybe_block.consume_bytes(*maybe_parsed));
-            } else {
-              self->source_eof_ = true;
-              for (auto& decoder : self->column_decoders_) {
-                decoder->SetEOF(self->last_block_index_ + 1);
-              }
-            }
-            return Status::OK();
-          })
-          .Then([self](const ::arrow::detail::Empty& st)
-                    -> Result<std::shared_ptr<RecordBatch>> {
-            return self->DecodeBatchAndUpdateSchema();
-          });
-    }
-    return Future<std::shared_ptr<RecordBatch>>::MakeFinished(
-        DecodeBatchAndUpdateSchema());
-  }
-
-  Future<std::shared_ptr<RecordBatch>> ReadNextSkippingEmpty(
-      std::shared_ptr<SerialStreamingReader> self) {
-    return DoReadNext(self).Then([self](const std::shared_ptr<RecordBatch>& batch) {
-      if (batch != nullptr && batch->num_rows() == 0) {
-        return self->ReadNextSkippingEmpty(self);
-      }
-      return Future<std::shared_ptr<RecordBatch>>::MakeFinished(batch);
-    });
-  }
-
-  Future<std::shared_ptr<RecordBatch>> ReadNextAsync() override {
+ protected:
+  Result<std::shared_ptr<RecordBatch>> ReadNext() override {
     if (eof_) {
-      return Future<std::shared_ptr<RecordBatch>>::MakeFinished(nullptr);
+      return nullptr;
     }
     if (io_context_.stop_token().IsStopRequested()) {
       eof_ = true;
       return io_context_.stop_token().Poll();
     }
-    auto self = shared_from_this();
-    if (!block_generator_) {
-      return SetupReader(self).Then([self](const Result<::arrow::detail::Empty>& res)
-                                        -> Future<std::shared_ptr<RecordBatch>> {
-        if (!res.ok()) {
-          self->eof_ = true;
-          return res.status();
-        }
-        return self->ReadNextSkippingEmpty(self);
-      });
-    } else {
-      return self->ReadNextSkippingEmpty(self);
+    if (!block_iterator_) {
+      Status st = SetupReader();
+      if (!st.ok()) {
+        // Can't setup reader => bail out
+        eof_ = true;
+        return st;
+      }
     }
+    auto batch = std::move(pending_batch_);
+    if (batch != nullptr) {
+      return batch;
+    }
+
+    if (!source_eof_) {
+      ARROW_ASSIGN_OR_RAISE(auto maybe_block, block_iterator_.Next());
+      if (!IsIterationEnd(maybe_block)) {
+        last_block_index_ = maybe_block.block_index;
+        auto maybe_parsed = ParseAndInsert(maybe_block.partial, maybe_block.completion,
+                                           maybe_block.buffer, maybe_block.block_index,
+                                           maybe_block.is_final);
+        if (!maybe_parsed.ok()) {
+          // Parse error => bail out
+          eof_ = true;
+          return maybe_parsed.status();
+        }
+        RETURN_NOT_OK(maybe_block.consume_bytes(*maybe_parsed));
+      } else {
+        source_eof_ = true;
+        for (auto& decoder : column_decoders_) {
+          decoder->SetEOF(last_block_index_ + 1);
+        }
+      }
+    }
+
+    auto maybe_batch = DecodeNextBatch();
+    if (schema_ == nullptr && maybe_batch.ok()) {
+      schema_ = (*maybe_batch)->schema();
+    }
+    return maybe_batch;
   };
 
- protected:
-  Future<> SetupReader(std::shared_ptr<SerialStreamingReader> self) {
-    return buffer_generator_().Then([self](const std::shared_ptr<Buffer>& first_buffer) {
-      if (first_buffer == nullptr) {
-        return Status::Invalid("Empty CSV file");
-      }
-      auto own_first_buffer = first_buffer;
-      RETURN_NOT_OK(self->ProcessHeader(own_first_buffer, &own_first_buffer));
-      RETURN_NOT_OK(self->MakeColumnDecoders());
+  Status SetupReader() {
+    ARROW_ASSIGN_OR_RAISE(auto first_buffer, buffer_iterator_.Next());
+    if (first_buffer == nullptr) {
+      return Status::Invalid("Empty CSV file");
+    }
+    RETURN_NOT_OK(ProcessHeader(first_buffer, &first_buffer));
+    RETURN_NOT_OK(MakeColumnDecoders());
 
-      self->block_generator_ = SerialBlockReader::MakeAsyncIterator(
-          std::move(self->buffer_generator_), MakeChunker(self->parse_options_),
-          std::move(own_first_buffer));
-      return Status::OK();
-    });
+    block_iterator_ = SerialBlockReader::MakeIterator(std::move(buffer_iterator_),
+                                                      MakeChunker(parse_options_),
+                                                      std::move(first_buffer));
+    return Status::OK();
   }
 
   bool source_eof_ = false;
   int64_t last_block_index_ = 0;
-  AsyncGenerator<CSVBlock> block_generator_;
+  Iterator<CSVBlock> block_iterator_;
 };
 
 /////////////////////////////////////////////////////////////////////////
@@ -999,14 +943,15 @@
   return reader;
 }
 
-Future<std::shared_ptr<StreamingReader>> MakeStreamingReader(
+Result<std::shared_ptr<StreamingReader>> MakeStreamingReader(
     io::IOContext io_context, std::shared_ptr<io::InputStream> input,
     internal::Executor* cpu_executor, const ReadOptions& read_options,
     const ParseOptions& parse_options, const ConvertOptions& convert_options) {
   std::shared_ptr<BaseStreamingReader> reader;
-  reader = std::make_shared<SerialStreamingReader>(
-      io_context, cpu_executor, input, read_options, parse_options, convert_options);
-  return reader->Init();
+  reader = std::make_shared<SerialStreamingReader>(io_context, input, read_options,
+                                                   parse_options, convert_options);
+  RETURN_NOT_OK(reader->Init());
+  return reader;
 }
 
 }  // namespace
@@ -1036,11 +981,8 @@
     const ConvertOptions& convert_options) {
   auto io_context = io::IOContext(pool);
   auto cpu_executor = internal::GetCpuThreadPool();
-  auto reader_fut = MakeStreamingReader(io_context, std::move(input), cpu_executor,
-                                        read_options, parse_options, convert_options);
-  auto reader_result = reader_fut.result();
-  ARROW_ASSIGN_OR_RAISE(auto reader, reader_result);
-  return reader;
+  return MakeStreamingReader(io_context, std::move(input), cpu_executor, read_options,
+                             parse_options, convert_options);
 }
 
 Result<std::shared_ptr<StreamingReader>> StreamingReader::Make(
@@ -1048,17 +990,6 @@
     const ReadOptions& read_options, const ParseOptions& parse_options,
     const ConvertOptions& convert_options) {
   auto cpu_executor = internal::GetCpuThreadPool();
-  auto reader_fut = MakeStreamingReader(io_context, std::move(input), cpu_executor,
-                                        read_options, parse_options, convert_options);
-  auto reader_result = reader_fut.result();
-  ARROW_ASSIGN_OR_RAISE(auto reader, reader_result);
-  return reader;
-}
-
-Future<std::shared_ptr<StreamingReader>> StreamingReader::MakeAsync(
-    io::IOContext io_context, std::shared_ptr<io::InputStream> input,
-    internal::Executor* cpu_executor, const ReadOptions& read_options,
-    const ParseOptions& parse_options, const ConvertOptions& convert_options) {
   return MakeStreamingReader(io_context, std::move(input), cpu_executor, read_options,
                              parse_options, convert_options);
 }
diff --git a/cpp/src/arrow/csv/reader.h b/cpp/src/arrow/csv/reader.h
index 72f1375..8e56824 100644
--- a/cpp/src/arrow/csv/reader.h
+++ b/cpp/src/arrow/csv/reader.h
@@ -65,17 +65,6 @@
   virtual ~StreamingReader() = default;
 
   /// Create a StreamingReader instance
-  ///
-  /// This involves some I/O as the first batch must be loaded during the creation process
-  /// so it is returned as a future
-  ///
-  /// Currently, the StreamingReader is not async-reentrant and does not do any fan-out
-  /// parsing (see ARROW-11889)
-  static Future<std::shared_ptr<StreamingReader>> MakeAsync(
-      io::IOContext io_context, std::shared_ptr<io::InputStream> input,
-      internal::Executor* cpu_executor, const ReadOptions&, const ParseOptions&,
-      const ConvertOptions&);
-
   static Result<std::shared_ptr<StreamingReader>> Make(
       io::IOContext io_context, std::shared_ptr<io::InputStream> input,
       const ReadOptions&, const ParseOptions&, const ConvertOptions&);
diff --git a/cpp/src/arrow/csv/reader_test.cc b/cpp/src/arrow/csv/reader_test.cc
index 228ab71..dbe6b1d 100644
--- a/cpp/src/arrow/csv/reader_test.cc
+++ b/cpp/src/arrow/csv/reader_test.cc
@@ -32,14 +32,10 @@
 #include "arrow/table.h"
 #include "arrow/testing/future_util.h"
 #include "arrow/testing/gtest_util.h"
-#include "arrow/util/async_generator.h"
 #include "arrow/util/future.h"
 #include "arrow/util/thread_pool.h"
 
 namespace arrow {
-
-using RecordBatchGenerator = AsyncGenerator<std::shared_ptr<RecordBatch>>;
-
 namespace csv {
 
 // Allows the streaming reader to be used in tests that expect a table reader
@@ -49,17 +45,12 @@
       : reader_(std::move(reader)) {}
   virtual ~StreamingReaderAsTableReader() = default;
   virtual Result<std::shared_ptr<Table>> Read() {
-    auto table_fut = ReadAsync();
-    auto table_res = table_fut.result();
-    ARROW_ASSIGN_OR_RAISE(auto table, table_res);
+    std::shared_ptr<Table> table;
+    RETURN_NOT_OK(reader_->ReadAll(&table));
     return table;
   }
   virtual Future<std::shared_ptr<Table>> ReadAsync() {
-    auto reader = reader_;
-    RecordBatchGenerator rb_generator = [reader]() { return reader->ReadNextAsync(); };
-    return CollectAsyncGenerator(rb_generator).Then([](const RecordBatchVector& rbs) {
-      return Table::FromRecordBatches(rbs);
-    });
+    return Future<std::shared_ptr<Table>>::MakeFinished(Read());
   }
 
  private:
diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index 7b2f420..f4a3a0b 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -428,25 +428,16 @@
   auto task_group = scan_options.TaskGroup();
 
   for (const auto& scan_task : scan_tasks) {
-    if (scan_task->supports_async()) {
-      ARROW_ASSIGN_OR_RAISE(auto batches_gen, scan_task->ExecuteAsync(cpu_executor));
-      std::function<Status(std::shared_ptr<RecordBatch> batch)> batch_visitor =
-          [&, scan_task](std::shared_ptr<RecordBatch> batch) {
-            return WriteNextBatch(state, scan_task->fragment(), std::move(batch));
-          };
-      scan_futs.push_back(VisitAsyncGenerator(batches_gen, batch_visitor));
-    } else {
-      task_group->Append([&, scan_task] {
-        ARROW_ASSIGN_OR_RAISE(auto batches, scan_task->Execute());
+    task_group->Append([&, scan_task] {
+      ARROW_ASSIGN_OR_RAISE(auto batches, scan_task->Execute());
 
-        for (auto maybe_batch : batches) {
-          ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch);
-          RETURN_NOT_OK(WriteNextBatch(state, scan_task->fragment(), std::move(batch)));
-        }
+      for (auto maybe_batch : batches) {
+        ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch);
+        RETURN_NOT_OK(WriteNextBatch(state, scan_task->fragment(), std::move(batch)));
+      }
 
-        return Status::OK();
-      });
-    }
+      return Status::OK();
+    });
   }
   scan_futs.push_back(task_group->FinishAsync());
   return AllComplete(scan_futs);
diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc
index 9a7a9d2..8ba6505 100644
--- a/cpp/src/arrow/dataset/file_csv.cc
+++ b/cpp/src/arrow/dataset/file_csv.cc
@@ -34,7 +34,6 @@
 #include "arrow/io/compressed.h"
 #include "arrow/result.h"
 #include "arrow/type.h"
-#include "arrow/util/async_generator.h"
 #include "arrow/util/iterator.h"
 #include "arrow/util/logging.h"
 
@@ -114,53 +113,34 @@
   return read_options;
 }
 
-static inline Future<std::shared_ptr<csv::StreamingReader>> OpenReaderAsync(
-    const FileSource& source, const CsvFileFormat& format,
-    internal::Executor* cpu_executor,
-    const std::shared_ptr<ScanOptions>& scan_options = nullptr,
-    MemoryPool* pool = default_memory_pool()) {
-  ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options));
-
-  ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed());
-  ARROW_ASSIGN_OR_RAISE(
-      input, io::BufferedInputStream::Create(reader_options.block_size,
-                                             default_memory_pool(), std::move(input)));
-
-  auto peek_fut = DeferNotOk(input->io_context().executor()->Submit(
-      [input, reader_options] { return input->Peek(reader_options.block_size); }));
-
-  return peek_fut.Then([=](const util::string_view& first_block)
-                           -> Future<std::shared_ptr<csv::StreamingReader>> {
-    const auto& parse_options = format.parse_options;
-    auto convert_options = csv::ConvertOptions::Defaults();
-    if (scan_options != nullptr) {
-      ARROW_ASSIGN_OR_RAISE(convert_options,
-                            GetConvertOptions(format, scan_options, first_block, pool));
-    }
-
-    return csv::StreamingReader::MakeAsync(io::default_io_context(), std::move(input),
-                                           cpu_executor, reader_options, parse_options,
-                                           convert_options)
-        .Then(
-            [](const std::shared_ptr<csv::StreamingReader>& maybe_reader)
-                -> Result<std::shared_ptr<csv::StreamingReader>> { return maybe_reader; },
-            [source](const Status& err) -> Result<std::shared_ptr<csv::StreamingReader>> {
-              return err.WithMessage("Could not open CSV input source '", source.path(),
-                                     "': ", err);
-            });
-  });
-}
-
 static inline Result<std::shared_ptr<csv::StreamingReader>> OpenReader(
     const FileSource& source, const CsvFileFormat& format,
     const std::shared_ptr<ScanOptions>& scan_options = nullptr,
     MemoryPool* pool = default_memory_pool()) {
-  bool use_threads = (scan_options != nullptr && scan_options->use_threads);
-  return internal::RunSynchronously<std::shared_ptr<csv::StreamingReader>>(
-      [&](Executor* executor) {
-        return OpenReaderAsync(source, format, executor, scan_options, pool);
-      },
-      use_threads);
+  ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options));
+
+  util::string_view first_block;
+  ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed());
+  ARROW_ASSIGN_OR_RAISE(
+      input, io::BufferedInputStream::Create(reader_options.block_size,
+                                             default_memory_pool(), std::move(input)));
+  ARROW_ASSIGN_OR_RAISE(first_block, input->Peek(reader_options.block_size));
+
+  const auto& parse_options = format.parse_options;
+  auto convert_options = csv::ConvertOptions::Defaults();
+  if (scan_options != nullptr) {
+    ARROW_ASSIGN_OR_RAISE(convert_options,
+                          GetConvertOptions(format, scan_options, first_block, pool));
+  }
+
+  auto maybe_reader =
+      csv::StreamingReader::Make(io::IOContext(pool), std::move(input), reader_options,
+                                 parse_options, convert_options);
+  if (!maybe_reader.ok()) {
+    return maybe_reader.status().WithMessage("Could not open CSV input source '",
+                                             source.path(), "': ", maybe_reader.status());
+  }
+  return maybe_reader;
 }
 
 /// \brief A ScanTask backed by an Csv file.
@@ -174,20 +154,9 @@
         source_(fragment->source()) {}
 
   Result<RecordBatchIterator> Execute() override {
-    ARROW_ASSIGN_OR_RAISE(auto gen, ExecuteAsync(internal::GetCpuThreadPool()));
-    return MakeGeneratorIterator(std::move(gen));
-  }
-
-  bool supports_async() const override { return true; }
-
-  Result<RecordBatchGenerator> ExecuteAsync(internal::Executor* cpu_executor) override {
-    auto reader_fut =
-        OpenReaderAsync(source_, *format_, cpu_executor, options(), options()->pool);
-    auto generator_fut = reader_fut.Then(
-        [](const std::shared_ptr<csv::StreamingReader>& reader) -> RecordBatchGenerator {
-          return [reader]() { return reader->ReadNextAsync(); };
-        });
-    return MakeFromFuture(generator_fut);
+    ARROW_ASSIGN_OR_RAISE(auto reader,
+                          OpenReader(source_, *format_, options(), options()->pool));
+    return IteratorFromReader(std::move(reader));
   }
 
  private:
@@ -225,8 +194,8 @@
     std::shared_ptr<ScanOptions> options,
     const std::shared_ptr<FileFragment>& fragment) const {
   auto this_ = checked_pointer_cast<const CsvFileFormat>(shared_from_this());
-  auto task = std::make_shared<CsvScanTask>(std::move(this_), std::move(options),
-                                            std::move(fragment));
+  auto task =
+      std::make_shared<CsvScanTask>(std::move(this_), std::move(options), fragment);
 
   return MakeVectorIterator<std::shared_ptr<ScanTask>>({std::move(task)});
 }
diff --git a/cpp/src/arrow/dataset/file_test.cc b/cpp/src/arrow/dataset/file_test.cc
index fdbb451..c7ce515 100644
--- a/cpp/src/arrow/dataset/file_test.cc
+++ b/cpp/src/arrow/dataset/file_test.cc
@@ -249,35 +249,6 @@
                 });
 }
 
-class TestFilesystemDatasetNestedParallelism : public NestedParallelismMixin {};
-
-TEST_F(TestFilesystemDatasetNestedParallelism, Write) {
-  constexpr int NUM_BATCHES = 32;
-  RecordBatchVector batches;
-  for (int i = 0; i < NUM_BATCHES; i++) {
-    batches.push_back(ConstantArrayGenerator::Zeroes(/*size=*/1, schema_));
-  }
-  auto dataset = std::make_shared<NestedParallelismDataset>(schema_, std::move(batches));
-  ScannerBuilder builder{dataset, options_};
-  ASSERT_OK_AND_ASSIGN(auto scanner, builder.Finish());
-
-  ASSERT_OK_AND_ASSIGN(auto output_dir, TemporaryDir::Make("nested-parallel-dataset"));
-
-  auto format = std::make_shared<DiscardingRowCountingFormat>();
-  auto rows_written = std::make_shared<std::atomic<int>>(0);
-  std::shared_ptr<FileWriteOptions> file_write_options =
-      std::make_shared<DiscardingRowCountingFileWriteOptions>(rows_written);
-  FileSystemDatasetWriteOptions dataset_write_options;
-  dataset_write_options.file_write_options = file_write_options;
-  dataset_write_options.basename_template = "{i}";
-  dataset_write_options.partitioning = std::make_shared<HivePartitioning>(schema({}));
-  dataset_write_options.base_dir = output_dir->path().ToString();
-  dataset_write_options.filesystem = std::make_shared<fs::LocalFileSystem>();
-
-  ASSERT_OK(FileSystemDataset::Write(dataset_write_options, scanner));
-  ASSERT_EQ(NUM_BATCHES, rows_written->load());
-}
-
 // Tests of subtree pruning
 
 struct TestPathTree {
diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
index 738c9fc..52eebfe 100644
--- a/cpp/src/arrow/dataset/scanner.cc
+++ b/cpp/src/arrow/dataset/scanner.cc
@@ -64,12 +64,6 @@
   return MakeVectorIterator(record_batches_);
 }
 
-Result<RecordBatchGenerator> ScanTask::ExecuteAsync(internal::Executor*) {
-  return Status::NotImplemented("Async is not implemented for this scan task yet");
-}
-
-bool ScanTask::supports_async() const { return false; }
-
 Result<ScanTaskIterator> Scanner::Scan() {
   // TODO(ARROW-12289) This is overridden in SyncScanner and will never be implemented in
   // AsyncScanner.  It is deprecated and will eventually go away.
@@ -331,37 +325,22 @@
 
   // TODO (ARROW-11797) Migrate to using ScanBatches()
   size_t scan_task_id = 0;
-  std::vector<Future<>> scan_futures;
   for (auto maybe_scan_task : scan_task_it) {
     ARROW_ASSIGN_OR_RAISE(auto scan_task, maybe_scan_task);
 
     auto id = scan_task_id++;
-    if (scan_task->supports_async()) {
-      ARROW_ASSIGN_OR_RAISE(auto scan_gen, scan_task->ExecuteAsync(cpu_executor));
-      auto scan_fut = CollectAsyncGenerator(std::move(scan_gen))
-                          .Then([state, id](const RecordBatchVector& rbs) {
-                            state->Emplace(rbs, id);
-                          });
-      scan_futures.push_back(std::move(scan_fut));
-    } else {
-      task_group->Append([state, id, scan_task] {
-        ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute());
-        ARROW_ASSIGN_OR_RAISE(auto local, batch_it.ToVector());
-        state->Emplace(std::move(local), id);
-        return Status::OK();
-      });
-    }
+    task_group->Append([state, id, scan_task] {
+      ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute());
+      ARROW_ASSIGN_OR_RAISE(auto local, batch_it.ToVector());
+      state->Emplace(std::move(local), id);
+      return Status::OK();
+    });
   }
   auto scan_options = scan_options_;
-  scan_futures.push_back(task_group->FinishAsync());
   // Wait for all tasks to complete, or the first error
-  return AllComplete(scan_futures)
-      .Then(
-          [scan_options, state](const detail::Empty&) -> Result<std::shared_ptr<Table>> {
-            return Table::FromRecordBatches(
-                scan_options->projected_schema,
-                FlattenRecordBatchVector(std::move(state->batches)));
-          });
+  RETURN_NOT_OK(task_group->Finish());
+  return Table::FromRecordBatches(scan_options->projected_schema,
+                                  FlattenRecordBatchVector(std::move(state->batches)));
 }
 
 }  // namespace dataset
diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h
index 37765c1..c4da6da 100644
--- a/cpp/src/arrow/dataset/scanner.h
+++ b/cpp/src/arrow/dataset/scanner.h
@@ -152,8 +152,6 @@
   /// resulting from the Scan. Execution semantics are encapsulated in the
   /// particular ScanTask implementation
   virtual Result<RecordBatchIterator> Execute() = 0;
-  virtual Result<RecordBatchGenerator> ExecuteAsync(internal::Executor* cpu_executor);
-  virtual bool supports_async() const;
 
   virtual ~ScanTask() = default;
 
diff --git a/cpp/src/arrow/dataset/scanner_internal.h b/cpp/src/arrow/dataset/scanner_internal.h
index d334c09..292ea6c 100644
--- a/cpp/src/arrow/dataset/scanner_internal.h
+++ b/cpp/src/arrow/dataset/scanner_internal.h
@@ -38,8 +38,6 @@
 
 namespace dataset {
 
-// TODO(ARROW-7001) This synchronous version is no longer needed, can use async version
-// regardless of sync/async of source
 inline RecordBatchIterator FilterRecordBatch(RecordBatchIterator it, Expression filter,
                                              MemoryPool* pool) {
   return MakeMaybeMapIterator(
@@ -64,38 +62,6 @@
       std::move(it));
 }
 
-inline Result<std::shared_ptr<RecordBatch>> DoFilterRecordBatch(
-    const Expression& filter, MemoryPool* pool, const std::shared_ptr<RecordBatch>& in) {
-  compute::ExecContext exec_context{pool};
-  ARROW_ASSIGN_OR_RAISE(Datum mask,
-                        ExecuteScalarExpression(filter, Datum(in), &exec_context));
-
-  if (mask.is_scalar()) {
-    const auto& mask_scalar = mask.scalar_as<BooleanScalar>();
-    if (mask_scalar.is_valid && mask_scalar.value) {
-      return std::move(in);
-    }
-    return in->Slice(0, 0);
-  }
-
-  ARROW_ASSIGN_OR_RAISE(
-      Datum filtered,
-      compute::Filter(in, mask, compute::FilterOptions::Defaults(), &exec_context));
-  return filtered.record_batch();
-}
-
-inline RecordBatchGenerator FilterRecordBatch(RecordBatchGenerator rbs, Expression filter,
-                                              MemoryPool* pool) {
-  // TODO(ARROW-7001) This changes to auto
-  std::function<Result<std::shared_ptr<RecordBatch>>(const std::shared_ptr<RecordBatch>&)>
-      mapper = [=](const std::shared_ptr<RecordBatch>& in) {
-        return DoFilterRecordBatch(filter, pool, in);
-      };
-  return MakeMappedGenerator(std::move(rbs), mapper);
-}
-
-// TODO(ARROW-7001) This synchronous version is no longer needed, all branches use async
-// version
 inline RecordBatchIterator ProjectRecordBatch(RecordBatchIterator it,
                                               Expression projection, MemoryPool* pool) {
   return MakeMaybeMapIterator(
@@ -119,35 +85,6 @@
       std::move(it));
 }
 
-inline Result<std::shared_ptr<RecordBatch>> DoProjectRecordBatch(
-    const Expression& projection, MemoryPool* pool,
-    const std::shared_ptr<RecordBatch>& in) {
-  compute::ExecContext exec_context{pool};
-  ARROW_ASSIGN_OR_RAISE(Datum projected,
-                        ExecuteScalarExpression(projection, Datum(in), &exec_context));
-  DCHECK_EQ(projected.type()->id(), Type::STRUCT);
-  if (projected.shape() == ValueDescr::SCALAR) {
-    // Only virtual columns are projected. Broadcast to an array
-    ARROW_ASSIGN_OR_RAISE(projected,
-                          MakeArrayFromScalar(*projected.scalar(), in->num_rows(), pool));
-  }
-
-  ARROW_ASSIGN_OR_RAISE(auto out,
-                        RecordBatch::FromStructArray(projected.array_as<StructArray>()));
-
-  return out->ReplaceSchemaMetadata(in->schema()->metadata());
-}
-
-inline RecordBatchGenerator ProjectRecordBatch(RecordBatchGenerator rbs,
-                                               Expression projection, MemoryPool* pool) {
-  // TODO(ARROW-7001) This changes to auto
-  std::function<Result<std::shared_ptr<RecordBatch>>(const std::shared_ptr<RecordBatch>&)>
-      mapper = [=](const std::shared_ptr<RecordBatch>& in) {
-        return DoProjectRecordBatch(projection, pool, in);
-      };
-  return MakeMappedGenerator(std::move(rbs), mapper);
-}
-
 class FilterAndProjectScanTask : public ScanTask {
  public:
   explicit FilterAndProjectScanTask(std::shared_ptr<ScanTask> task, Expression partition)
@@ -155,9 +92,7 @@
         task_(std::move(task)),
         partition_(std::move(partition)) {}
 
-  bool supports_async() const override { return task_->supports_async(); }
-
-  Result<RecordBatchIterator> ExecuteSync() {
+  Result<RecordBatchIterator> Execute() override {
     ARROW_ASSIGN_OR_RAISE(auto it, task_->Execute());
 
     ARROW_ASSIGN_OR_RAISE(Expression simplified_filter,
@@ -173,29 +108,6 @@
                               options_->pool);
   }
 
-  Result<RecordBatchIterator> Execute() override { return ExecuteSync(); }
-
-  Result<RecordBatchGenerator> ExecuteAsync(Executor* cpu_executor) override {
-    if (!task_->supports_async()) {
-      return Status::Invalid(
-          "ExecuteAsync should not have been called on FilterAndProjectScanTask if the "
-          "source task did not support async");
-    }
-    ARROW_ASSIGN_OR_RAISE(auto gen, task_->ExecuteAsync(cpu_executor));
-
-    ARROW_ASSIGN_OR_RAISE(Expression simplified_filter,
-                          SimplifyWithGuarantee(options()->filter, partition_));
-
-    ARROW_ASSIGN_OR_RAISE(Expression simplified_projection,
-                          SimplifyWithGuarantee(options()->projection, partition_));
-
-    RecordBatchGenerator filter_gen =
-        FilterRecordBatch(std::move(gen), simplified_filter, options_->pool);
-
-    return ProjectRecordBatch(std::move(filter_gen), simplified_projection,
-                              options_->pool);
-  }
-
  private:
   std::shared_ptr<ScanTask> task_;
   Expression partition_;
diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc
index ccae126..d1d0e45 100644
--- a/cpp/src/arrow/dataset/scanner_test.cc
+++ b/cpp/src/arrow/dataset/scanner_test.cc
@@ -181,21 +181,6 @@
   AssertTablesEqual(*expected, *actual);
 }
 
-class TestScannerNestedParallelism : public NestedParallelismMixin {};
-
-TEST_F(TestScannerNestedParallelism, Scan) {
-  constexpr int NUM_BATCHES = 32;
-  RecordBatchVector batches;
-  for (int i = 0; i < NUM_BATCHES; i++) {
-    batches.push_back(ConstantArrayGenerator::Zeroes(/*size=*/1, schema_));
-  }
-  auto dataset = std::make_shared<NestedParallelismDataset>(schema_, std::move(batches));
-  ScannerBuilder builder{dataset, options_};
-  ASSERT_OK_AND_ASSIGN(auto scanner, builder.Finish());
-  ASSERT_OK_AND_ASSIGN(auto table, scanner->ToTable());
-  ASSERT_EQ(table->num_rows(), NUM_BATCHES);
-}
-
 class TestScannerBuilder : public ::testing::Test {
   void SetUp() override {
     DatasetVector sources;
diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h
index 826e8b7..ea4c41e 100644
--- a/cpp/src/arrow/dataset/test_util.h
+++ b/cpp/src/arrow/dataset/test_util.h
@@ -832,156 +832,5 @@
   std::shared_ptr<ScanOptions> scan_options_;
 };
 
-// These test cases will run on a thread pool with 1 thread.  Any illegal (non-async)
-// nested parallelism should deadlock the test
-class NestedParallelismMixin : public ::testing::Test {
- protected:
-  static void SetUpTestSuite() {}
-
-  void TearDown() override {
-    if (old_capacity_ > 0) {
-      ASSERT_OK(internal::GetCpuThreadPool()->SetCapacity(old_capacity_));
-    }
-  }
-
-  void SetUp() override {
-    old_capacity_ = internal::GetCpuThreadPool()->GetCapacity();
-    ASSERT_OK(internal::GetCpuThreadPool()->SetCapacity(1));
-    schema_ = schema({field("i32", int32())});
-    options_ = std::make_shared<ScanOptions>();
-    options_->dataset_schema = schema_;
-    options_->use_threads = true;
-  }
-
-  class NestedParallelismScanTask : public ScanTask {
-   public:
-    explicit NestedParallelismScanTask(std::shared_ptr<ScanTask> target)
-        : ScanTask(target->options(), target->fragment()), target_(std::move(target)) {}
-    virtual ~NestedParallelismScanTask() = default;
-
-    Result<RecordBatchIterator> Execute() override {
-      // We could just return an invalid status here but this way it is easy to verify the
-      // test is checking what it is supposed to be checking by just changing
-      // supports_async() to false (will deadlock)
-      ADD_FAILURE() << "NestedParallelismScanTask::Execute should never be called.  You "
-                       "should be deadlocked right now";
-      ARROW_ASSIGN_OR_RAISE(auto batch_gen, ExecuteAsync(internal::GetCpuThreadPool()));
-      return MakeGeneratorIterator(std::move(batch_gen));
-    }
-
-    Result<RecordBatchGenerator> ExecuteAsync(internal::Executor* cpu_executor) override {
-      ARROW_ASSIGN_OR_RAISE(auto batches_it, target_->Execute());
-      ARROW_ASSIGN_OR_RAISE(auto batches, batches_it.ToVector());
-      auto generator_fut = DeferNotOk(
-          cpu_executor->Submit([batches] { return MakeVectorGenerator(batches); }));
-      return MakeFromFuture(generator_fut);
-    }
-
-    bool supports_async() const override { return true; }
-
-   private:
-    std::shared_ptr<ScanTask> target_;
-  };
-
-  class NestedParallelismFragment : public InMemoryFragment {
-   public:
-    explicit NestedParallelismFragment(RecordBatchVector record_batches,
-                                       Expression expr = literal(true))
-        : InMemoryFragment(std::move(record_batches), std::move(expr)) {}
-
-    Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options) override {
-      ARROW_ASSIGN_OR_RAISE(auto scan_task_it, InMemoryFragment::Scan(options));
-      return MakeMaybeMapIterator(
-          [](std::shared_ptr<ScanTask> task) -> Result<std::shared_ptr<ScanTask>> {
-            return std::make_shared<NestedParallelismScanTask>(std::move(task));
-          },
-          std::move(scan_task_it));
-    }
-  };
-
-  class NestedParallelismDataset : public InMemoryDataset {
-   public:
-    NestedParallelismDataset(std::shared_ptr<Schema> sch, RecordBatchVector batches)
-        : InMemoryDataset(std::move(sch), std::move(batches)) {}
-
-   protected:
-    Result<FragmentIterator> GetFragmentsImpl(Expression) override {
-      auto schema = this->schema();
-
-      auto create_fragment =
-          [schema](
-              std::shared_ptr<RecordBatch> batch) -> Result<std::shared_ptr<Fragment>> {
-        RecordBatchVector batches{batch};
-        return std::make_shared<NestedParallelismFragment>(std::move(batches));
-      };
-
-      return MakeMaybeMapIterator(std::move(create_fragment), get_batches_->Get());
-    }
-  };
-
-  class DiscardingRowCountingFileWriteOptions : public FileWriteOptions {
-   public:
-    explicit DiscardingRowCountingFileWriteOptions(
-        std::shared_ptr<std::atomic<int>> row_counter)
-        : FileWriteOptions(
-              std::make_shared<DiscardingRowCountingFormat>(std::move(row_counter))) {}
-  };
-
-  class DiscardingRowCountingFileWriter : public FileWriter {
-   public:
-    explicit DiscardingRowCountingFileWriter(std::shared_ptr<std::atomic<int>> row_count)
-        : FileWriter(NULL, NULL, NULL), row_count_(std::move(row_count)) {}
-    virtual ~DiscardingRowCountingFileWriter() = default;
-
-    Status Write(const std::shared_ptr<RecordBatch>& batch) override {
-      row_count_->fetch_add(static_cast<int>(batch->num_rows()));
-      return Status::OK();
-    }
-    Status Finish() override { return Status::OK(); };
-
-   protected:
-    Status FinishInternal() override { return Status::OK(); };
-
-   private:
-    std::shared_ptr<std::atomic<int>> row_count_;
-  };
-
-  class DiscardingRowCountingFormat : public FileFormat {
-   public:
-    DiscardingRowCountingFormat() : row_count_(std::make_shared<std::atomic<int>>(0)) {}
-    explicit DiscardingRowCountingFormat(std::shared_ptr<std::atomic<int>> row_count)
-        : row_count_(std::move(row_count)) {}
-    virtual ~DiscardingRowCountingFormat() = default;
-
-    std::string type_name() const override { return "discarding-row-counting"; }
-    bool Equals(const FileFormat& other) const override { return true; }
-    Result<bool> IsSupported(const FileSource& source) const override {
-      return Status::NotImplemented("Should not be called");
-    }
-    Result<std::shared_ptr<Schema>> Inspect(const FileSource& source) const override {
-      return Status::NotImplemented("Should not be called");
-    }
-    Result<ScanTaskIterator> ScanFile(
-        std::shared_ptr<ScanOptions> options,
-        const std::shared_ptr<FileFragment>& file) const override {
-      return Status::NotImplemented("Should not be called");
-    }
-    Result<std::shared_ptr<FileWriter>> MakeWriter(
-        std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
-        std::shared_ptr<FileWriteOptions> options) const override {
-      return std::make_shared<DiscardingRowCountingFileWriter>(row_count_);
-    }
-    std::shared_ptr<FileWriteOptions> DefaultWriteOptions() override { return NULLPTR; }
-
-   private:
-    std::shared_ptr<std::atomic<int>> row_count_;
-  };
-
- protected:
-  int old_capacity_ = 0;
-  std::shared_ptr<Schema> schema_;
-  std::shared_ptr<ScanOptions> options_;
-};
-
 }  // namespace dataset
 }  // namespace arrow
diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h
index e45f598..4650e80 100644
--- a/cpp/src/arrow/record_batch.h
+++ b/cpp/src/arrow/record_batch.h
@@ -25,7 +25,6 @@
 #include "arrow/result.h"
 #include "arrow/status.h"
 #include "arrow/type_fwd.h"
-#include "arrow/util/future.h"
 #include "arrow/util/macros.h"
 #include "arrow/util/visibility.h"
 
@@ -208,14 +207,6 @@
   /// \return Status
   virtual Status ReadNext(std::shared_ptr<RecordBatch>* batch) = 0;
 
-  // Fallback to sync implementation until all other readers are converted(ARROW-11770)
-  // and then this could become pure virtual with ReadNext falling back to async impl.
-  virtual Future<std::shared_ptr<RecordBatch>> ReadNextAsync() {
-    std::shared_ptr<RecordBatch> batch;
-    ARROW_RETURN_NOT_OK(ReadNext(&batch));
-    return Future<std::shared_ptr<RecordBatch>>::MakeFinished(std::move(batch));
-  }
-
   /// \brief Iterator interface
   Result<std::shared_ptr<RecordBatch>> Next() {
     std::shared_ptr<RecordBatch> batch;
diff --git a/cpp/src/arrow/util/future.h b/cpp/src/arrow/util/future.h
index 21754ec..4c8de91 100644
--- a/cpp/src/arrow/util/future.h
+++ b/cpp/src/arrow/util/future.h
@@ -718,7 +718,7 @@
         return true;
       }
       if (control_res->has_value()) {
-        break_fut.MarkFinished(*std::move(*control_res));
+        break_fut.MarkFinished(**control_res);
         return true;
       }
       return false;
diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h
index c388680..fc7dc85 100644
--- a/cpp/src/arrow/util/thread_pool.h
+++ b/cpp/src/arrow/util/thread_pool.h
@@ -106,9 +106,8 @@
   Future<T> Transfer(Future<T> future) {
     auto transferred = Future<T>::Make();
     auto callback = [this, transferred](const Result<T>& result) mutable {
-      auto spawn_status = Spawn([transferred, result]() mutable {
-        transferred.MarkFinished(std::move(result));
-      });
+      auto spawn_status =
+          Spawn([transferred, result]() mutable { transferred.MarkFinished(result); });
       if (!spawn_status.ok()) {
         transferred.MarkFinished(spawn_status);
       }
diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R
index 192b4b4..9943292 100644
--- a/r/tests/testthat/test-dataset.R
+++ b/r/tests/testthat/test-dataset.R
@@ -354,7 +354,6 @@
 })
 
 test_that("CSV dataset", {
-  skip_on_os("windows") # https://issues.apache.org/jira/browse/ARROW-12181
   ds <- open_dataset(csv_dir, partitioning = "part", format = "csv")
   expect_r6_class(ds$format, "CsvFileFormat")
   expect_r6_class(ds$filesystem, "LocalFileSystem")