PARQUET-1200: Support reading a single Arrow column from a Parquet file
cc @lorenzhs
Author: Korn, Uwe <Uwe.Korn@blue-yonder.com>
Closes #434 from xhochy/PARQUET-1200 and squashes the following commits:
98ac6f0 [Korn, Uwe] PARQUET-1200: Support reading a single Arrow column from a Parquet file
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index db12fb4..369eb2e 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -1496,7 +1496,7 @@
std::shared_ptr<Table> r1, r2;
// Read everything
ASSERT_OK_NO_THROW(reader->ReadRowGroup(0, &r1));
- ASSERT_OK_NO_THROW(reader->ReadRowGroup(1, &r2));
+ ASSERT_OK_NO_THROW(reader->RowGroup(1)->ReadTable(&r2));
std::shared_ptr<Table> concatenated;
ASSERT_OK(ConcatenateTables({r1, r2}, &concatenated));
diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc
index b33eda1..771b996 100644
--- a/src/parquet/arrow/arrow-schema-test.cc
+++ b/src/parquet/arrow/arrow-schema-test.cc
@@ -62,8 +62,8 @@
for (int i = 0; i < expected_schema->num_fields(); ++i) {
auto lhs = result_schema_->field(i);
auto rhs = expected_schema->field(i);
- EXPECT_TRUE(lhs->Equals(rhs)) << i << " " << lhs->ToString()
- << " != " << rhs->ToString();
+ EXPECT_TRUE(lhs->Equals(rhs))
+ << i << " " << lhs->ToString() << " != " << rhs->ToString();
}
}
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index 9318305..7f81771 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -170,6 +170,8 @@
int16_t def_level,
std::unique_ptr<ColumnReader::ColumnReaderImpl>* out);
Status ReadColumn(int i, std::shared_ptr<Array>* out);
+ Status ReadColumnChunk(int column_index, int row_group_index,
+ std::shared_ptr<Array>* out);
Status GetSchema(std::shared_ptr<::arrow::Schema>* out);
Status GetSchema(const std::vector<int>& indices,
std::shared_ptr<::arrow::Schema>* out);
@@ -391,6 +393,24 @@
return FromParquetSchema(descr, indices, parquet_key_value_metadata, out);
}
+Status FileReader::Impl::ReadColumnChunk(int column_index, int row_group_index,
+ std::shared_ptr<Array>* out) {
+ auto rg_metadata = reader_->metadata()->RowGroup(row_group_index);
+ int64_t records_to_read = rg_metadata->ColumnChunk(column_index)->num_values();
+
+ std::unique_ptr<FileColumnIterator> input(
+ new SingleRowGroupIterator(column_index, row_group_index, reader_.get()));
+
+ std::unique_ptr<ColumnReader::ColumnReaderImpl> impl(
+ new PrimitiveImpl(pool_, std::move(input)));
+ ColumnReader flat_column_reader(std::move(impl));
+
+ std::shared_ptr<Array> array;
+ RETURN_NOT_OK(flat_column_reader.NextBatch(records_to_read, &array));
+ *out = array;
+ return Status::OK();
+}
+
Status FileReader::Impl::ReadRowGroup(int row_group_index,
const std::vector<int>& indices,
std::shared_ptr<::arrow::Table>* out) {
@@ -408,17 +428,9 @@
auto ReadColumnFunc = [&indices, &row_group_index, &schema, &columns, &rg_metadata,
this](int i) {
int column_index = indices[i];
- int64_t records_to_read = rg_metadata->ColumnChunk(column_index)->num_values();
-
- std::unique_ptr<FileColumnIterator> input(
- new SingleRowGroupIterator(column_index, row_group_index, reader_.get()));
-
- std::unique_ptr<ColumnReader::ColumnReaderImpl> impl(
- new PrimitiveImpl(pool_, std::move(input)));
- ColumnReader flat_column_reader(std::move(impl));
std::shared_ptr<Array> array;
- RETURN_NOT_OK(flat_column_reader.NextBatch(records_to_read, &array));
+ RETURN_NOT_OK(ReadColumnChunk(column_index, row_group_index, &array));
columns[i] = std::make_shared<Column>(schema->field(i), array);
return Status::OK();
};
@@ -561,6 +573,11 @@
}
}
+std::shared_ptr<RowGroupReader> FileReader::RowGroup(int row_group_index) {
+ return std::shared_ptr<RowGroupReader>(
+ new RowGroupReader(impl_.get(), row_group_index));
+}
+
int FileReader::num_row_groups() const { return impl_->num_row_groups(); }
void FileReader::set_num_threads(int num_threads) { impl_->set_num_threads(num_threads); }
@@ -1354,5 +1371,34 @@
return Status::OK();
}
+std::shared_ptr<ColumnChunkReader> RowGroupReader::Column(int column_index) {
+ return std::shared_ptr<ColumnChunkReader>(
+ new ColumnChunkReader(impl_, row_group_index_, column_index));
+}
+
+Status RowGroupReader::ReadTable(const std::vector<int>& column_indices,
+ std::shared_ptr<::arrow::Table>* out) {
+ return impl_->ReadRowGroup(row_group_index_, column_indices, out);
+}
+
+Status RowGroupReader::ReadTable(std::shared_ptr<::arrow::Table>* out) {
+ return impl_->ReadRowGroup(row_group_index_, out);
+}
+
+RowGroupReader::~RowGroupReader() {}
+
+RowGroupReader::RowGroupReader(FileReader::Impl* impl, int row_group_index)
+ : impl_(impl), row_group_index_(row_group_index) {}
+
+Status ColumnChunkReader::Read(std::shared_ptr<::arrow::Array>* out) {
+ return impl_->ReadColumnChunk(column_index_, row_group_index_, out);
+}
+
+ColumnChunkReader::~ColumnChunkReader() {}
+
+ColumnChunkReader::ColumnChunkReader(FileReader::Impl* impl, int row_group_index,
+ int column_index)
+ : impl_(impl), column_index_(column_index), row_group_index_(row_group_index) {}
+
} // namespace arrow
} // namespace parquet
diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h
index faaef9a..95b2186 100644
--- a/src/parquet/arrow/reader.h
+++ b/src/parquet/arrow/reader.h
@@ -39,11 +39,27 @@
namespace arrow {
+class ColumnChunkReader;
class ColumnReader;
+class RowGroupReader;
// Arrow read adapter class for deserializing Parquet files as Arrow row
// batches.
//
+// This interfaces caters for different use cases and thus provides different
+// interfaces. In its most simplistic form, we cater for a user that wants to
+// read the whole Parquet at once with the FileReader::ReadTable method.
+//
+// More advanced users that also want to implement parallelism on top of each
+// single Parquet files should do this on the RowGroup level. For this, they can
+// call FileReader::RowGroup(i)->ReadTable to receive only the specified
+// RowGroup as a table.
+//
+// In the most advanced situation, where a consumer wants to independently read
+// RowGroups in parallel and consume each column individually, they can call
+// FileReader::RowGroup(i)->Column(j)->Read and receive an arrow::Column
+// instance.
+//
// TODO(wesm): nested data does not always make sense with this user
// interface unless you are only reading a single leaf node from a branch of
// a table. For example:
@@ -150,6 +166,10 @@
::arrow::Status ScanContents(std::vector<int> columns, const int32_t column_batch_size,
int64_t* num_rows);
+ /// \brief Return a reader for the RowGroup, this object must not outlive the
+ /// FileReader.
+ std::shared_ptr<RowGroupReader> RowGroup(int row_group_index);
+
int num_row_groups() const;
const ParquetFileReader* parquet_reader() const;
@@ -161,10 +181,46 @@
virtual ~FileReader();
private:
+ friend ColumnChunkReader;
+ friend RowGroupReader;
+
class PARQUET_NO_EXPORT Impl;
std::unique_ptr<Impl> impl_;
};
+class PARQUET_EXPORT RowGroupReader {
+ public:
+ std::shared_ptr<ColumnChunkReader> Column(int column_index);
+
+ ::arrow::Status ReadTable(const std::vector<int>& column_indices,
+ std::shared_ptr<::arrow::Table>* out);
+ ::arrow::Status ReadTable(std::shared_ptr<::arrow::Table>* out);
+
+ virtual ~RowGroupReader();
+
+ private:
+ friend FileReader;
+ RowGroupReader(FileReader::Impl* reader, int row_group_index);
+
+ FileReader::Impl* impl_;
+ int row_group_index_;
+};
+
+class PARQUET_EXPORT ColumnChunkReader {
+ public:
+ ::arrow::Status Read(std::shared_ptr<::arrow::Array>* out);
+
+ virtual ~ColumnChunkReader();
+
+ private:
+ friend RowGroupReader;
+ ColumnChunkReader(FileReader::Impl* impl, int row_group_index, int column_index);
+
+ FileReader::Impl* impl_;
+ int column_index_;
+ int row_group_index_;
+};
+
// At this point, the column reader is a stream iterator. It only knows how to
// read the next batch of values for a particular column from the file until it
// runs out.
diff --git a/src/parquet/file_reader.cc b/src/parquet/file_reader.cc
index 72c71c6..7b74812 100644
--- a/src/parquet/file_reader.cc
+++ b/src/parquet/file_reader.cc
@@ -64,9 +64,9 @@
: contents_(std::move(contents)) {}
std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
- DCHECK(i < metadata()->num_columns()) << "The RowGroup only has "
- << metadata()->num_columns()
- << "columns, requested column: " << i;
+ DCHECK(i < metadata()->num_columns())
+ << "The RowGroup only has " << metadata()->num_columns()
+ << "columns, requested column: " << i;
const ColumnDescriptor* descr = metadata()->schema()->Column(i);
std::unique_ptr<PageReader> page_reader = contents_->GetColumnPageReader(i);
@@ -76,9 +76,9 @@
}
std::unique_ptr<PageReader> RowGroupReader::GetColumnPageReader(int i) {
- DCHECK(i < metadata()->num_columns()) << "The RowGroup only has "
- << metadata()->num_columns()
- << "columns, requested column: " << i;
+ DCHECK(i < metadata()->num_columns())
+ << "The RowGroup only has " << metadata()->num_columns()
+ << "columns, requested column: " << i;
return contents_->GetColumnPageReader(i);
}
@@ -302,9 +302,9 @@
}
std::shared_ptr<RowGroupReader> ParquetFileReader::RowGroup(int i) {
- DCHECK(i < metadata()->num_row_groups()) << "The file only has "
- << metadata()->num_row_groups()
- << "row groups, requested reader for: " << i;
+ DCHECK(i < metadata()->num_row_groups())
+ << "The file only has " << metadata()->num_row_groups()
+ << "row groups, requested reader for: " << i;
return contents_->GetRowGroup(i);
}