PARQUET-1200: Support reading a single Arrow column from a Parquet file

cc @lorenzhs
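A minimal usage sketch of the new API: the RowGroup/Column/Read calls below are the entry points added in this change, while the file-opening helpers (arrow::io::ReadableFile::Open and parquet::arrow::OpenFile) are assumed from the existing API and may differ slightly between versions.

    #include <memory>
    #include <string>

    #include "arrow/io/file.h"
    #include "arrow/memory_pool.h"
    #include "arrow/status.h"
    #include "parquet/arrow/reader.h"

    // Read column `j` of row group `i` without materializing the rest of the
    // file. The nested readers must not outlive `reader`.
    ::arrow::Status ReadSingleColumnChunk(const std::string& path, int i, int j,
                                          std::shared_ptr<::arrow::Array>* out) {
      std::shared_ptr<::arrow::io::ReadableFile> file;
      ::arrow::Status st = ::arrow::io::ReadableFile::Open(path, &file);
      if (!st.ok()) return st;

      std::unique_ptr<parquet::arrow::FileReader> reader;
      st = parquet::arrow::OpenFile(file, ::arrow::default_memory_pool(), &reader);
      if (!st.ok()) return st;

      // New in this change: per-row-group, per-column access.
      return reader->RowGroup(i)->Column(j)->Read(out);
    }

Reading only a single row group as a table works analogously via reader->RowGroup(i)->ReadTable(&table).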

Author: Korn, Uwe <Uwe.Korn@blue-yonder.com>

Closes #434 from xhochy/PARQUET-1200 and squashes the following commits:

98ac6f0 [Korn, Uwe] PARQUET-1200: Support reading a single Arrow column from a Parquet file
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index db12fb4..369eb2e 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -1496,7 +1496,7 @@
   std::shared_ptr<Table> r1, r2;
   // Read everything
   ASSERT_OK_NO_THROW(reader->ReadRowGroup(0, &r1));
-  ASSERT_OK_NO_THROW(reader->ReadRowGroup(1, &r2));
+  ASSERT_OK_NO_THROW(reader->RowGroup(1)->ReadTable(&r2));
 
   std::shared_ptr<Table> concatenated;
   ASSERT_OK(ConcatenateTables({r1, r2}, &concatenated));
diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc
index b33eda1..771b996 100644
--- a/src/parquet/arrow/arrow-schema-test.cc
+++ b/src/parquet/arrow/arrow-schema-test.cc
@@ -62,8 +62,8 @@
     for (int i = 0; i < expected_schema->num_fields(); ++i) {
       auto lhs = result_schema_->field(i);
       auto rhs = expected_schema->field(i);
-      EXPECT_TRUE(lhs->Equals(rhs)) << i << " " << lhs->ToString()
-                                    << " != " << rhs->ToString();
+      EXPECT_TRUE(lhs->Equals(rhs))
+          << i << " " << lhs->ToString() << " != " << rhs->ToString();
     }
   }
 
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index 9318305..7f81771 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -170,6 +170,8 @@
                           int16_t def_level,
                           std::unique_ptr<ColumnReader::ColumnReaderImpl>* out);
   Status ReadColumn(int i, std::shared_ptr<Array>* out);
+  Status ReadColumnChunk(int column_index, int row_group_index,
+                         std::shared_ptr<Array>* out);
   Status GetSchema(std::shared_ptr<::arrow::Schema>* out);
   Status GetSchema(const std::vector<int>& indices,
                    std::shared_ptr<::arrow::Schema>* out);
@@ -391,6 +393,24 @@
   return FromParquetSchema(descr, indices, parquet_key_value_metadata, out);
 }
 
+Status FileReader::Impl::ReadColumnChunk(int column_index, int row_group_index,
+                                         std::shared_ptr<Array>* out) {
+  auto rg_metadata = reader_->metadata()->RowGroup(row_group_index);
+  int64_t records_to_read = rg_metadata->ColumnChunk(column_index)->num_values();
+
+  std::unique_ptr<FileColumnIterator> input(
+      new SingleRowGroupIterator(column_index, row_group_index, reader_.get()));
+
+  std::unique_ptr<ColumnReader::ColumnReaderImpl> impl(
+      new PrimitiveImpl(pool_, std::move(input)));
+  ColumnReader flat_column_reader(std::move(impl));
+
+  std::shared_ptr<Array> array;
+  RETURN_NOT_OK(flat_column_reader.NextBatch(records_to_read, &array));
+  *out = array;
+  return Status::OK();
+}
+
 Status FileReader::Impl::ReadRowGroup(int row_group_index,
                                       const std::vector<int>& indices,
                                       std::shared_ptr<::arrow::Table>* out) {
@@ -408,17 +428,9 @@
   auto ReadColumnFunc = [&indices, &row_group_index, &schema, &columns, &rg_metadata,
                          this](int i) {
     int column_index = indices[i];
-    int64_t records_to_read = rg_metadata->ColumnChunk(column_index)->num_values();
-
-    std::unique_ptr<FileColumnIterator> input(
-        new SingleRowGroupIterator(column_index, row_group_index, reader_.get()));
-
-    std::unique_ptr<ColumnReader::ColumnReaderImpl> impl(
-        new PrimitiveImpl(pool_, std::move(input)));
-    ColumnReader flat_column_reader(std::move(impl));
 
     std::shared_ptr<Array> array;
-    RETURN_NOT_OK(flat_column_reader.NextBatch(records_to_read, &array));
+    RETURN_NOT_OK(ReadColumnChunk(column_index, row_group_index, &array));
     columns[i] = std::make_shared<Column>(schema->field(i), array);
     return Status::OK();
   };
@@ -561,6 +573,11 @@
   }
 }
 
+std::shared_ptr<RowGroupReader> FileReader::RowGroup(int row_group_index) {
+  return std::shared_ptr<RowGroupReader>(
+      new RowGroupReader(impl_.get(), row_group_index));
+}
+
 int FileReader::num_row_groups() const { return impl_->num_row_groups(); }
 
 void FileReader::set_num_threads(int num_threads) { impl_->set_num_threads(num_threads); }
@@ -1354,5 +1371,34 @@
   return Status::OK();
 }
 
+std::shared_ptr<ColumnChunkReader> RowGroupReader::Column(int column_index) {
+  return std::shared_ptr<ColumnChunkReader>(
+      new ColumnChunkReader(impl_, row_group_index_, column_index));
+}
+
+Status RowGroupReader::ReadTable(const std::vector<int>& column_indices,
+                                 std::shared_ptr<::arrow::Table>* out) {
+  return impl_->ReadRowGroup(row_group_index_, column_indices, out);
+}
+
+Status RowGroupReader::ReadTable(std::shared_ptr<::arrow::Table>* out) {
+  return impl_->ReadRowGroup(row_group_index_, out);
+}
+
+RowGroupReader::~RowGroupReader() {}
+
+RowGroupReader::RowGroupReader(FileReader::Impl* impl, int row_group_index)
+    : impl_(impl), row_group_index_(row_group_index) {}
+
+Status ColumnChunkReader::Read(std::shared_ptr<::arrow::Array>* out) {
+  return impl_->ReadColumnChunk(column_index_, row_group_index_, out);
+}
+
+ColumnChunkReader::~ColumnChunkReader() {}
+
+ColumnChunkReader::ColumnChunkReader(FileReader::Impl* impl, int row_group_index,
+                                     int column_index)
+    : impl_(impl), column_index_(column_index), row_group_index_(row_group_index) {}
+
 }  // namespace arrow
 }  // namespace parquet
diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h
index faaef9a..95b2186 100644
--- a/src/parquet/arrow/reader.h
+++ b/src/parquet/arrow/reader.h
@@ -39,11 +39,27 @@
 
 namespace arrow {
 
+class ColumnChunkReader;
 class ColumnReader;
+class RowGroupReader;
 
 // Arrow read adapter class for deserializing Parquet files as Arrow row
 // batches.
 //
+// This interface caters to different use cases and thus provides several
+// entry points. In its simplest form, a user who wants to read the whole
+// Parquet file at once can use the FileReader::ReadTable method.
+//
+// More advanced users who also want to implement parallelism on top of a
+// single Parquet file should do so at the RowGroup level. For this, they can
+// call FileReader::RowGroup(i)->ReadTable to receive only the specified
+// RowGroup as a table.
+//
+// In the most advanced situation, where a consumer wants to independently
+// read RowGroups in parallel and consume each column individually, they can
+// call FileReader::RowGroup(i)->Column(j)->Read and receive an arrow::Array
+// instance.
+//
 // TODO(wesm): nested data does not always make sense with this user
 // interface unless you are only reading a single leaf node from a branch of
 // a table. For example:
@@ -150,6 +166,10 @@
   ::arrow::Status ScanContents(std::vector<int> columns, const int32_t column_batch_size,
                                int64_t* num_rows);
 
+  /// \brief Return a reader for the given RowGroup; the returned reader must
+  ///   not outlive the FileReader.
+  std::shared_ptr<RowGroupReader> RowGroup(int row_group_index);
+
   int num_row_groups() const;
 
   const ParquetFileReader* parquet_reader() const;
@@ -161,10 +181,46 @@
   virtual ~FileReader();
 
  private:
+  friend ColumnChunkReader;
+  friend RowGroupReader;
+
   class PARQUET_NO_EXPORT Impl;
   std::unique_ptr<Impl> impl_;
 };
 
+class PARQUET_EXPORT RowGroupReader {
+ public:
+  std::shared_ptr<ColumnChunkReader> Column(int column_index);
+
+  ::arrow::Status ReadTable(const std::vector<int>& column_indices,
+                            std::shared_ptr<::arrow::Table>* out);
+  ::arrow::Status ReadTable(std::shared_ptr<::arrow::Table>* out);
+
+  virtual ~RowGroupReader();
+
+ private:
+  friend FileReader;
+  RowGroupReader(FileReader::Impl* reader, int row_group_index);
+
+  FileReader::Impl* impl_;
+  int row_group_index_;
+};
+
+class PARQUET_EXPORT ColumnChunkReader {
+ public:
+  ::arrow::Status Read(std::shared_ptr<::arrow::Array>* out);
+
+  virtual ~ColumnChunkReader();
+
+ private:
+  friend RowGroupReader;
+  ColumnChunkReader(FileReader::Impl* impl, int row_group_index, int column_index);
+
+  FileReader::Impl* impl_;
+  int column_index_;
+  int row_group_index_;
+};
+
 // At this point, the column reader is a stream iterator. It only knows how to
 // read the next batch of values for a particular column from the file until it
 // runs out.
diff --git a/src/parquet/file_reader.cc b/src/parquet/file_reader.cc
index 72c71c6..7b74812 100644
--- a/src/parquet/file_reader.cc
+++ b/src/parquet/file_reader.cc
@@ -64,9 +64,9 @@
     : contents_(std::move(contents)) {}
 
 std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
-  DCHECK(i < metadata()->num_columns()) << "The RowGroup only has "
-                                        << metadata()->num_columns()
-                                        << "columns, requested column: " << i;
+  DCHECK(i < metadata()->num_columns())
+      << "The RowGroup only has " << metadata()->num_columns()
+      << "columns, requested column: " << i;
   const ColumnDescriptor* descr = metadata()->schema()->Column(i);
 
   std::unique_ptr<PageReader> page_reader = contents_->GetColumnPageReader(i);
@@ -76,9 +76,9 @@
 }
 
 std::unique_ptr<PageReader> RowGroupReader::GetColumnPageReader(int i) {
-  DCHECK(i < metadata()->num_columns()) << "The RowGroup only has "
-                                        << metadata()->num_columns()
-                                        << "columns, requested column: " << i;
+  DCHECK(i < metadata()->num_columns())
+      << "The RowGroup only has " << metadata()->num_columns()
+      << "columns, requested column: " << i;
   return contents_->GetColumnPageReader(i);
 }
 
@@ -302,9 +302,9 @@
 }
 
 std::shared_ptr<RowGroupReader> ParquetFileReader::RowGroup(int i) {
-  DCHECK(i < metadata()->num_row_groups()) << "The file only has "
-                                           << metadata()->num_row_groups()
-                                           << "row groups, requested reader for: " << i;
+  DCHECK(i < metadata()->num_row_groups())
+      << "The file only has " << metadata()->num_row_groups()
+      << "row groups, requested reader for: " << i;
   return contents_->GetRowGroup(i);
 }