PARQUET-1392: Read multiple RowGroups at once into an Arrow table
Decided to go with the more simplistic approach and only introduce a convenience API for now. Once is merged, I'll do some work that at least primitive arrays are read into a single Array in this path.
Author: Korn, Uwe <Uwe.Korn@blue-yonder.com>
Closes #492 from xhochy/PARQUET-1392 and squashes the following commits:
e0ad006 [Korn, Uwe] Preallocate tables vector
94c7246 [Korn, Uwe] Read multiple RowGroups at once into an Arrow table
diff --git a/src/parquet/arrow/arrow-reader-writer-benchmark.cc b/src/parquet/arrow/arrow-reader-writer-benchmark.cc
index 51eb0c2..41cb88d 100644
--- a/src/parquet/arrow/arrow-reader-writer-benchmark.cc
+++ b/src/parquet/arrow/arrow-reader-writer-benchmark.cc
@@ -195,6 +195,69 @@
BENCHMARK_TEMPLATE2(BM_ReadColumn, false, BooleanType);
BENCHMARK_TEMPLATE2(BM_ReadColumn, true, BooleanType);
+static void BM_ReadIndividualRowGroups(::benchmark::State& state) {
+ std::vector<int64_t> values(BENCHMARK_SIZE, 128);
+ std::shared_ptr<::arrow::Table> table = TableFromVector<Int64Type>(values, true);
+ auto output = std::make_shared<InMemoryOutputStream>();
+ // This writes 10 RowGroups
+ EXIT_NOT_OK(
+ WriteTable(*table, ::arrow::default_memory_pool(), output, BENCHMARK_SIZE / 10));
+ std::shared_ptr<Buffer> buffer = output->GetBuffer();
+
+ while (state.KeepRunning()) {
+ auto reader =
+ ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer));
+ FileReader filereader(::arrow::default_memory_pool(), std::move(reader));
+
+ std::vector<std::shared_ptr<::arrow::Table>> tables;
+ for (int i = 0; i < filereader.num_row_groups(); i++) {
+ // Only read the even numbered RowGroups
+ if ((i % 2) == 0) {
+ std::shared_ptr<::arrow::Table> table;
+ EXIT_NOT_OK(filereader.RowGroup(i)->ReadTable(&table));
+ tables.push_back(table);
+ }
+ }
+
+ std::shared_ptr<::arrow::Table> final_table;
+ EXIT_NOT_OK(ConcatenateTables(tables, &final_table));
+ }
+ SetBytesProcessed<true, Int64Type>(state);
+}
+
+BENCHMARK(BM_ReadIndividualRowGroups);
+
+static void BM_ReadMultipleRowGroups(::benchmark::State& state) {
+ std::vector<int64_t> values(BENCHMARK_SIZE, 128);
+ std::shared_ptr<::arrow::Table> table = TableFromVector<Int64Type>(values, true);
+ auto output = std::make_shared<InMemoryOutputStream>();
+ // This writes 10 RowGroups
+ EXIT_NOT_OK(
+ WriteTable(*table, ::arrow::default_memory_pool(), output, BENCHMARK_SIZE / 10));
+ std::shared_ptr<Buffer> buffer = output->GetBuffer();
+
+ while (state.KeepRunning()) {
+ auto reader =
+ ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer));
+ FileReader filereader(::arrow::default_memory_pool(), std::move(reader));
+
+ std::vector<std::shared_ptr<::arrow::Table>> tables;
+ std::vector<int> rgs;
+ for (int i = 0; i < filereader.num_row_groups(); i++) {
+ // Only read the even numbered RowGroups
+ if ((i % 2) == 0) {
+ rgs.push_back(i);
+ }
+ }
+
+ std::shared_ptr<::arrow::Table> table;
+ EXIT_NOT_OK(filereader.ReadRowGroups(rgs, &table));
+ }
+ SetBytesProcessed<true, Int64Type>(state);
+}
+
+BENCHMARK(BM_ReadMultipleRowGroups);
+
} // namespace benchmark
} // namespace parquet
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index ad78c22..5f4e123 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -1614,14 +1614,21 @@
ASSERT_EQ(2, reader->num_row_groups());
- std::shared_ptr<Table> r1, r2;
+ std::shared_ptr<Table> r1, r2, r3, r4;
// Read everything
ASSERT_OK_NO_THROW(reader->ReadRowGroup(0, &r1));
ASSERT_OK_NO_THROW(reader->RowGroup(1)->ReadTable(&r2));
+ ASSERT_OK_NO_THROW(reader->ReadRowGroups({0, 1}, &r3));
+ ASSERT_OK_NO_THROW(reader->ReadRowGroups({1}, &r4));
std::shared_ptr<Table> concatenated;
- ASSERT_OK(ConcatenateTables({r1, r2}, &concatenated));
+ ASSERT_OK(ConcatenateTables({r1, r2}, &concatenated));
+ ASSERT_TRUE(table->Equals(*concatenated));
+
+ ASSERT_TRUE(table->Equals(*r3));
+ ASSERT_TRUE(r2->Equals(*r4));
+ ASSERT_OK(ConcatenateTables({r1, r4}, &concatenated));
ASSERT_TRUE(table->Equals(*concatenated));
}
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index d0b397f..2e4dc81 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -228,11 +228,15 @@
Status GetSchema(std::shared_ptr<::arrow::Schema>* out);
Status GetSchema(const std::vector<int>& indices,
std::shared_ptr<::arrow::Schema>* out);
+ Status ReadRowGroup(int row_group_index, std::shared_ptr<Table>* table);
Status ReadRowGroup(int row_group_index, const std::vector<int>& indices,
std::shared_ptr<::arrow::Table>* out);
Status ReadTable(const std::vector<int>& indices, std::shared_ptr<Table>* table);
Status ReadTable(std::shared_ptr<Table>* table);
- Status ReadRowGroup(int i, std::shared_ptr<Table>* table);
+ Status ReadRowGroups(const std::vector<int>& row_groups, std::shared_ptr<Table>* table);
+ Status ReadRowGroups(const std::vector<int>& row_groups,
+ const std::vector<int>& indices,
+ std::shared_ptr<::arrow::Table>* out);
bool CheckForFlatColumn(const ColumnDescriptor* descr);
bool CheckForFlatListColumn(const ColumnDescriptor* descr);
@@ -562,6 +566,29 @@
return ReadTable(indices, table);
}
+Status FileReader::Impl::ReadRowGroups(const std::vector<int>& row_groups,
+ const std::vector<int>& indices,
+ std::shared_ptr<Table>* table) {
+ // TODO(PARQUET-1393): Modify the record readers to already read this into a single,
+ // continuous array.
+ std::vector<std::shared_ptr<Table>> tables(row_groups.size(), nullptr);
+
+ for (size_t i = 0; i < row_groups.size(); ++i) {
+ RETURN_NOT_OK(ReadRowGroup(row_groups[i], indices, &tables[i]));
+ }
+ return ConcatenateTables(tables, table);
+}
+
+Status FileReader::Impl::ReadRowGroups(const std::vector<int>& row_groups,
+ std::shared_ptr<Table>* table) {
+ std::vector<int> indices(reader_->metadata()->num_columns());
+
+ for (size_t i = 0; i < indices.size(); ++i) {
+ indices[i] = static_cast<int>(i);
+ }
+ return ReadRowGroups(row_groups, indices, table);
+}
+
Status FileReader::Impl::ReadRowGroup(int i, std::shared_ptr<Table>* table) {
std::vector<int> indices(reader_->metadata()->num_columns());
@@ -683,6 +710,25 @@
}
}
+Status FileReader::ReadRowGroups(const std::vector<int>& row_groups,
+ std::shared_ptr<Table>* out) {
+ try {
+ return impl_->ReadRowGroups(row_groups, out);
+ } catch (const ::parquet::ParquetException& e) {
+ return ::arrow::Status::IOError(e.what());
+ }
+}
+
+Status FileReader::ReadRowGroups(const std::vector<int>& row_groups,
+ const std::vector<int>& indices,
+ std::shared_ptr<Table>* out) {
+ try {
+ return impl_->ReadRowGroups(row_groups, indices, out);
+ } catch (const ::parquet::ParquetException& e) {
+ return ::arrow::Status::IOError(e.what());
+ }
+}
+
std::shared_ptr<RowGroupReader> FileReader::RowGroup(int row_group_index) {
return std::shared_ptr<RowGroupReader>(
new RowGroupReader(impl_.get(), row_group_index));
diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h
index 1e37d89..db135da 100644
--- a/src/parquet/arrow/reader.h
+++ b/src/parquet/arrow/reader.h
@@ -182,6 +182,13 @@
::arrow::Status ReadRowGroup(int i, std::shared_ptr<::arrow::Table>* out);
+ ::arrow::Status ReadRowGroups(const std::vector<int>& row_groups,
+ const std::vector<int>& column_indices,
+ std::shared_ptr<::arrow::Table>* out);
+
+ ::arrow::Status ReadRowGroups(const std::vector<int>& row_groups,
+ std::shared_ptr<::arrow::Table>* out);
+
/// \brief Scan file contents with one thread, return number of rows
::arrow::Status ScanContents(std::vector<int> columns, const int32_t column_batch_size,
int64_t* num_rows);