PARQUET-946: Add ReadRowGroup and num_row_groups methods to arrow::FileReader
There's a lot of room for improvement / further refactoring here -- the assumption that an entire column in a file is being read runs very deep in the Arrow reader, so I tried to do the minimum work to decouple the row group iteration. There's some code duplication in ReadRowGroup, but we should maybe save further cleanup for a future patch.
Author: Wes McKinney <wes.mckinney@twosigma.com>
Closes #291 from wesm/PARQUET-946 and squashes the following commits:
6d2b48a [Wes McKinney] Add virtual dtor
c7589f7 [Wes McKinney] Add ReadRowGroup and num_row_groups methods to arrow::FileReader
diff --git a/cmake_modules/FindClangTools.cmake b/cmake_modules/FindClangTools.cmake
index c07c7d2..e4ee984 100644
--- a/cmake_modules/FindClangTools.cmake
+++ b/cmake_modules/FindClangTools.cmake
@@ -27,16 +27,16 @@
# This module defines
# CLANG_TIDY_BIN, The path to the clang tidy binary
# CLANG_TIDY_FOUND, Whether clang tidy was found
-# CLANG_FORMAT_BIN, The path to the clang format binary
+# CLANG_FORMAT_BIN, The path to the clang format binary
# CLANG_TIDY_FOUND, Whether clang format was found
-find_program(CLANG_TIDY_BIN
- NAMES clang-tidy-3.8 clang-tidy-3.7 clang-tidy-3.6 clang-tidy
- PATHS ${ClangTools_PATH} $ENV{CLANG_TOOLS_PATH} /usr/local/bin /usr/bin
+find_program(CLANG_TIDY_BIN
+ NAMES clang-tidy-3.9 clang-tidy-3.8 clang-tidy-3.7 clang-tidy-3.6 clang-tidy
+ PATHS ${ClangTools_PATH} $ENV{CLANG_TOOLS_PATH} /usr/local/bin /usr/bin
NO_DEFAULT_PATH
)
-if ( "${CLANG_TIDY_BIN}" STREQUAL "CLANG_TIDY_BIN-NOTFOUND" )
+if ( "${CLANG_TIDY_BIN}" STREQUAL "CLANG_TIDY_BIN-NOTFOUND" )
set(CLANG_TIDY_FOUND 0)
message("clang-tidy not found")
else()
@@ -44,17 +44,16 @@
message("clang-tidy found at ${CLANG_TIDY_BIN}")
endif()
-find_program(CLANG_FORMAT_BIN
- NAMES clang-format-3.8 clang-format-3.7 clang-format-3.6 clang-format
- PATHS ${ClangTools_PATH} $ENV{CLANG_TOOLS_PATH} /usr/local/bin /usr/bin
+find_program(CLANG_FORMAT_BIN
+ NAMES clang-format-3.9 clang-format-3.8 clang-format-3.7 clang-format-3.6 clang-format
+ PATHS ${ClangTools_PATH} $ENV{CLANG_TOOLS_PATH} /usr/local/bin /usr/bin
NO_DEFAULT_PATH
)
-if ( "${CLANG_FORMAT_BIN}" STREQUAL "CLANG_FORMAT_BIN-NOTFOUND" )
+if ( "${CLANG_FORMAT_BIN}" STREQUAL "CLANG_FORMAT_BIN-NOTFOUND" )
set(CLANG_FORMAT_FOUND 0)
message("clang-format not found")
else()
set(CLANG_FORMAT_FOUND 1)
message("clang-format found at ${CLANG_FORMAT_BIN}")
endif()
-
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 3b232f9..dd46893 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -197,6 +197,36 @@
template <typename T>
using ParquetWriter = TypedColumnWriter<ParquetDataType<T>>;
+void WriteTableToBuffer(const std::shared_ptr<Table>& table, int num_threads,
+ int64_t row_group_size, std::shared_ptr<Buffer>* out) {
+ auto sink = std::make_shared<InMemoryOutputStream>();
+
+ ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), sink,
+ row_group_size, default_writer_properties()));
+ *out = sink->GetBuffer();
+}
+
+void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, int num_threads,
+ int64_t row_group_size, const std::vector<int>& column_subset,
+ std::shared_ptr<Table>* out) {
+ std::shared_ptr<Buffer> buffer;
+ WriteTableToBuffer(table, num_threads, row_group_size, &buffer);
+
+ std::unique_ptr<FileReader> reader;
+ ASSERT_OK_NO_THROW(
+ OpenFile(std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(),
+ ::parquet::default_reader_properties(), nullptr, &reader));
+
+ reader->set_num_threads(num_threads);
+
+ if (column_subset.size() > 0) {
+ ASSERT_OK_NO_THROW(reader->ReadTable(column_subset, out));
+ } else {
+ // Read everything
+ ASSERT_OK_NO_THROW(reader->ReadTable(out));
+ }
+}
+
template <typename TestType>
class TestParquetIO : public ::testing::Test {
public:
@@ -248,19 +278,6 @@
ASSERT_NE(nullptr, out->get());
}
- void ReadAndCheckSingleColumnTable(const std::shared_ptr<::arrow::Array>& values) {
- std::shared_ptr<::arrow::Table> out;
- std::unique_ptr<FileReader> reader;
- ReaderFromSink(&reader);
- ReadTableFromFile(std::move(reader), &out);
- ASSERT_EQ(1, out->num_columns());
- ASSERT_EQ(values->length(), out->num_rows());
-
- std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
- ASSERT_EQ(1, chunked_array->num_chunks());
- ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
- }
-
void PrepareListTable(int64_t size, bool nullable_lists, bool nullable_elements,
int64_t null_count, std::shared_ptr<Table>* out) {
std::shared_ptr<Array> values;
@@ -289,13 +306,23 @@
*out = MakeSimpleTable(parent_lists, nullable_parent_lists);
}
- void WriteReadAndCheckSingleColumnTable(const std::shared_ptr<Table>& table) {
- std::shared_ptr<Array> values = table->column(0)->data()->chunk(0);
- this->sink_ = std::make_shared<InMemoryOutputStream>();
- ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_,
- values->length(), default_writer_properties()));
+ void ReadAndCheckSingleColumnTable(const std::shared_ptr<::arrow::Array>& values) {
+ std::shared_ptr<::arrow::Table> out;
+ std::unique_ptr<FileReader> reader;
+ ReaderFromSink(&reader);
+ ReadTableFromFile(std::move(reader), &out);
+ ASSERT_EQ(1, out->num_columns());
+ ASSERT_EQ(values->length(), out->num_rows());
- this->ReadAndCheckSingleColumnTable(values);
+ std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+ ASSERT_EQ(1, chunked_array->num_chunks());
+ ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+ }
+
+ void CheckRoundTrip(const std::shared_ptr<Table>& table) {
+ std::shared_ptr<Table> result;
+ DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result);
+ ASSERT_TRUE(table->Equals(*result));
}
template <typename ArrayType>
@@ -401,37 +428,37 @@
ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
std::shared_ptr<Table> table = MakeSimpleTable(values, true);
- this->WriteReadAndCheckSingleColumnTable(table);
+ this->CheckRoundTrip(table);
}
TYPED_TEST(TestParquetIO, SingleNullableListNullableColumnReadWrite) {
std::shared_ptr<Table> table;
this->PrepareListTable(SMALL_SIZE, true, true, 10, &table);
- this->WriteReadAndCheckSingleColumnTable(table);
+ this->CheckRoundTrip(table);
}
TYPED_TEST(TestParquetIO, SingleRequiredListNullableColumnReadWrite) {
std::shared_ptr<Table> table;
this->PrepareListTable(SMALL_SIZE, false, true, 10, &table);
- this->WriteReadAndCheckSingleColumnTable(table);
+ this->CheckRoundTrip(table);
}
TYPED_TEST(TestParquetIO, SingleNullableListRequiredColumnReadWrite) {
std::shared_ptr<Table> table;
this->PrepareListTable(SMALL_SIZE, true, false, 10, &table);
- this->WriteReadAndCheckSingleColumnTable(table);
+ this->CheckRoundTrip(table);
}
TYPED_TEST(TestParquetIO, SingleRequiredListRequiredColumnReadWrite) {
std::shared_ptr<Table> table;
this->PrepareListTable(SMALL_SIZE, false, false, 0, &table);
- this->WriteReadAndCheckSingleColumnTable(table);
+ this->CheckRoundTrip(table);
}
TYPED_TEST(TestParquetIO, SingleNullableListRequiredListRequiredColumnReadWrite) {
std::shared_ptr<Table> table;
this->PrepareListOfListTable(SMALL_SIZE, true, false, false, 0, &table);
- this->WriteReadAndCheckSingleColumnTable(table);
+ this->CheckRoundTrip(table);
}
TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) {
@@ -756,18 +783,24 @@
this->CheckSingleColumnRequiredTableRead(4);
}
-void MakeDoubleTable(int num_columns, int num_rows, std::shared_ptr<Table>* out) {
+void MakeDoubleTable(
+ int num_columns, int num_rows, int nchunks, std::shared_ptr<Table>* out) {
std::shared_ptr<::arrow::Column> column;
std::vector<std::shared_ptr<::arrow::Column>> columns(num_columns);
std::vector<std::shared_ptr<::arrow::Field>> fields(num_columns);
- std::shared_ptr<Array> values;
for (int i = 0; i < num_columns; ++i) {
+ std::vector<std::shared_ptr<Array>> arrays;
+ std::shared_ptr<Array> values;
ASSERT_OK(NullableArray<::arrow::DoubleType>(
num_rows, num_rows / 10, static_cast<uint32_t>(i), &values));
std::stringstream ss;
ss << "col" << i;
- column = MakeColumn(ss.str(), values, true);
+
+ for (int j = 0; j < nchunks; ++j) {
+ arrays.push_back(values);
+ }
+ column = MakeColumn(ss.str(), arrays, true);
columns[i] = column;
fields[i] = column->field();
@@ -776,54 +809,59 @@
*out = std::make_shared<Table>(schema, columns);
}
-void DoTableRoundtrip(const std::shared_ptr<Table>& table, int num_threads,
- const std::vector<int>& column_subset, std::shared_ptr<Table>* out) {
- auto sink = std::make_shared<InMemoryOutputStream>();
-
- ASSERT_OK_NO_THROW(WriteTable(
- *table, ::arrow::default_memory_pool(), sink, (table->num_rows() + 1) / 2));
-
- std::shared_ptr<Buffer> buffer = sink->GetBuffer();
- std::unique_ptr<FileReader> reader;
- ASSERT_OK_NO_THROW(
- OpenFile(std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(),
- ::parquet::default_reader_properties(), nullptr, &reader));
-
- reader->set_num_threads(num_threads);
-
- if (column_subset.size() > 0) {
- ASSERT_OK_NO_THROW(reader->ReadTable(column_subset, out));
- } else {
- // Read everything
- ASSERT_OK_NO_THROW(reader->ReadTable(out));
- }
-}
-
TEST(TestArrowReadWrite, MultithreadedRead) {
const int num_columns = 20;
const int num_rows = 1000;
const int num_threads = 4;
std::shared_ptr<Table> table;
- MakeDoubleTable(num_columns, num_rows, &table);
+ MakeDoubleTable(num_columns, num_rows, 1, &table);
std::shared_ptr<Table> result;
- DoTableRoundtrip(table, num_threads, {}, &result);
+ DoSimpleRoundtrip(table, num_threads, table->num_rows(), {}, &result);
ASSERT_TRUE(table->Equals(*result));
}
+TEST(TestArrowReadWrite, ReadSingleRowGroup) {
+ const int num_columns = 20;
+ const int num_rows = 1000;
+
+ std::shared_ptr<Table> table;
+ MakeDoubleTable(num_columns, num_rows, 1, &table);
+
+ std::shared_ptr<Buffer> buffer;
+ WriteTableToBuffer(table, 1, num_rows / 2, &buffer);
+
+ std::unique_ptr<FileReader> reader;
+ ASSERT_OK_NO_THROW(
+ OpenFile(std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(),
+ ::parquet::default_reader_properties(), nullptr, &reader));
+
+ ASSERT_EQ(2, reader->num_row_groups());
+
+ std::shared_ptr<Table> r1, r2;
+ // Read everything
+ ASSERT_OK_NO_THROW(reader->ReadRowGroup(0, &r1));
+ ASSERT_OK_NO_THROW(reader->ReadRowGroup(1, &r2));
+
+ std::shared_ptr<Table> concatenated;
+ ASSERT_OK(ConcatenateTables({r1, r2}, &concatenated));
+
+ ASSERT_TRUE(table->Equals(*concatenated));
+}
+
TEST(TestArrowReadWrite, ReadColumnSubset) {
const int num_columns = 20;
const int num_rows = 1000;
const int num_threads = 4;
std::shared_ptr<Table> table;
- MakeDoubleTable(num_columns, num_rows, &table);
+ MakeDoubleTable(num_columns, num_rows, 1, &table);
std::shared_ptr<Table> result;
std::vector<int> column_subset = {0, 4, 8, 10};
- DoTableRoundtrip(table, num_threads, column_subset, &result);
+ DoSimpleRoundtrip(table, num_threads, table->num_rows(), column_subset, &result);
std::vector<std::shared_ptr<::arrow::Column>> ex_columns;
std::vector<std::shared_ptr<::arrow::Field>> ex_fields;
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index a26c3ea..823aea9 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -60,117 +60,8 @@
template <typename ArrowType>
using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
-class FileReader::Impl {
- public:
- Impl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader);
- virtual ~Impl() {}
-
- bool CheckForFlatColumn(const ColumnDescriptor* descr);
- bool CheckForFlatListColumn(const ColumnDescriptor* descr);
- Status GetColumn(int i, std::unique_ptr<ColumnReader>* out);
- Status ReadColumn(int i, std::shared_ptr<Array>* out);
- Status ReadTable(std::shared_ptr<Table>* out);
- Status ReadTable(const std::vector<int>& column_indices, std::shared_ptr<Table>* out);
- const ParquetFileReader* parquet_reader() const { return reader_.get(); }
-
- void set_num_threads(int num_threads) { num_threads_ = num_threads; }
-
- private:
- MemoryPool* pool_;
- std::unique_ptr<ParquetFileReader> reader_;
-
- int num_threads_;
-};
-
-class ColumnReader::Impl {
- public:
- Impl(MemoryPool* pool, const ColumnDescriptor* descr, ParquetFileReader* reader,
- int column_index);
- virtual ~Impl() {}
-
- Status NextBatch(int batch_size, std::shared_ptr<Array>* out);
-
- template <typename ArrowType, typename ParquetType>
- Status TypedReadBatch(int batch_size, std::shared_ptr<Array>* out);
-
- template <typename ArrowType>
- Status ReadByteArrayBatch(int batch_size, std::shared_ptr<Array>* out);
-
- template <typename ArrowType>
- Status InitDataBuffer(int batch_size);
- Status InitValidBits(int batch_size);
- template <typename ArrowType, typename ParquetType>
- Status ReadNullableBatch(TypedColumnReader<ParquetType>* reader, int16_t* def_levels,
- int16_t* rep_levels, int64_t values_to_read, int64_t* levels_read,
- int64_t* values_read);
- template <typename ArrowType, typename ParquetType>
- Status ReadNonNullableBatch(TypedColumnReader<ParquetType>* reader,
- int64_t values_to_read, int64_t* levels_read);
- Status WrapIntoListArray(const int16_t* def_levels, const int16_t* rep_levels,
- int64_t total_values_read, std::shared_ptr<Array>* array);
-
- private:
- void NextRowGroup();
-
- template <typename InType, typename OutType>
- struct can_copy_ptr {
- static constexpr bool value =
- std::is_same<InType, OutType>::value ||
- (std::is_integral<InType>{} && std::is_integral<OutType>{} &&
- (sizeof(InType) == sizeof(OutType)));
- };
-
- MemoryPool* pool_;
- const ColumnDescriptor* descr_;
- ParquetFileReader* reader_;
- int column_index_;
- int next_row_group_;
- std::shared_ptr<::parquet::ColumnReader> column_reader_;
- std::shared_ptr<Field> field_;
-
- PoolBuffer values_buffer_;
- PoolBuffer def_levels_buffer_;
- PoolBuffer rep_levels_buffer_;
- std::shared_ptr<PoolBuffer> data_buffer_;
- uint8_t* data_buffer_ptr_;
- std::shared_ptr<PoolBuffer> valid_bits_buffer_;
- uint8_t* valid_bits_ptr_;
- int64_t valid_bits_idx_;
- int64_t null_count_;
-};
-
-FileReader::Impl::Impl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
- : pool_(pool), reader_(std::move(reader)), num_threads_(1) {}
-
-Status FileReader::Impl::GetColumn(int i, std::unique_ptr<ColumnReader>* out) {
- const SchemaDescriptor* schema = reader_->metadata()->schema();
-
- std::unique_ptr<ColumnReader::Impl> impl(
- new ColumnReader::Impl(pool_, schema->Column(i), reader_.get(), i));
- *out = std::unique_ptr<ColumnReader>(new ColumnReader(std::move(impl)));
- return Status::OK();
-}
-
-Status FileReader::Impl::ReadColumn(int i, std::shared_ptr<Array>* out) {
- std::unique_ptr<ColumnReader> flat_column_reader;
- RETURN_NOT_OK(GetColumn(i, &flat_column_reader));
-
- int64_t batch_size = 0;
- for (int j = 0; j < reader_->metadata()->num_row_groups(); j++) {
- batch_size += reader_->metadata()->RowGroup(j)->ColumnChunk(i)->num_values();
- }
-
- return flat_column_reader->NextBatch(batch_size, out);
-}
-
-Status FileReader::Impl::ReadTable(std::shared_ptr<Table>* table) {
- std::vector<int> column_indices(reader_->metadata()->num_columns());
-
- for (size_t i = 0; i < column_indices.size(); ++i) {
- column_indices[i] = i;
- }
- return ReadTable(column_indices, table);
-}
+// ----------------------------------------------------------------------
+// Helper for parallel for-loop
template <class FUNCTION>
Status ParallelFor(int nthreads, int num_tasks, FUNCTION&& func) {
@@ -206,12 +97,255 @@
return Status::OK();
}
+// ----------------------------------------------------------------------
+// Iteration utilities
+
+// Abstraction to decouple row group iteration details from the ColumnReader,
+// so we can read only a single row group if we want
+class FileColumnIterator {
+ public:
+ explicit FileColumnIterator(int column_index, ParquetFileReader* reader)
+ : column_index_(column_index),
+ reader_(reader),
+ schema_(reader->metadata()->schema()) {}
+
+ virtual ~FileColumnIterator() {}
+
+ virtual std::shared_ptr<::parquet::ColumnReader> Next() = 0;
+
+ const SchemaDescriptor* schema() const { return schema_; }
+
+ const ColumnDescriptor* descr() const { return schema_->Column(column_index_); }
+
+ int column_index() const { return column_index_; }
+
+ protected:
+ int column_index_;
+ ParquetFileReader* reader_;
+ const SchemaDescriptor* schema_;
+};
+
+class AllRowGroupsIterator : public FileColumnIterator {
+ public:
+ explicit AllRowGroupsIterator(int column_index, ParquetFileReader* reader)
+ : FileColumnIterator(column_index, reader), next_row_group_(0) {}
+
+ std::shared_ptr<::parquet::ColumnReader> Next() override {
+ std::shared_ptr<::parquet::ColumnReader> result;
+ if (next_row_group_ < reader_->metadata()->num_row_groups()) {
+ result = reader_->RowGroup(next_row_group_)->Column(column_index_);
+ next_row_group_++;
+ } else {
+ result = nullptr;
+ }
+ return result;
+ };
+
+ private:
+ int next_row_group_;
+};
+
+class SingleRowGroupIterator : public FileColumnIterator {
+ public:
+ explicit SingleRowGroupIterator(
+ int column_index, int row_group_number, ParquetFileReader* reader)
+ : FileColumnIterator(column_index, reader),
+ row_group_number_(row_group_number),
+ done_(false) {}
+
+ std::shared_ptr<::parquet::ColumnReader> Next() override {
+ if (done_) { return nullptr; }
+
+ auto result = reader_->RowGroup(row_group_number_)->Column(column_index_);
+ done_ = true;
+ return result;
+ };
+
+ private:
+ int row_group_number_;
+ bool done_;
+};
+
+// ----------------------------------------------------------------------
+// File reader implementation
+
+class FileReader::Impl {
+ public:
+ Impl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
+ : pool_(pool), reader_(std::move(reader)), num_threads_(1) {}
+
+ virtual ~Impl() {}
+
+ Status GetColumn(int i, std::unique_ptr<ColumnReader>* out);
+ Status ReadColumn(int i, std::shared_ptr<Array>* out);
+ Status GetSchema(
+ const std::vector<int>& indices, std::shared_ptr<::arrow::Schema>* out);
+ Status ReadRowGroup(int row_group_index, const std::vector<int>& indices,
+ std::shared_ptr<::arrow::Table>* out);
+ Status ReadTable(const std::vector<int>& indices, std::shared_ptr<Table>* table);
+ Status ReadTable(std::shared_ptr<Table>* table);
+ Status ReadRowGroup(int i, std::shared_ptr<Table>* table);
+
+ bool CheckForFlatColumn(const ColumnDescriptor* descr);
+ bool CheckForFlatListColumn(const ColumnDescriptor* descr);
+
+ const ParquetFileReader* parquet_reader() const { return reader_.get(); }
+
+ int num_row_groups() const { return reader_->metadata()->num_row_groups(); }
+
+ void set_num_threads(int num_threads) { num_threads_ = num_threads; }
+
+ private:
+ MemoryPool* pool_;
+ std::unique_ptr<ParquetFileReader> reader_;
+
+ int num_threads_;
+};
+
+class ColumnReader::Impl {
+ public:
+ Impl(MemoryPool* pool, std::unique_ptr<FileColumnIterator> input)
+ : pool_(pool),
+ input_(std::move(input)),
+ descr_(input_->descr()),
+ values_buffer_(pool),
+ def_levels_buffer_(pool),
+ rep_levels_buffer_(pool) {
+ NodeToField(input_->descr()->schema_node(), &field_);
+ NextRowGroup();
+ }
+
+ virtual ~Impl() {}
+
+ Status NextBatch(int batch_size, std::shared_ptr<Array>* out);
+
+ template <typename ArrowType, typename ParquetType>
+ Status TypedReadBatch(int batch_size, std::shared_ptr<Array>* out);
+
+ template <typename ArrowType>
+ Status ReadByteArrayBatch(int batch_size, std::shared_ptr<Array>* out);
+
+ template <typename ArrowType>
+ Status InitDataBuffer(int batch_size);
+ Status InitValidBits(int batch_size);
+ template <typename ArrowType, typename ParquetType>
+ Status ReadNullableBatch(TypedColumnReader<ParquetType>* reader, int16_t* def_levels,
+ int16_t* rep_levels, int64_t values_to_read, int64_t* levels_read,
+ int64_t* values_read);
+ template <typename ArrowType, typename ParquetType>
+ Status ReadNonNullableBatch(TypedColumnReader<ParquetType>* reader,
+ int64_t values_to_read, int64_t* levels_read);
+ Status WrapIntoListArray(const int16_t* def_levels, const int16_t* rep_levels,
+ int64_t total_values_read, std::shared_ptr<Array>* array);
+
+ private:
+ void NextRowGroup();
+
+ template <typename InType, typename OutType>
+ struct can_copy_ptr {
+ static constexpr bool value =
+ std::is_same<InType, OutType>::value ||
+ (std::is_integral<InType>{} && std::is_integral<OutType>{} &&
+ (sizeof(InType) == sizeof(OutType)));
+ };
+
+ MemoryPool* pool_;
+ std::unique_ptr<FileColumnIterator> input_;
+ const ColumnDescriptor* descr_;
+
+ std::shared_ptr<::parquet::ColumnReader> column_reader_;
+ std::shared_ptr<Field> field_;
+
+ PoolBuffer values_buffer_;
+ PoolBuffer def_levels_buffer_;
+ PoolBuffer rep_levels_buffer_;
+ std::shared_ptr<PoolBuffer> data_buffer_;
+ uint8_t* data_buffer_ptr_;
+ std::shared_ptr<PoolBuffer> valid_bits_buffer_;
+ uint8_t* valid_bits_ptr_;
+ int64_t valid_bits_idx_;
+ int64_t null_count_;
+};
+
+FileReader::FileReader(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
+ : impl_(new FileReader::Impl(pool, std::move(reader))) {}
+
+FileReader::~FileReader() {}
+
+Status FileReader::Impl::GetColumn(int i, std::unique_ptr<ColumnReader>* out) {
+ std::unique_ptr<FileColumnIterator> input(new AllRowGroupsIterator(i, reader_.get()));
+
+ std::unique_ptr<ColumnReader::Impl> impl(
+ new ColumnReader::Impl(pool_, std::move(input)));
+ *out = std::unique_ptr<ColumnReader>(new ColumnReader(std::move(impl)));
+ return Status::OK();
+}
+
+Status FileReader::Impl::ReadColumn(int i, std::shared_ptr<Array>* out) {
+ std::unique_ptr<ColumnReader> flat_column_reader;
+ RETURN_NOT_OK(GetColumn(i, &flat_column_reader));
+
+ int64_t batch_size = 0;
+ for (int j = 0; j < reader_->metadata()->num_row_groups(); j++) {
+ batch_size += reader_->metadata()->RowGroup(j)->ColumnChunk(i)->num_values();
+ }
+
+ return flat_column_reader->NextBatch(batch_size, out);
+}
+
+Status FileReader::Impl::GetSchema(
+ const std::vector<int>& indices, std::shared_ptr<::arrow::Schema>* out) {
+ auto descr = reader_->metadata()->schema();
+ return FromParquetSchema(descr, indices, out);
+}
+
+Status FileReader::Impl::ReadRowGroup(int row_group_index,
+ const std::vector<int>& indices, std::shared_ptr<::arrow::Table>* out) {
+ std::shared_ptr<::arrow::Schema> schema;
+ RETURN_NOT_OK(GetSchema(indices, &schema));
+
+ auto rg_metadata = reader_->metadata()->RowGroup(row_group_index);
+
+ int num_columns = static_cast<int>(indices.size());
+ int nthreads = std::min<int>(num_threads_, num_columns);
+ std::vector<std::shared_ptr<Column>> columns(num_columns);
+
+ // TODO(wesm): Refactor to share more code with ReadTable
+
+ auto ReadColumnFunc = [&indices, &row_group_index, &schema, &columns, &rg_metadata,
+ this](int i) {
+ int column_index = indices[i];
+ int64_t batch_size = rg_metadata->ColumnChunk(column_index)->num_values();
+
+ std::unique_ptr<FileColumnIterator> input(
+ new SingleRowGroupIterator(column_index, row_group_index, reader_.get()));
+
+ std::unique_ptr<ColumnReader::Impl> impl(
+ new ColumnReader::Impl(pool_, std::move(input)));
+ ColumnReader flat_column_reader(std::move(impl));
+
+ std::shared_ptr<Array> array;
+ RETURN_NOT_OK(flat_column_reader.NextBatch(batch_size, &array));
+ columns[i] = std::make_shared<Column>(schema->field(i), array);
+ return Status::OK();
+ };
+
+ if (nthreads == 1) {
+ for (int i = 0; i < num_columns; i++) {
+ RETURN_NOT_OK(ReadColumnFunc(i));
+ }
+ } else {
+ RETURN_NOT_OK(ParallelFor(nthreads, num_columns, ReadColumnFunc));
+ }
+
+ *out = std::make_shared<Table>(schema, columns);
+ return Status::OK();
+}
+
Status FileReader::Impl::ReadTable(
const std::vector<int>& indices, std::shared_ptr<Table>* table) {
- auto descr = reader_->metadata()->schema();
-
std::shared_ptr<::arrow::Schema> schema;
- RETURN_NOT_OK(FromParquetSchema(descr, indices, &schema));
+ RETURN_NOT_OK(GetSchema(indices, &schema));
int num_columns = static_cast<int>(indices.size());
int nthreads = std::min<int>(num_threads_, num_columns);
@@ -236,10 +370,23 @@
return Status::OK();
}
-FileReader::FileReader(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
- : impl_(new FileReader::Impl(pool, std::move(reader))) {}
+Status FileReader::Impl::ReadTable(std::shared_ptr<Table>* table) {
+ std::vector<int> indices(reader_->metadata()->num_columns());
-FileReader::~FileReader() {}
+ for (size_t i = 0; i < indices.size(); ++i) {
+ indices[i] = i;
+ }
+ return ReadTable(indices, table);
+}
+
+Status FileReader::Impl::ReadRowGroup(int i, std::shared_ptr<Table>* table) {
+ std::vector<int> indices(reader_->metadata()->num_columns());
+
+ for (size_t i = 0; i < indices.size(); ++i) {
+ indices[i] = i;
+ }
+ return ReadRowGroup(i, indices, table);
+}
// Static ctor
Status OpenFile(const std::shared_ptr<::arrow::io::ReadableFileInterface>& file,
@@ -280,14 +427,35 @@
}
Status FileReader::ReadTable(
- const std::vector<int>& column_indices, std::shared_ptr<Table>* out) {
+ const std::vector<int>& indices, std::shared_ptr<Table>* out) {
try {
- return impl_->ReadTable(column_indices, out);
+ return impl_->ReadTable(indices, out);
} catch (const ::parquet::ParquetException& e) {
return ::arrow::Status::IOError(e.what());
}
}
+Status FileReader::ReadRowGroup(int i, std::shared_ptr<Table>* out) {
+ try {
+ return impl_->ReadRowGroup(i, out);
+ } catch (const ::parquet::ParquetException& e) {
+ return ::arrow::Status::IOError(e.what());
+ }
+}
+
+Status FileReader::ReadRowGroup(
+ int i, const std::vector<int>& indices, std::shared_ptr<Table>* out) {
+ try {
+ return impl_->ReadRowGroup(i, indices, out);
+ } catch (const ::parquet::ParquetException& e) {
+ return ::arrow::Status::IOError(e.what());
+ }
+}
+
+int FileReader::num_row_groups() const {
+ return impl_->num_row_groups();
+}
+
void FileReader::set_num_threads(int num_threads) {
impl_->set_num_threads(num_threads);
}
@@ -296,20 +464,6 @@
return impl_->parquet_reader();
}
-ColumnReader::Impl::Impl(MemoryPool* pool, const ColumnDescriptor* descr,
- ParquetFileReader* reader, int column_index)
- : pool_(pool),
- descr_(descr),
- reader_(reader),
- column_index_(column_index),
- next_row_group_(0),
- values_buffer_(pool),
- def_levels_buffer_(pool),
- rep_levels_buffer_(pool) {
- NodeToField(descr_->schema_node(), &field_);
- NextRowGroup();
-}
-
template <typename ArrowType, typename ParquetType>
Status ColumnReader::Impl::ReadNonNullableBatch(TypedColumnReader<ParquetType>* reader,
int64_t values_to_read, int64_t* levels_read) {
@@ -563,7 +717,7 @@
if (descr_->max_repetition_level() > 0) {
std::shared_ptr<::arrow::Schema> arrow_schema;
RETURN_NOT_OK(
- FromParquetSchema(reader_->metadata()->schema(), {column_index_}, &arrow_schema));
+ FromParquetSchema(input_->schema(), {input_->column_index()}, &arrow_schema));
// Walk downwards to extract nullability
std::shared_ptr<Field> current_field = arrow_schema->field(0);
@@ -912,12 +1066,7 @@
}
void ColumnReader::Impl::NextRowGroup() {
- if (next_row_group_ < reader_->metadata()->num_row_groups()) {
- column_reader_ = reader_->RowGroup(next_row_group_)->Column(column_index_);
- next_row_group_++;
- } else {
- column_reader_ = nullptr;
- }
+ column_reader_ = input_->Next();
}
ColumnReader::ColumnReader(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {}
diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h
index 1aa9c3e..f12acaf 100644
--- a/src/parquet/arrow/reader.h
+++ b/src/parquet/arrow/reader.h
@@ -107,6 +107,13 @@
::arrow::Status ReadTable(
const std::vector<int>& column_indices, std::shared_ptr<::arrow::Table>* out);
+ ::arrow::Status ReadRowGroup(int i, const std::vector<int>& column_indices,
+ std::shared_ptr<::arrow::Table>* out);
+
+ ::arrow::Status ReadRowGroup(int i, std::shared_ptr<::arrow::Table>* out);
+
+ int num_row_groups() const;
+
const ParquetFileReader* parquet_reader() const;
/// Set the number of threads to use during reads of multiple columns. By
diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h
index 2cfc60a..bff952b 100644
--- a/src/parquet/arrow/test-util.h
+++ b/src/parquet/arrow/test-util.h
@@ -260,12 +260,18 @@
return Status::OK();
}
-std::shared_ptr<::arrow::Column> MakeColumn(
+static std::shared_ptr<::arrow::Column> MakeColumn(
const std::string& name, const std::shared_ptr<Array>& array, bool nullable) {
auto field = std::make_shared<::arrow::Field>(name, array->type(), nullable);
return std::make_shared<::arrow::Column>(field, array);
}
+static std::shared_ptr<::arrow::Column> MakeColumn(const std::string& name,
+ const std::vector<std::shared_ptr<Array>>& arrays, bool nullable) {
+ auto field = std::make_shared<::arrow::Field>(name, arrays[0]->type(), nullable);
+ return std::make_shared<::arrow::Column>(field, arrays);
+}
+
std::shared_ptr<::arrow::Table> MakeSimpleTable(
const std::shared_ptr<Array>& values, bool nullable) {
std::shared_ptr<::arrow::Column> column = MakeColumn("col", values, nullable);
diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc
index 2ba4162..eb74147 100644
--- a/src/parquet/column/writer.cc
+++ b/src/parquet/column/writer.cc
@@ -213,7 +213,7 @@
const WriterProperties* properties)
: ColumnWriter(metadata, std::move(pager), expected_rows,
(encoding == Encoding::PLAIN_DICTIONARY ||
- encoding == Encoding::RLE_DICTIONARY),
+ encoding == Encoding::RLE_DICTIONARY),
encoding, properties) {
switch (encoding) {
case Encoding::PLAIN: