PARQUET-1392: Read multiple RowGroups at once into an Arrow table

Decided to go with the more simplistic approach and only introduce a convenience API for now. Once is merged, I'll do some work that at least primitive arrays are read into a single Array in this path.

Author: Korn, Uwe <Uwe.Korn@blue-yonder.com>

Closes #492 from xhochy/PARQUET-1392 and squashes the following commits:

e0ad006 [Korn, Uwe] Preallocate tables vector
94c7246 [Korn, Uwe] Read multiple RowGroups at once into an Arrow table
diff --git a/src/parquet/arrow/arrow-reader-writer-benchmark.cc b/src/parquet/arrow/arrow-reader-writer-benchmark.cc
index 51eb0c2..41cb88d 100644
--- a/src/parquet/arrow/arrow-reader-writer-benchmark.cc
+++ b/src/parquet/arrow/arrow-reader-writer-benchmark.cc
@@ -195,6 +195,69 @@
 BENCHMARK_TEMPLATE2(BM_ReadColumn, false, BooleanType);
 BENCHMARK_TEMPLATE2(BM_ReadColumn, true, BooleanType);
 
+static void BM_ReadIndividualRowGroups(::benchmark::State& state) {
+  std::vector<int64_t> values(BENCHMARK_SIZE, 128);
+  std::shared_ptr<::arrow::Table> table = TableFromVector<Int64Type>(values, true);
+  auto output = std::make_shared<InMemoryOutputStream>();
+  // This writes 10 RowGroups
+  EXIT_NOT_OK(
+      WriteTable(*table, ::arrow::default_memory_pool(), output, BENCHMARK_SIZE / 10));
+  std::shared_ptr<Buffer> buffer = output->GetBuffer();
+
+  while (state.KeepRunning()) {
+    auto reader =
+        ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer));
+    FileReader filereader(::arrow::default_memory_pool(), std::move(reader));
+
+    std::vector<std::shared_ptr<::arrow::Table>> tables;
+    for (int i = 0; i < filereader.num_row_groups(); i++) {
+      // Only read the even numbered RowGroups
+      if ((i % 2) == 0) {
+        std::shared_ptr<::arrow::Table> table;
+        EXIT_NOT_OK(filereader.RowGroup(i)->ReadTable(&table));
+        tables.push_back(table);
+      }
+    }
+
+    std::shared_ptr<::arrow::Table> final_table;
+    EXIT_NOT_OK(ConcatenateTables(tables, &final_table));
+  }
+  SetBytesProcessed<true, Int64Type>(state);
+}
+
+BENCHMARK(BM_ReadIndividualRowGroups);
+
+static void BM_ReadMultipleRowGroups(::benchmark::State& state) {
+  std::vector<int64_t> values(BENCHMARK_SIZE, 128);
+  std::shared_ptr<::arrow::Table> table = TableFromVector<Int64Type>(values, true);
+  auto output = std::make_shared<InMemoryOutputStream>();
+  // This writes 10 RowGroups
+  EXIT_NOT_OK(
+      WriteTable(*table, ::arrow::default_memory_pool(), output, BENCHMARK_SIZE / 10));
+  std::shared_ptr<Buffer> buffer = output->GetBuffer();
+
+  while (state.KeepRunning()) {
+    auto reader =
+        ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer));
+    FileReader filereader(::arrow::default_memory_pool(), std::move(reader));
+
+    std::vector<std::shared_ptr<::arrow::Table>> tables;
+    std::vector<int> rgs;
+    for (int i = 0; i < filereader.num_row_groups(); i++) {
+      // Only read the even numbered RowGroups
+      if ((i % 2) == 0) {
+        rgs.push_back(i);
+      }
+    }
+
+    std::shared_ptr<::arrow::Table> table;
+    EXIT_NOT_OK(filereader.ReadRowGroups(rgs, &table));
+  }
+  SetBytesProcessed<true, Int64Type>(state);
+}
+
+BENCHMARK(BM_ReadMultipleRowGroups);
+
 }  // namespace benchmark
 
 }  // namespace parquet
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index ad78c22..5f4e123 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -1614,14 +1614,21 @@
 
   ASSERT_EQ(2, reader->num_row_groups());
 
-  std::shared_ptr<Table> r1, r2;
+  std::shared_ptr<Table> r1, r2, r3, r4;
   // Read everything
   ASSERT_OK_NO_THROW(reader->ReadRowGroup(0, &r1));
   ASSERT_OK_NO_THROW(reader->RowGroup(1)->ReadTable(&r2));
+  ASSERT_OK_NO_THROW(reader->ReadRowGroups({0, 1}, &r3));
+  ASSERT_OK_NO_THROW(reader->ReadRowGroups({1}, &r4));
 
   std::shared_ptr<Table> concatenated;
-  ASSERT_OK(ConcatenateTables({r1, r2}, &concatenated));
 
+  ASSERT_OK(ConcatenateTables({r1, r2}, &concatenated));
+  ASSERT_TRUE(table->Equals(*concatenated));
+
+  ASSERT_TRUE(table->Equals(*r3));
+  ASSERT_TRUE(r2->Equals(*r4));
+  ASSERT_OK(ConcatenateTables({r1, r4}, &concatenated));
   ASSERT_TRUE(table->Equals(*concatenated));
 }
 
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index d0b397f..2e4dc81 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -228,11 +228,15 @@
   Status GetSchema(std::shared_ptr<::arrow::Schema>* out);
   Status GetSchema(const std::vector<int>& indices,
                    std::shared_ptr<::arrow::Schema>* out);
+  Status ReadRowGroup(int row_group_index, std::shared_ptr<Table>* table);
   Status ReadRowGroup(int row_group_index, const std::vector<int>& indices,
                       std::shared_ptr<::arrow::Table>* out);
   Status ReadTable(const std::vector<int>& indices, std::shared_ptr<Table>* table);
   Status ReadTable(std::shared_ptr<Table>* table);
-  Status ReadRowGroup(int i, std::shared_ptr<Table>* table);
+  Status ReadRowGroups(const std::vector<int>& row_groups, std::shared_ptr<Table>* table);
+  Status ReadRowGroups(const std::vector<int>& row_groups,
+                       const std::vector<int>& indices,
+                       std::shared_ptr<::arrow::Table>* out);
 
   bool CheckForFlatColumn(const ColumnDescriptor* descr);
   bool CheckForFlatListColumn(const ColumnDescriptor* descr);
@@ -562,6 +566,29 @@
   return ReadTable(indices, table);
 }
 
+Status FileReader::Impl::ReadRowGroups(const std::vector<int>& row_groups,
+                                       const std::vector<int>& indices,
+                                       std::shared_ptr<Table>* table) {
+  // TODO(PARQUET-1393): Modify the record readers to already read this into a single,
+  // continuous array.
+  std::vector<std::shared_ptr<Table>> tables(row_groups.size(), nullptr);
+
+  for (size_t i = 0; i < row_groups.size(); ++i) {
+    RETURN_NOT_OK(ReadRowGroup(row_groups[i], indices, &tables[i]));
+  }
+  return ConcatenateTables(tables, table);
+}
+
+Status FileReader::Impl::ReadRowGroups(const std::vector<int>& row_groups,
+                                       std::shared_ptr<Table>* table) {
+  std::vector<int> indices(reader_->metadata()->num_columns());
+
+  for (size_t i = 0; i < indices.size(); ++i) {
+    indices[i] = static_cast<int>(i);
+  }
+  return ReadRowGroups(row_groups, indices, table);
+}
+
 Status FileReader::Impl::ReadRowGroup(int i, std::shared_ptr<Table>* table) {
   std::vector<int> indices(reader_->metadata()->num_columns());
 
@@ -683,6 +710,25 @@
   }
 }
 
+Status FileReader::ReadRowGroups(const std::vector<int>& row_groups,
+                                 std::shared_ptr<Table>* out) {
+  try {
+    return impl_->ReadRowGroups(row_groups, out);
+  } catch (const ::parquet::ParquetException& e) {
+    return ::arrow::Status::IOError(e.what());
+  }
+}
+
+Status FileReader::ReadRowGroups(const std::vector<int>& row_groups,
+                                 const std::vector<int>& indices,
+                                 std::shared_ptr<Table>* out) {
+  try {
+    return impl_->ReadRowGroups(row_groups, indices, out);
+  } catch (const ::parquet::ParquetException& e) {
+    return ::arrow::Status::IOError(e.what());
+  }
+}
+
 std::shared_ptr<RowGroupReader> FileReader::RowGroup(int row_group_index) {
   return std::shared_ptr<RowGroupReader>(
       new RowGroupReader(impl_.get(), row_group_index));
diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h
index 1e37d89..db135da 100644
--- a/src/parquet/arrow/reader.h
+++ b/src/parquet/arrow/reader.h
@@ -182,6 +182,13 @@
 
   ::arrow::Status ReadRowGroup(int i, std::shared_ptr<::arrow::Table>* out);
 
+  ::arrow::Status ReadRowGroups(const std::vector<int>& row_groups,
+                                const std::vector<int>& column_indices,
+                                std::shared_ptr<::arrow::Table>* out);
+
+  ::arrow::Status ReadRowGroups(const std::vector<int>& row_groups,
+                                std::shared_ptr<::arrow::Table>* out);
+
   /// \brief Scan file contents with one thread, return number of rows
   ::arrow::Status ScanContents(std::vector<int> columns, const int32_t column_batch_size,
                                int64_t* num_rows);