ARROW-12161: [C++][Dataset] Revert async CSV reader in datasets
Reverts the streaming CSV reader and the async workaround introduced for it. It will be reintroduced, more cleanly, in ARROW-12355
Closes #10019 from westonpace/feature/revert-arrow-12161
Authored-by: Weston Pace <weston.pace@gmail.com>
Signed-off-by: David Li <li.davidm96@gmail.com>
diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc
index c435236..5b6e11e 100644
--- a/cpp/src/arrow/csv/reader.cc
+++ b/cpp/src/arrow/csv/reader.cc
@@ -199,19 +199,6 @@
return MakeTransformedIterator(std::move(buffer_iterator), block_reader_fn);
}
- static AsyncGenerator<CSVBlock> MakeAsyncIterator(
- AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator,
- std::unique_ptr<Chunker> chunker, std::shared_ptr<Buffer> first_buffer) {
- auto block_reader =
- std::make_shared<SerialBlockReader>(std::move(chunker), first_buffer);
- // Wrap shared pointer in callable
- Transformer<std::shared_ptr<Buffer>, CSVBlock> block_reader_fn =
- [block_reader](std::shared_ptr<Buffer> next) {
- return (*block_reader)(std::move(next));
- };
- return MakeTransformedGenerator(std::move(buffer_generator), block_reader_fn);
- }
-
Result<TransformFlow<CSVBlock>> operator()(std::shared_ptr<Buffer> next_buffer) {
if (buffer_ == nullptr) {
return TransformFinish();
@@ -585,25 +572,22 @@
class BaseStreamingReader : public ReaderMixin, public csv::StreamingReader {
public:
- BaseStreamingReader(io::IOContext io_context, Executor* cpu_executor,
- std::shared_ptr<io::InputStream> input,
- const ReadOptions& read_options, const ParseOptions& parse_options,
- const ConvertOptions& convert_options)
- : ReaderMixin(io_context, std::move(input), read_options, parse_options,
- convert_options),
- cpu_executor_(cpu_executor) {}
+ using ReaderMixin::ReaderMixin;
- virtual Future<std::shared_ptr<csv::StreamingReader>> Init() = 0;
+ virtual Status Init() = 0;
std::shared_ptr<Schema> schema() const override { return schema_; }
Status ReadNext(std::shared_ptr<RecordBatch>* batch) override {
- auto next_fut = ReadNextAsync();
- auto next_result = next_fut.result();
- return std::move(next_result).Value(batch);
+ do {
+ RETURN_NOT_OK(ReadNext().Value(batch));
+ } while (*batch != nullptr && (*batch)->num_rows() == 0);
+ return Status::OK();
}
protected:
+ virtual Result<std::shared_ptr<RecordBatch>> ReadNext() = 0;
+
// Make column decoders from conversion schema
Status MakeColumnDecoders() {
for (const auto& column : conversion_schema_.columns) {
@@ -686,141 +670,101 @@
std::vector<std::shared_ptr<ColumnDecoder>> column_decoders_;
std::shared_ptr<Schema> schema_;
std::shared_ptr<RecordBatch> pending_batch_;
- AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator_;
- Executor* cpu_executor_;
+ Iterator<std::shared_ptr<Buffer>> buffer_iterator_;
bool eof_ = false;
};
/////////////////////////////////////////////////////////////////////////
// Serial StreamingReader implementation
-class SerialStreamingReader : public BaseStreamingReader,
- public std::enable_shared_from_this<SerialStreamingReader> {
+class SerialStreamingReader : public BaseStreamingReader {
public:
using BaseStreamingReader::BaseStreamingReader;
- Future<std::shared_ptr<csv::StreamingReader>> Init() override {
+ Status Init() override {
ARROW_ASSIGN_OR_RAISE(auto istream_it,
io::MakeInputStreamIterator(input_, read_options_.block_size));
- // TODO Consider exposing readahead as a read option (ARROW-12090)
- ARROW_ASSIGN_OR_RAISE(auto bg_it, MakeBackgroundGenerator(std::move(istream_it),
- io_context_.executor()));
-
- auto transferred_it = MakeTransferredGenerator(bg_it, cpu_executor_);
-
- buffer_generator_ = CSVBufferIterator::MakeAsync(std::move(transferred_it));
+ // Since we're converting serially, no need to readahead more than one block
+ int32_t block_queue_size = 1;
+ ARROW_ASSIGN_OR_RAISE(auto rh_it,
+ MakeReadaheadIterator(std::move(istream_it), block_queue_size));
+ buffer_iterator_ = CSVBufferIterator::Make(std::move(rh_it));
task_group_ = internal::TaskGroup::MakeSerial(io_context_.stop_token());
- auto self = shared_from_this();
// Read schema from first batch
- return ReadNextAsync().Then([self](const std::shared_ptr<RecordBatch>& first_batch)
- -> Result<std::shared_ptr<csv::StreamingReader>> {
- self->pending_batch_ = first_batch;
- DCHECK_NE(self->schema_, nullptr);
- return self;
- });
+ ARROW_ASSIGN_OR_RAISE(pending_batch_, ReadNext());
+ DCHECK_NE(schema_, nullptr);
+ return Status::OK();
}
- Result<std::shared_ptr<RecordBatch>> DecodeBatchAndUpdateSchema() {
- auto maybe_batch = DecodeNextBatch();
- if (schema_ == nullptr && maybe_batch.ok()) {
- schema_ = (*maybe_batch)->schema();
- }
- return maybe_batch;
- }
-
- Future<std::shared_ptr<RecordBatch>> DoReadNext(
- std::shared_ptr<SerialStreamingReader> self) {
- auto batch = std::move(pending_batch_);
- if (batch != nullptr) {
- return Future<std::shared_ptr<RecordBatch>>::MakeFinished(batch);
- }
-
- if (!source_eof_) {
- return block_generator_()
- .Then([self](const CSVBlock& maybe_block) -> Status {
- if (!IsIterationEnd(maybe_block)) {
- self->last_block_index_ = maybe_block.block_index;
- auto maybe_parsed = self->ParseAndInsert(
- maybe_block.partial, maybe_block.completion, maybe_block.buffer,
- maybe_block.block_index, maybe_block.is_final);
- if (!maybe_parsed.ok()) {
- // Parse error => bail out
- self->eof_ = true;
- return maybe_parsed.status();
- }
- RETURN_NOT_OK(maybe_block.consume_bytes(*maybe_parsed));
- } else {
- self->source_eof_ = true;
- for (auto& decoder : self->column_decoders_) {
- decoder->SetEOF(self->last_block_index_ + 1);
- }
- }
- return Status::OK();
- })
- .Then([self](const ::arrow::detail::Empty& st)
- -> Result<std::shared_ptr<RecordBatch>> {
- return self->DecodeBatchAndUpdateSchema();
- });
- }
- return Future<std::shared_ptr<RecordBatch>>::MakeFinished(
- DecodeBatchAndUpdateSchema());
- }
-
- Future<std::shared_ptr<RecordBatch>> ReadNextSkippingEmpty(
- std::shared_ptr<SerialStreamingReader> self) {
- return DoReadNext(self).Then([self](const std::shared_ptr<RecordBatch>& batch) {
- if (batch != nullptr && batch->num_rows() == 0) {
- return self->ReadNextSkippingEmpty(self);
- }
- return Future<std::shared_ptr<RecordBatch>>::MakeFinished(batch);
- });
- }
-
- Future<std::shared_ptr<RecordBatch>> ReadNextAsync() override {
+ protected:
+ Result<std::shared_ptr<RecordBatch>> ReadNext() override {
if (eof_) {
- return Future<std::shared_ptr<RecordBatch>>::MakeFinished(nullptr);
+ return nullptr;
}
if (io_context_.stop_token().IsStopRequested()) {
eof_ = true;
return io_context_.stop_token().Poll();
}
- auto self = shared_from_this();
- if (!block_generator_) {
- return SetupReader(self).Then([self](const Result<::arrow::detail::Empty>& res)
- -> Future<std::shared_ptr<RecordBatch>> {
- if (!res.ok()) {
- self->eof_ = true;
- return res.status();
- }
- return self->ReadNextSkippingEmpty(self);
- });
- } else {
- return self->ReadNextSkippingEmpty(self);
+ if (!block_iterator_) {
+ Status st = SetupReader();
+ if (!st.ok()) {
+ // Can't setup reader => bail out
+ eof_ = true;
+ return st;
+ }
}
+ auto batch = std::move(pending_batch_);
+ if (batch != nullptr) {
+ return batch;
+ }
+
+ if (!source_eof_) {
+ ARROW_ASSIGN_OR_RAISE(auto maybe_block, block_iterator_.Next());
+ if (!IsIterationEnd(maybe_block)) {
+ last_block_index_ = maybe_block.block_index;
+ auto maybe_parsed = ParseAndInsert(maybe_block.partial, maybe_block.completion,
+ maybe_block.buffer, maybe_block.block_index,
+ maybe_block.is_final);
+ if (!maybe_parsed.ok()) {
+ // Parse error => bail out
+ eof_ = true;
+ return maybe_parsed.status();
+ }
+ RETURN_NOT_OK(maybe_block.consume_bytes(*maybe_parsed));
+ } else {
+ source_eof_ = true;
+ for (auto& decoder : column_decoders_) {
+ decoder->SetEOF(last_block_index_ + 1);
+ }
+ }
+ }
+
+ auto maybe_batch = DecodeNextBatch();
+ if (schema_ == nullptr && maybe_batch.ok()) {
+ schema_ = (*maybe_batch)->schema();
+ }
+ return maybe_batch;
};
- protected:
- Future<> SetupReader(std::shared_ptr<SerialStreamingReader> self) {
- return buffer_generator_().Then([self](const std::shared_ptr<Buffer>& first_buffer) {
- if (first_buffer == nullptr) {
- return Status::Invalid("Empty CSV file");
- }
- auto own_first_buffer = first_buffer;
- RETURN_NOT_OK(self->ProcessHeader(own_first_buffer, &own_first_buffer));
- RETURN_NOT_OK(self->MakeColumnDecoders());
+ Status SetupReader() {
+ ARROW_ASSIGN_OR_RAISE(auto first_buffer, buffer_iterator_.Next());
+ if (first_buffer == nullptr) {
+ return Status::Invalid("Empty CSV file");
+ }
+ RETURN_NOT_OK(ProcessHeader(first_buffer, &first_buffer));
+ RETURN_NOT_OK(MakeColumnDecoders());
- self->block_generator_ = SerialBlockReader::MakeAsyncIterator(
- std::move(self->buffer_generator_), MakeChunker(self->parse_options_),
- std::move(own_first_buffer));
- return Status::OK();
- });
+ block_iterator_ = SerialBlockReader::MakeIterator(std::move(buffer_iterator_),
+ MakeChunker(parse_options_),
+ std::move(first_buffer));
+ return Status::OK();
}
bool source_eof_ = false;
int64_t last_block_index_ = 0;
- AsyncGenerator<CSVBlock> block_generator_;
+ Iterator<CSVBlock> block_iterator_;
};
/////////////////////////////////////////////////////////////////////////
@@ -999,14 +943,15 @@
return reader;
}
-Future<std::shared_ptr<StreamingReader>> MakeStreamingReader(
+Result<std::shared_ptr<StreamingReader>> MakeStreamingReader(
io::IOContext io_context, std::shared_ptr<io::InputStream> input,
internal::Executor* cpu_executor, const ReadOptions& read_options,
const ParseOptions& parse_options, const ConvertOptions& convert_options) {
std::shared_ptr<BaseStreamingReader> reader;
- reader = std::make_shared<SerialStreamingReader>(
- io_context, cpu_executor, input, read_options, parse_options, convert_options);
- return reader->Init();
+ reader = std::make_shared<SerialStreamingReader>(io_context, input, read_options,
+ parse_options, convert_options);
+ RETURN_NOT_OK(reader->Init());
+ return reader;
}
} // namespace
@@ -1036,11 +981,8 @@
const ConvertOptions& convert_options) {
auto io_context = io::IOContext(pool);
auto cpu_executor = internal::GetCpuThreadPool();
- auto reader_fut = MakeStreamingReader(io_context, std::move(input), cpu_executor,
- read_options, parse_options, convert_options);
- auto reader_result = reader_fut.result();
- ARROW_ASSIGN_OR_RAISE(auto reader, reader_result);
- return reader;
+ return MakeStreamingReader(io_context, std::move(input), cpu_executor, read_options,
+ parse_options, convert_options);
}
Result<std::shared_ptr<StreamingReader>> StreamingReader::Make(
@@ -1048,17 +990,6 @@
const ReadOptions& read_options, const ParseOptions& parse_options,
const ConvertOptions& convert_options) {
auto cpu_executor = internal::GetCpuThreadPool();
- auto reader_fut = MakeStreamingReader(io_context, std::move(input), cpu_executor,
- read_options, parse_options, convert_options);
- auto reader_result = reader_fut.result();
- ARROW_ASSIGN_OR_RAISE(auto reader, reader_result);
- return reader;
-}
-
-Future<std::shared_ptr<StreamingReader>> StreamingReader::MakeAsync(
- io::IOContext io_context, std::shared_ptr<io::InputStream> input,
- internal::Executor* cpu_executor, const ReadOptions& read_options,
- const ParseOptions& parse_options, const ConvertOptions& convert_options) {
return MakeStreamingReader(io_context, std::move(input), cpu_executor, read_options,
parse_options, convert_options);
}
diff --git a/cpp/src/arrow/csv/reader.h b/cpp/src/arrow/csv/reader.h
index 72f1375..8e56824 100644
--- a/cpp/src/arrow/csv/reader.h
+++ b/cpp/src/arrow/csv/reader.h
@@ -65,17 +65,6 @@
virtual ~StreamingReader() = default;
/// Create a StreamingReader instance
- ///
- /// This involves some I/O as the first batch must be loaded during the creation process
- /// so it is returned as a future
- ///
- /// Currently, the StreamingReader is not async-reentrant and does not do any fan-out
- /// parsing (see ARROW-11889)
- static Future<std::shared_ptr<StreamingReader>> MakeAsync(
- io::IOContext io_context, std::shared_ptr<io::InputStream> input,
- internal::Executor* cpu_executor, const ReadOptions&, const ParseOptions&,
- const ConvertOptions&);
-
static Result<std::shared_ptr<StreamingReader>> Make(
io::IOContext io_context, std::shared_ptr<io::InputStream> input,
const ReadOptions&, const ParseOptions&, const ConvertOptions&);
diff --git a/cpp/src/arrow/csv/reader_test.cc b/cpp/src/arrow/csv/reader_test.cc
index 228ab71..dbe6b1d 100644
--- a/cpp/src/arrow/csv/reader_test.cc
+++ b/cpp/src/arrow/csv/reader_test.cc
@@ -32,14 +32,10 @@
#include "arrow/table.h"
#include "arrow/testing/future_util.h"
#include "arrow/testing/gtest_util.h"
-#include "arrow/util/async_generator.h"
#include "arrow/util/future.h"
#include "arrow/util/thread_pool.h"
namespace arrow {
-
-using RecordBatchGenerator = AsyncGenerator<std::shared_ptr<RecordBatch>>;
-
namespace csv {
// Allows the streaming reader to be used in tests that expect a table reader
@@ -49,17 +45,12 @@
: reader_(std::move(reader)) {}
virtual ~StreamingReaderAsTableReader() = default;
virtual Result<std::shared_ptr<Table>> Read() {
- auto table_fut = ReadAsync();
- auto table_res = table_fut.result();
- ARROW_ASSIGN_OR_RAISE(auto table, table_res);
+ std::shared_ptr<Table> table;
+ RETURN_NOT_OK(reader_->ReadAll(&table));
return table;
}
virtual Future<std::shared_ptr<Table>> ReadAsync() {
- auto reader = reader_;
- RecordBatchGenerator rb_generator = [reader]() { return reader->ReadNextAsync(); };
- return CollectAsyncGenerator(rb_generator).Then([](const RecordBatchVector& rbs) {
- return Table::FromRecordBatches(rbs);
- });
+ return Future<std::shared_ptr<Table>>::MakeFinished(Read());
}
private:
diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index 7b2f420..f4a3a0b 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -428,25 +428,16 @@
auto task_group = scan_options.TaskGroup();
for (const auto& scan_task : scan_tasks) {
- if (scan_task->supports_async()) {
- ARROW_ASSIGN_OR_RAISE(auto batches_gen, scan_task->ExecuteAsync(cpu_executor));
- std::function<Status(std::shared_ptr<RecordBatch> batch)> batch_visitor =
- [&, scan_task](std::shared_ptr<RecordBatch> batch) {
- return WriteNextBatch(state, scan_task->fragment(), std::move(batch));
- };
- scan_futs.push_back(VisitAsyncGenerator(batches_gen, batch_visitor));
- } else {
- task_group->Append([&, scan_task] {
- ARROW_ASSIGN_OR_RAISE(auto batches, scan_task->Execute());
+ task_group->Append([&, scan_task] {
+ ARROW_ASSIGN_OR_RAISE(auto batches, scan_task->Execute());
- for (auto maybe_batch : batches) {
- ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch);
- RETURN_NOT_OK(WriteNextBatch(state, scan_task->fragment(), std::move(batch)));
- }
+ for (auto maybe_batch : batches) {
+ ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch);
+ RETURN_NOT_OK(WriteNextBatch(state, scan_task->fragment(), std::move(batch)));
+ }
- return Status::OK();
- });
- }
+ return Status::OK();
+ });
}
scan_futs.push_back(task_group->FinishAsync());
return AllComplete(scan_futs);
diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc
index 9a7a9d2..8ba6505 100644
--- a/cpp/src/arrow/dataset/file_csv.cc
+++ b/cpp/src/arrow/dataset/file_csv.cc
@@ -34,7 +34,6 @@
#include "arrow/io/compressed.h"
#include "arrow/result.h"
#include "arrow/type.h"
-#include "arrow/util/async_generator.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
@@ -114,53 +113,34 @@
return read_options;
}
-static inline Future<std::shared_ptr<csv::StreamingReader>> OpenReaderAsync(
- const FileSource& source, const CsvFileFormat& format,
- internal::Executor* cpu_executor,
- const std::shared_ptr<ScanOptions>& scan_options = nullptr,
- MemoryPool* pool = default_memory_pool()) {
- ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options));
-
- ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed());
- ARROW_ASSIGN_OR_RAISE(
- input, io::BufferedInputStream::Create(reader_options.block_size,
- default_memory_pool(), std::move(input)));
-
- auto peek_fut = DeferNotOk(input->io_context().executor()->Submit(
- [input, reader_options] { return input->Peek(reader_options.block_size); }));
-
- return peek_fut.Then([=](const util::string_view& first_block)
- -> Future<std::shared_ptr<csv::StreamingReader>> {
- const auto& parse_options = format.parse_options;
- auto convert_options = csv::ConvertOptions::Defaults();
- if (scan_options != nullptr) {
- ARROW_ASSIGN_OR_RAISE(convert_options,
- GetConvertOptions(format, scan_options, first_block, pool));
- }
-
- return csv::StreamingReader::MakeAsync(io::default_io_context(), std::move(input),
- cpu_executor, reader_options, parse_options,
- convert_options)
- .Then(
- [](const std::shared_ptr<csv::StreamingReader>& maybe_reader)
- -> Result<std::shared_ptr<csv::StreamingReader>> { return maybe_reader; },
- [source](const Status& err) -> Result<std::shared_ptr<csv::StreamingReader>> {
- return err.WithMessage("Could not open CSV input source '", source.path(),
- "': ", err);
- });
- });
-}
-
static inline Result<std::shared_ptr<csv::StreamingReader>> OpenReader(
const FileSource& source, const CsvFileFormat& format,
const std::shared_ptr<ScanOptions>& scan_options = nullptr,
MemoryPool* pool = default_memory_pool()) {
- bool use_threads = (scan_options != nullptr && scan_options->use_threads);
- return internal::RunSynchronously<std::shared_ptr<csv::StreamingReader>>(
- [&](Executor* executor) {
- return OpenReaderAsync(source, format, executor, scan_options, pool);
- },
- use_threads);
+ ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options));
+
+ util::string_view first_block;
+ ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed());
+ ARROW_ASSIGN_OR_RAISE(
+ input, io::BufferedInputStream::Create(reader_options.block_size,
+ default_memory_pool(), std::move(input)));
+ ARROW_ASSIGN_OR_RAISE(first_block, input->Peek(reader_options.block_size));
+
+ const auto& parse_options = format.parse_options;
+ auto convert_options = csv::ConvertOptions::Defaults();
+ if (scan_options != nullptr) {
+ ARROW_ASSIGN_OR_RAISE(convert_options,
+ GetConvertOptions(format, scan_options, first_block, pool));
+ }
+
+ auto maybe_reader =
+ csv::StreamingReader::Make(io::IOContext(pool), std::move(input), reader_options,
+ parse_options, convert_options);
+ if (!maybe_reader.ok()) {
+ return maybe_reader.status().WithMessage("Could not open CSV input source '",
+ source.path(), "': ", maybe_reader.status());
+ }
+ return maybe_reader;
}
/// \brief A ScanTask backed by an Csv file.
@@ -174,20 +154,9 @@
source_(fragment->source()) {}
Result<RecordBatchIterator> Execute() override {
- ARROW_ASSIGN_OR_RAISE(auto gen, ExecuteAsync(internal::GetCpuThreadPool()));
- return MakeGeneratorIterator(std::move(gen));
- }
-
- bool supports_async() const override { return true; }
-
- Result<RecordBatchGenerator> ExecuteAsync(internal::Executor* cpu_executor) override {
- auto reader_fut =
- OpenReaderAsync(source_, *format_, cpu_executor, options(), options()->pool);
- auto generator_fut = reader_fut.Then(
- [](const std::shared_ptr<csv::StreamingReader>& reader) -> RecordBatchGenerator {
- return [reader]() { return reader->ReadNextAsync(); };
- });
- return MakeFromFuture(generator_fut);
+ ARROW_ASSIGN_OR_RAISE(auto reader,
+ OpenReader(source_, *format_, options(), options()->pool));
+ return IteratorFromReader(std::move(reader));
}
private:
@@ -225,8 +194,8 @@
std::shared_ptr<ScanOptions> options,
const std::shared_ptr<FileFragment>& fragment) const {
auto this_ = checked_pointer_cast<const CsvFileFormat>(shared_from_this());
- auto task = std::make_shared<CsvScanTask>(std::move(this_), std::move(options),
- std::move(fragment));
+ auto task =
+ std::make_shared<CsvScanTask>(std::move(this_), std::move(options), fragment);
return MakeVectorIterator<std::shared_ptr<ScanTask>>({std::move(task)});
}
diff --git a/cpp/src/arrow/dataset/file_test.cc b/cpp/src/arrow/dataset/file_test.cc
index fdbb451..c7ce515 100644
--- a/cpp/src/arrow/dataset/file_test.cc
+++ b/cpp/src/arrow/dataset/file_test.cc
@@ -249,35 +249,6 @@
});
}
-class TestFilesystemDatasetNestedParallelism : public NestedParallelismMixin {};
-
-TEST_F(TestFilesystemDatasetNestedParallelism, Write) {
- constexpr int NUM_BATCHES = 32;
- RecordBatchVector batches;
- for (int i = 0; i < NUM_BATCHES; i++) {
- batches.push_back(ConstantArrayGenerator::Zeroes(/*size=*/1, schema_));
- }
- auto dataset = std::make_shared<NestedParallelismDataset>(schema_, std::move(batches));
- ScannerBuilder builder{dataset, options_};
- ASSERT_OK_AND_ASSIGN(auto scanner, builder.Finish());
-
- ASSERT_OK_AND_ASSIGN(auto output_dir, TemporaryDir::Make("nested-parallel-dataset"));
-
- auto format = std::make_shared<DiscardingRowCountingFormat>();
- auto rows_written = std::make_shared<std::atomic<int>>(0);
- std::shared_ptr<FileWriteOptions> file_write_options =
- std::make_shared<DiscardingRowCountingFileWriteOptions>(rows_written);
- FileSystemDatasetWriteOptions dataset_write_options;
- dataset_write_options.file_write_options = file_write_options;
- dataset_write_options.basename_template = "{i}";
- dataset_write_options.partitioning = std::make_shared<HivePartitioning>(schema({}));
- dataset_write_options.base_dir = output_dir->path().ToString();
- dataset_write_options.filesystem = std::make_shared<fs::LocalFileSystem>();
-
- ASSERT_OK(FileSystemDataset::Write(dataset_write_options, scanner));
- ASSERT_EQ(NUM_BATCHES, rows_written->load());
-}
-
// Tests of subtree pruning
struct TestPathTree {
diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
index 738c9fc..52eebfe 100644
--- a/cpp/src/arrow/dataset/scanner.cc
+++ b/cpp/src/arrow/dataset/scanner.cc
@@ -64,12 +64,6 @@
return MakeVectorIterator(record_batches_);
}
-Result<RecordBatchGenerator> ScanTask::ExecuteAsync(internal::Executor*) {
- return Status::NotImplemented("Async is not implemented for this scan task yet");
-}
-
-bool ScanTask::supports_async() const { return false; }
-
Result<ScanTaskIterator> Scanner::Scan() {
// TODO(ARROW-12289) This is overridden in SyncScanner and will never be implemented in
// AsyncScanner. It is deprecated and will eventually go away.
@@ -331,37 +325,22 @@
// TODO (ARROW-11797) Migrate to using ScanBatches()
size_t scan_task_id = 0;
- std::vector<Future<>> scan_futures;
for (auto maybe_scan_task : scan_task_it) {
ARROW_ASSIGN_OR_RAISE(auto scan_task, maybe_scan_task);
auto id = scan_task_id++;
- if (scan_task->supports_async()) {
- ARROW_ASSIGN_OR_RAISE(auto scan_gen, scan_task->ExecuteAsync(cpu_executor));
- auto scan_fut = CollectAsyncGenerator(std::move(scan_gen))
- .Then([state, id](const RecordBatchVector& rbs) {
- state->Emplace(rbs, id);
- });
- scan_futures.push_back(std::move(scan_fut));
- } else {
- task_group->Append([state, id, scan_task] {
- ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute());
- ARROW_ASSIGN_OR_RAISE(auto local, batch_it.ToVector());
- state->Emplace(std::move(local), id);
- return Status::OK();
- });
- }
+ task_group->Append([state, id, scan_task] {
+ ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute());
+ ARROW_ASSIGN_OR_RAISE(auto local, batch_it.ToVector());
+ state->Emplace(std::move(local), id);
+ return Status::OK();
+ });
}
auto scan_options = scan_options_;
- scan_futures.push_back(task_group->FinishAsync());
// Wait for all tasks to complete, or the first error
- return AllComplete(scan_futures)
- .Then(
- [scan_options, state](const detail::Empty&) -> Result<std::shared_ptr<Table>> {
- return Table::FromRecordBatches(
- scan_options->projected_schema,
- FlattenRecordBatchVector(std::move(state->batches)));
- });
+ RETURN_NOT_OK(task_group->Finish());
+ return Table::FromRecordBatches(scan_options->projected_schema,
+ FlattenRecordBatchVector(std::move(state->batches)));
}
} // namespace dataset
diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h
index 37765c1..c4da6da 100644
--- a/cpp/src/arrow/dataset/scanner.h
+++ b/cpp/src/arrow/dataset/scanner.h
@@ -152,8 +152,6 @@
/// resulting from the Scan. Execution semantics are encapsulated in the
/// particular ScanTask implementation
virtual Result<RecordBatchIterator> Execute() = 0;
- virtual Result<RecordBatchGenerator> ExecuteAsync(internal::Executor* cpu_executor);
- virtual bool supports_async() const;
virtual ~ScanTask() = default;
diff --git a/cpp/src/arrow/dataset/scanner_internal.h b/cpp/src/arrow/dataset/scanner_internal.h
index d334c09..292ea6c 100644
--- a/cpp/src/arrow/dataset/scanner_internal.h
+++ b/cpp/src/arrow/dataset/scanner_internal.h
@@ -38,8 +38,6 @@
namespace dataset {
-// TODO(ARROW-7001) This synchronous version is no longer needed, can use async version
-// regardless of sync/async of source
inline RecordBatchIterator FilterRecordBatch(RecordBatchIterator it, Expression filter,
MemoryPool* pool) {
return MakeMaybeMapIterator(
@@ -64,38 +62,6 @@
std::move(it));
}
-inline Result<std::shared_ptr<RecordBatch>> DoFilterRecordBatch(
- const Expression& filter, MemoryPool* pool, const std::shared_ptr<RecordBatch>& in) {
- compute::ExecContext exec_context{pool};
- ARROW_ASSIGN_OR_RAISE(Datum mask,
- ExecuteScalarExpression(filter, Datum(in), &exec_context));
-
- if (mask.is_scalar()) {
- const auto& mask_scalar = mask.scalar_as<BooleanScalar>();
- if (mask_scalar.is_valid && mask_scalar.value) {
- return std::move(in);
- }
- return in->Slice(0, 0);
- }
-
- ARROW_ASSIGN_OR_RAISE(
- Datum filtered,
- compute::Filter(in, mask, compute::FilterOptions::Defaults(), &exec_context));
- return filtered.record_batch();
-}
-
-inline RecordBatchGenerator FilterRecordBatch(RecordBatchGenerator rbs, Expression filter,
- MemoryPool* pool) {
- // TODO(ARROW-7001) This changes to auto
- std::function<Result<std::shared_ptr<RecordBatch>>(const std::shared_ptr<RecordBatch>&)>
- mapper = [=](const std::shared_ptr<RecordBatch>& in) {
- return DoFilterRecordBatch(filter, pool, in);
- };
- return MakeMappedGenerator(std::move(rbs), mapper);
-}
-
-// TODO(ARROW-7001) This synchronous version is no longer needed, all branches use async
-// version
inline RecordBatchIterator ProjectRecordBatch(RecordBatchIterator it,
Expression projection, MemoryPool* pool) {
return MakeMaybeMapIterator(
@@ -119,35 +85,6 @@
std::move(it));
}
-inline Result<std::shared_ptr<RecordBatch>> DoProjectRecordBatch(
- const Expression& projection, MemoryPool* pool,
- const std::shared_ptr<RecordBatch>& in) {
- compute::ExecContext exec_context{pool};
- ARROW_ASSIGN_OR_RAISE(Datum projected,
- ExecuteScalarExpression(projection, Datum(in), &exec_context));
- DCHECK_EQ(projected.type()->id(), Type::STRUCT);
- if (projected.shape() == ValueDescr::SCALAR) {
- // Only virtual columns are projected. Broadcast to an array
- ARROW_ASSIGN_OR_RAISE(projected,
- MakeArrayFromScalar(*projected.scalar(), in->num_rows(), pool));
- }
-
- ARROW_ASSIGN_OR_RAISE(auto out,
- RecordBatch::FromStructArray(projected.array_as<StructArray>()));
-
- return out->ReplaceSchemaMetadata(in->schema()->metadata());
-}
-
-inline RecordBatchGenerator ProjectRecordBatch(RecordBatchGenerator rbs,
- Expression projection, MemoryPool* pool) {
- // TODO(ARROW-7001) This changes to auto
- std::function<Result<std::shared_ptr<RecordBatch>>(const std::shared_ptr<RecordBatch>&)>
- mapper = [=](const std::shared_ptr<RecordBatch>& in) {
- return DoProjectRecordBatch(projection, pool, in);
- };
- return MakeMappedGenerator(std::move(rbs), mapper);
-}
-
class FilterAndProjectScanTask : public ScanTask {
public:
explicit FilterAndProjectScanTask(std::shared_ptr<ScanTask> task, Expression partition)
@@ -155,9 +92,7 @@
task_(std::move(task)),
partition_(std::move(partition)) {}
- bool supports_async() const override { return task_->supports_async(); }
-
- Result<RecordBatchIterator> ExecuteSync() {
+ Result<RecordBatchIterator> Execute() override {
ARROW_ASSIGN_OR_RAISE(auto it, task_->Execute());
ARROW_ASSIGN_OR_RAISE(Expression simplified_filter,
@@ -173,29 +108,6 @@
options_->pool);
}
- Result<RecordBatchIterator> Execute() override { return ExecuteSync(); }
-
- Result<RecordBatchGenerator> ExecuteAsync(Executor* cpu_executor) override {
- if (!task_->supports_async()) {
- return Status::Invalid(
- "ExecuteAsync should not have been called on FilterAndProjectScanTask if the "
- "source task did not support async");
- }
- ARROW_ASSIGN_OR_RAISE(auto gen, task_->ExecuteAsync(cpu_executor));
-
- ARROW_ASSIGN_OR_RAISE(Expression simplified_filter,
- SimplifyWithGuarantee(options()->filter, partition_));
-
- ARROW_ASSIGN_OR_RAISE(Expression simplified_projection,
- SimplifyWithGuarantee(options()->projection, partition_));
-
- RecordBatchGenerator filter_gen =
- FilterRecordBatch(std::move(gen), simplified_filter, options_->pool);
-
- return ProjectRecordBatch(std::move(filter_gen), simplified_projection,
- options_->pool);
- }
-
private:
std::shared_ptr<ScanTask> task_;
Expression partition_;
diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc
index ccae126..d1d0e45 100644
--- a/cpp/src/arrow/dataset/scanner_test.cc
+++ b/cpp/src/arrow/dataset/scanner_test.cc
@@ -181,21 +181,6 @@
AssertTablesEqual(*expected, *actual);
}
-class TestScannerNestedParallelism : public NestedParallelismMixin {};
-
-TEST_F(TestScannerNestedParallelism, Scan) {
- constexpr int NUM_BATCHES = 32;
- RecordBatchVector batches;
- for (int i = 0; i < NUM_BATCHES; i++) {
- batches.push_back(ConstantArrayGenerator::Zeroes(/*size=*/1, schema_));
- }
- auto dataset = std::make_shared<NestedParallelismDataset>(schema_, std::move(batches));
- ScannerBuilder builder{dataset, options_};
- ASSERT_OK_AND_ASSIGN(auto scanner, builder.Finish());
- ASSERT_OK_AND_ASSIGN(auto table, scanner->ToTable());
- ASSERT_EQ(table->num_rows(), NUM_BATCHES);
-}
-
class TestScannerBuilder : public ::testing::Test {
void SetUp() override {
DatasetVector sources;
diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h
index 826e8b7..ea4c41e 100644
--- a/cpp/src/arrow/dataset/test_util.h
+++ b/cpp/src/arrow/dataset/test_util.h
@@ -832,156 +832,5 @@
std::shared_ptr<ScanOptions> scan_options_;
};
-// These test cases will run on a thread pool with 1 thread. Any illegal (non-async)
-// nested parallelism should deadlock the test
-class NestedParallelismMixin : public ::testing::Test {
- protected:
- static void SetUpTestSuite() {}
-
- void TearDown() override {
- if (old_capacity_ > 0) {
- ASSERT_OK(internal::GetCpuThreadPool()->SetCapacity(old_capacity_));
- }
- }
-
- void SetUp() override {
- old_capacity_ = internal::GetCpuThreadPool()->GetCapacity();
- ASSERT_OK(internal::GetCpuThreadPool()->SetCapacity(1));
- schema_ = schema({field("i32", int32())});
- options_ = std::make_shared<ScanOptions>();
- options_->dataset_schema = schema_;
- options_->use_threads = true;
- }
-
- class NestedParallelismScanTask : public ScanTask {
- public:
- explicit NestedParallelismScanTask(std::shared_ptr<ScanTask> target)
- : ScanTask(target->options(), target->fragment()), target_(std::move(target)) {}
- virtual ~NestedParallelismScanTask() = default;
-
- Result<RecordBatchIterator> Execute() override {
- // We could just return an invalid status here but this way it is easy to verify the
- // test is checking what it is supposed to be checking by just changing
- // supports_async() to false (will deadlock)
- ADD_FAILURE() << "NestedParallelismScanTask::Execute should never be called. You "
- "should be deadlocked right now";
- ARROW_ASSIGN_OR_RAISE(auto batch_gen, ExecuteAsync(internal::GetCpuThreadPool()));
- return MakeGeneratorIterator(std::move(batch_gen));
- }
-
- Result<RecordBatchGenerator> ExecuteAsync(internal::Executor* cpu_executor) override {
- ARROW_ASSIGN_OR_RAISE(auto batches_it, target_->Execute());
- ARROW_ASSIGN_OR_RAISE(auto batches, batches_it.ToVector());
- auto generator_fut = DeferNotOk(
- cpu_executor->Submit([batches] { return MakeVectorGenerator(batches); }));
- return MakeFromFuture(generator_fut);
- }
-
- bool supports_async() const override { return true; }
-
- private:
- std::shared_ptr<ScanTask> target_;
- };
-
- class NestedParallelismFragment : public InMemoryFragment {
- public:
- explicit NestedParallelismFragment(RecordBatchVector record_batches,
- Expression expr = literal(true))
- : InMemoryFragment(std::move(record_batches), std::move(expr)) {}
-
- Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options) override {
- ARROW_ASSIGN_OR_RAISE(auto scan_task_it, InMemoryFragment::Scan(options));
- return MakeMaybeMapIterator(
- [](std::shared_ptr<ScanTask> task) -> Result<std::shared_ptr<ScanTask>> {
- return std::make_shared<NestedParallelismScanTask>(std::move(task));
- },
- std::move(scan_task_it));
- }
- };
-
- class NestedParallelismDataset : public InMemoryDataset {
- public:
- NestedParallelismDataset(std::shared_ptr<Schema> sch, RecordBatchVector batches)
- : InMemoryDataset(std::move(sch), std::move(batches)) {}
-
- protected:
- Result<FragmentIterator> GetFragmentsImpl(Expression) override {
- auto schema = this->schema();
-
- auto create_fragment =
- [schema](
- std::shared_ptr<RecordBatch> batch) -> Result<std::shared_ptr<Fragment>> {
- RecordBatchVector batches{batch};
- return std::make_shared<NestedParallelismFragment>(std::move(batches));
- };
-
- return MakeMaybeMapIterator(std::move(create_fragment), get_batches_->Get());
- }
- };
-
- class DiscardingRowCountingFileWriteOptions : public FileWriteOptions {
- public:
- explicit DiscardingRowCountingFileWriteOptions(
- std::shared_ptr<std::atomic<int>> row_counter)
- : FileWriteOptions(
- std::make_shared<DiscardingRowCountingFormat>(std::move(row_counter))) {}
- };
-
- class DiscardingRowCountingFileWriter : public FileWriter {
- public:
- explicit DiscardingRowCountingFileWriter(std::shared_ptr<std::atomic<int>> row_count)
- : FileWriter(NULL, NULL, NULL), row_count_(std::move(row_count)) {}
- virtual ~DiscardingRowCountingFileWriter() = default;
-
- Status Write(const std::shared_ptr<RecordBatch>& batch) override {
- row_count_->fetch_add(static_cast<int>(batch->num_rows()));
- return Status::OK();
- }
- Status Finish() override { return Status::OK(); };
-
- protected:
- Status FinishInternal() override { return Status::OK(); };
-
- private:
- std::shared_ptr<std::atomic<int>> row_count_;
- };
-
- class DiscardingRowCountingFormat : public FileFormat {
- public:
- DiscardingRowCountingFormat() : row_count_(std::make_shared<std::atomic<int>>(0)) {}
- explicit DiscardingRowCountingFormat(std::shared_ptr<std::atomic<int>> row_count)
- : row_count_(std::move(row_count)) {}
- virtual ~DiscardingRowCountingFormat() = default;
-
- std::string type_name() const override { return "discarding-row-counting"; }
- bool Equals(const FileFormat& other) const override { return true; }
- Result<bool> IsSupported(const FileSource& source) const override {
- return Status::NotImplemented("Should not be called");
- }
- Result<std::shared_ptr<Schema>> Inspect(const FileSource& source) const override {
- return Status::NotImplemented("Should not be called");
- }
- Result<ScanTaskIterator> ScanFile(
- std::shared_ptr<ScanOptions> options,
- const std::shared_ptr<FileFragment>& file) const override {
- return Status::NotImplemented("Should not be called");
- }
- Result<std::shared_ptr<FileWriter>> MakeWriter(
- std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
- std::shared_ptr<FileWriteOptions> options) const override {
- return std::make_shared<DiscardingRowCountingFileWriter>(row_count_);
- }
- std::shared_ptr<FileWriteOptions> DefaultWriteOptions() override { return NULLPTR; }
-
- private:
- std::shared_ptr<std::atomic<int>> row_count_;
- };
-
- protected:
- int old_capacity_ = 0;
- std::shared_ptr<Schema> schema_;
- std::shared_ptr<ScanOptions> options_;
-};
-
} // namespace dataset
} // namespace arrow
diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h
index e45f598..4650e80 100644
--- a/cpp/src/arrow/record_batch.h
+++ b/cpp/src/arrow/record_batch.h
@@ -25,7 +25,6 @@
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/type_fwd.h"
-#include "arrow/util/future.h"
#include "arrow/util/macros.h"
#include "arrow/util/visibility.h"
@@ -208,14 +207,6 @@
/// \return Status
virtual Status ReadNext(std::shared_ptr<RecordBatch>* batch) = 0;
- // Fallback to sync implementation until all other readers are converted(ARROW-11770)
- // and then this could become pure virtual with ReadNext falling back to async impl.
- virtual Future<std::shared_ptr<RecordBatch>> ReadNextAsync() {
- std::shared_ptr<RecordBatch> batch;
- ARROW_RETURN_NOT_OK(ReadNext(&batch));
- return Future<std::shared_ptr<RecordBatch>>::MakeFinished(std::move(batch));
- }
-
/// \brief Iterator interface
Result<std::shared_ptr<RecordBatch>> Next() {
std::shared_ptr<RecordBatch> batch;
diff --git a/cpp/src/arrow/util/future.h b/cpp/src/arrow/util/future.h
index 21754ec..4c8de91 100644
--- a/cpp/src/arrow/util/future.h
+++ b/cpp/src/arrow/util/future.h
@@ -718,7 +718,7 @@
return true;
}
if (control_res->has_value()) {
- break_fut.MarkFinished(*std::move(*control_res));
+ break_fut.MarkFinished(**control_res);
return true;
}
return false;
diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h
index c388680..fc7dc85 100644
--- a/cpp/src/arrow/util/thread_pool.h
+++ b/cpp/src/arrow/util/thread_pool.h
@@ -106,9 +106,8 @@
Future<T> Transfer(Future<T> future) {
auto transferred = Future<T>::Make();
auto callback = [this, transferred](const Result<T>& result) mutable {
- auto spawn_status = Spawn([transferred, result]() mutable {
- transferred.MarkFinished(std::move(result));
- });
+ auto spawn_status =
+ Spawn([transferred, result]() mutable { transferred.MarkFinished(result); });
if (!spawn_status.ok()) {
transferred.MarkFinished(spawn_status);
}
diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R
index 192b4b4..9943292 100644
--- a/r/tests/testthat/test-dataset.R
+++ b/r/tests/testthat/test-dataset.R
@@ -354,7 +354,6 @@
})
test_that("CSV dataset", {
- skip_on_os("windows") # https://issues.apache.org/jira/browse/ARROW-12181
ds <- open_dataset(csv_dir, partitioning = "part", format = "csv")
expect_r6_class(ds$format, "CsvFileFormat")
expect_r6_class(ds$filesystem, "LocalFileSystem")