// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "parquet/arrow/reader.h"
#include <algorithm>
#include <cstring>
#include <future>
#include <utility>
#include <vector>
#include "arrow/array.h"
#include "arrow/record_batch.h"
#include "arrow/table.h"
#include "arrow/type.h"
#include "arrow/util/logging.h"
#include "arrow/util/range.h"
#include "arrow/util/thread_pool.h"
#include "parquet/arrow/reader_internal.h"
#include "parquet/column_reader.h"
#include "parquet/exception.h"
#include "parquet/file_reader.h"
#include "parquet/metadata.h"
#include "parquet/properties.h"
#include "parquet/schema.h"
using arrow::Array;
using arrow::BooleanArray;
using arrow::ChunkedArray;
using arrow::DataType;
using arrow::Field;
using arrow::Int32Array;
using arrow::ListArray;
using arrow::MemoryPool;
using arrow::ResizableBuffer;
using arrow::Status;
using arrow::StructArray;
using arrow::Table;
using arrow::TimestampArray;
using arrow::internal::Iota;
using parquet::schema::GroupNode;
using parquet::schema::Node;
using parquet::schema::PrimitiveNode;
// Help reduce verbosity
using ParquetReader = parquet::ParquetFileReader;
using arrow::RecordBatchReader;
using parquet::internal::RecordReader;
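// These macros convert a ::parquet::ParquetException thrown inside the guarded
// block into an ::arrow::Status::IOError, so no C++ exception escapes the
// Arrow-facing API.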
#define BEGIN_PARQUET_CATCH_EXCEPTIONS try {
#define END_PARQUET_CATCH_EXCEPTIONS \
} \
catch (const ::parquet::ParquetException& e) { \
return ::arrow::Status::IOError(e.what()); \
}
namespace parquet {
namespace arrow {
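// Internal interface extending ColumnReader with access to definition and
// repetition levels, the Arrow field and Parquet column descriptor, and the
// kind of reader (primitive leaf, list, or struct). The nested readers below
// are composed out of these.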
class ColumnReaderImpl : public ColumnReader {
public:
enum ReaderType { PRIMITIVE, LIST, STRUCT };
virtual Status GetDefLevels(const int16_t** data, int64_t* length) = 0;
virtual Status GetRepLevels(const int16_t** data, int64_t* length) = 0;
virtual const std::shared_ptr<Field> field() = 0;
virtual const ColumnDescriptor* descr() const = 0;
virtual ReaderType type() const = 0;
};
// ----------------------------------------------------------------------
// FileReaderImpl: default FileReader implementation
class FileReaderImpl : public FileReader {
public:
FileReaderImpl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader,
const ArrowReaderProperties& properties)
: pool_(pool), reader_(std::move(reader)), reader_properties_(properties) {}
Status Init() {
return BuildSchemaManifest(reader_->metadata()->schema(),
reader_->metadata()->key_value_metadata(),
reader_properties_, &manifest_);
}
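// Returns an iterator factory that visits only the given row groups for a
// column; AllRowGroupsFactory() below is the unrestricted variant.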
FileColumnIteratorFactory SomeRowGroupsFactory(std::vector<int> row_groups) {
return [row_groups](int i, ParquetFileReader* reader) {
return new FileColumnIterator(i, reader, row_groups);
};
}
FileColumnIteratorFactory AllRowGroupsFactory() {
return SomeRowGroupsFactory(Iota(reader_->metadata()->num_row_groups()));
}
Status BoundsCheckColumn(int column) {
if (column < 0 || column >= this->num_columns()) {
return Status::Invalid("Column index out of bounds (got ", column,
", should be "
"between 0 and ",
this->num_columns() - 1, ")");
}
return Status::OK();
}
Status BoundsCheckRowGroup(int row_group) {
// Check that the row group index is within bounds
if (row_group < 0 || row_group >= num_row_groups()) {
return Status::Invalid("Some index in row_group_indices is ", row_group,
", which is either < 0 or >= num_row_groups(",
num_row_groups(), ")");
}
return Status::OK();
}
int64_t GetTotalRecords(const std::vector<int>& row_groups, int column_chunk = 0) {
// Can throw exception
int64_t records = 0;
for (auto row_group : row_groups) {
records += reader_->metadata()
->RowGroup(row_group)
->ColumnChunk(column_chunk)
->num_values();
}
return records;
}
std::shared_ptr<RowGroupReader> RowGroup(int row_group_index) override;
Status ReadTable(const std::vector<int>& indices,
std::shared_ptr<Table>* out) override {
return ReadRowGroups(Iota(reader_->metadata()->num_row_groups()), indices, out);
}
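// Builds a reader for top-level schema field i, restricted to the leaf column
// indices in `indices` and to the given row groups.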
Status GetFieldReader(int i, const std::vector<int>& indices,
const std::vector<int>& row_groups,
std::unique_ptr<ColumnReaderImpl>* out) {
auto ctx = std::make_shared<ReaderContext>();
ctx->reader = reader_.get();
ctx->pool = pool_;
ctx->iterator_factory = SomeRowGroupsFactory(row_groups);
ctx->filter_leaves = true;
ctx->included_leaves.insert(indices.begin(), indices.end());
return manifest_.schema_fields[i].GetReader(ctx, out);
}
Status GetColumn(int i, FileColumnIteratorFactory iterator_factory,
std::unique_ptr<ColumnReader>* out);
Status ReadSchemaField(int i, const std::vector<int>& indices,
const std::vector<int>& row_groups,
std::shared_ptr<Field>* out_field,
std::shared_ptr<ChunkedArray>* out) {
BEGIN_PARQUET_CATCH_EXCEPTIONS
std::unique_ptr<ColumnReaderImpl> reader;
RETURN_NOT_OK(GetFieldReader(i, indices, row_groups, &reader));
*out_field = reader->field();
// TODO(wesm): This calculation doesn't make much sense when we have repeated
// schema nodes
int64_t records_to_read = GetTotalRecords(row_groups, i);
return reader->NextBatch(records_to_read, out);
END_PARQUET_CATCH_EXCEPTIONS
}
Status GetColumn(int i, std::unique_ptr<ColumnReader>* out) override {
return GetColumn(i, AllRowGroupsFactory(), out);
}
Status GetSchema(std::shared_ptr<::arrow::Schema>* out) override {
return FromParquetSchema(reader_->metadata()->schema(), reader_properties_,
reader_->metadata()->key_value_metadata(), out);
}
Status ReadSchemaField(int i, const std::vector<int>& indices,
const std::vector<int>& row_groups,
std::shared_ptr<ChunkedArray>* out) {
std::shared_ptr<Field> unused;
return ReadSchemaField(i, indices, row_groups, &unused, out);
}
Status ReadSchemaField(int i, const std::vector<int>& indices,
std::shared_ptr<ChunkedArray>* out) {
return ReadSchemaField(i, indices, Iota(reader_->metadata()->num_row_groups()), out);
}
Status ReadSchemaField(int i, std::shared_ptr<ChunkedArray>* out) override {
return ReadSchemaField(i, Iota(reader_->metadata()->num_columns()),
Iota(reader_->metadata()->num_row_groups()), out);
}
Status ReadColumn(int i, const std::vector<int>& row_groups,
std::shared_ptr<ChunkedArray>* out) {
std::unique_ptr<ColumnReader> flat_column_reader;
RETURN_NOT_OK(GetColumn(i, SomeRowGroupsFactory(row_groups), &flat_column_reader));
BEGIN_PARQUET_CATCH_EXCEPTIONS
int64_t records_to_read = GetTotalRecords(row_groups, i);
return flat_column_reader->NextBatch(records_to_read, out);
END_PARQUET_CATCH_EXCEPTIONS
}
Status ReadColumn(int i, std::shared_ptr<ChunkedArray>* out) override {
return ReadColumn(i, Iota(reader_->metadata()->num_row_groups()), out);
}
Status ReadTable(std::shared_ptr<Table>* table) override {
return ReadTable(Iota(reader_->metadata()->num_columns()), table);
}
Status ReadRowGroups(const std::vector<int>& row_groups,
const std::vector<int>& indices,
std::shared_ptr<Table>* table) override;
Status ReadRowGroups(const std::vector<int>& row_groups,
std::shared_ptr<Table>* table) override {
return ReadRowGroups(row_groups, Iota(reader_->metadata()->num_columns()), table);
}
Status ReadRowGroup(int row_group_index, const std::vector<int>& column_indices,
std::shared_ptr<Table>* out) override {
return ReadRowGroups({row_group_index}, column_indices, out);
}
Status ReadRowGroup(int i, std::shared_ptr<Table>* table) override {
return ReadRowGroup(i, Iota(reader_->metadata()->num_columns()), table);
}
Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
const std::vector<int>& column_indices,
std::unique_ptr<RecordBatchReader>* out) override;
Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
std::unique_ptr<RecordBatchReader>* out) override {
return GetRecordBatchReader(row_group_indices,
Iota(reader_->metadata()->num_columns()), out);
}
int num_columns() const { return reader_->metadata()->num_columns(); }
ParquetFileReader* parquet_reader() const override { return reader_.get(); }
int num_row_groups() const override { return reader_->metadata()->num_row_groups(); }
void set_use_threads(bool use_threads) override {
reader_properties_.set_use_threads(use_threads);
}
Status ScanContents(std::vector<int> columns, const int32_t column_batch_size,
int64_t* num_rows) override {
BEGIN_PARQUET_CATCH_EXCEPTIONS
*num_rows = ScanFileContents(columns, column_batch_size, reader_.get());
return Status::OK();
END_PARQUET_CATCH_EXCEPTIONS
}
MemoryPool* pool_;
std::unique_ptr<ParquetFileReader> reader_;
ArrowReaderProperties reader_properties_;
SchemaManifest manifest_;
};
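// RecordBatchReader that draws batches of at most batch_size rows from a set
// of per-field column readers over the selected row groups.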
class RowGroupRecordBatchReader : public ::arrow::RecordBatchReader {
public:
RowGroupRecordBatchReader(std::vector<std::unique_ptr<ColumnReaderImpl>> field_readers,
std::shared_ptr<::arrow::Schema> schema, int64_t batch_size)
: field_readers_(std::move(field_readers)),
schema_(schema),
batch_size_(batch_size) {}
~RowGroupRecordBatchReader() override {}
std::shared_ptr<::arrow::Schema> schema() const override { return schema_; }
static Status Make(const std::vector<int>& row_groups,
const std::vector<int>& column_indices, FileReaderImpl* reader,
int64_t batch_size,
std::unique_ptr<::arrow::RecordBatchReader>* out) {
std::vector<int> field_indices;
if (!reader->manifest_.GetFieldIndices(column_indices, &field_indices)) {
return Status::Invalid("Invalid column index");
}
std::vector<std::unique_ptr<ColumnReaderImpl>> field_readers(field_indices.size());
std::vector<std::shared_ptr<Field>> fields;
for (size_t i = 0; i < field_indices.size(); ++i) {
RETURN_NOT_OK(reader->GetFieldReader(field_indices[i], column_indices, row_groups,
&field_readers[i]));
fields.push_back(field_readers[i]->field());
}
out->reset(new RowGroupRecordBatchReader(std::move(field_readers),
::arrow::schema(fields), batch_size));
return Status::OK();
}
Status ReadNext(std::shared_ptr<::arrow::RecordBatch>* out) override {
// TODO (hatemhelal): Consider refactoring this to share logic with ReadTable as this
// does not currently honor the use_threads option.
std::vector<std::shared_ptr<ChunkedArray>> columns(field_readers_.size());
for (size_t i = 0; i < field_readers_.size(); ++i) {
RETURN_NOT_OK(field_readers_[i]->NextBatch(batch_size_, &columns[i]));
if (columns[i]->num_chunks() > 1) {
return Status::NotImplemented("This class cannot yet iterate chunked arrays");
}
}
// Create an intermediate table and use TableBatchReader as an adaptor to a
// RecordBatch
std::shared_ptr<Table> table = Table::Make(schema_, columns);
RETURN_NOT_OK(table->Validate());
::arrow::TableBatchReader table_batch_reader(*table);
return table_batch_reader.ReadNext(out);
}
private:
std::vector<std::unique_ptr<ColumnReaderImpl>> field_readers_;
std::shared_ptr<::arrow::Schema> schema_;
int64_t batch_size_;
};
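// Reads a single column chunk (one column within one row group) as a
// ChunkedArray.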
class ColumnChunkReaderImpl : public ColumnChunkReader {
public:
ColumnChunkReaderImpl(FileReaderImpl* impl, int row_group_index, int column_index)
: impl_(impl), column_index_(column_index), row_group_index_(row_group_index) {}
Status Read(std::shared_ptr<::arrow::ChunkedArray>* out) override {
return impl_->ReadColumn(column_index_, {row_group_index_}, out);
}
private:
FileReaderImpl* impl_;
int column_index_;
int row_group_index_;
};
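// Per-row-group view over the file: exposes its column chunks and reads the
// row group as a Table.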
class RowGroupReaderImpl : public RowGroupReader {
public:
RowGroupReaderImpl(FileReaderImpl* impl, int row_group_index)
: impl_(impl), row_group_index_(row_group_index) {}
std::shared_ptr<ColumnChunkReader> Column(int column_index) override {
return std::shared_ptr<ColumnChunkReader>(
new ColumnChunkReaderImpl(impl_, row_group_index_, column_index));
}
Status ReadTable(const std::vector<int>& column_indices,
std::shared_ptr<::arrow::Table>* out) override {
return impl_->ReadRowGroup(row_group_index_, column_indices, out);
}
Status ReadTable(std::shared_ptr<::arrow::Table>* out) override {
return impl_->ReadRowGroup(row_group_index_, out);
}
private:
FileReaderImpl* impl_;
int row_group_index_;
};
// Leaf reader is for primitive arrays and primitive children of nested arrays
class LeafReader : public ColumnReaderImpl {
public:
LeafReader(const std::shared_ptr<ReaderContext>& ctx,
const std::shared_ptr<Field>& field,
std::unique_ptr<FileColumnIterator> input)
: ctx_(ctx), field_(field), input_(std::move(input)), descr_(input_->descr()) {
record_reader_ = RecordReader::Make(descr_, ctx_->pool,
field->type()->id() == ::arrow::Type::DICTIONARY);
NextRowGroup();
}
Status GetDefLevels(const int16_t** data, int64_t* length) override {
*data = record_reader_->def_levels();
*length = record_reader_->levels_position();
return Status::OK();
}
Status GetRepLevels(const int16_t** data, int64_t* length) override {
*data = record_reader_->rep_levels();
*length = record_reader_->levels_position();
return Status::OK();
}
Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* out) override {
BEGIN_PARQUET_CATCH_EXCEPTIONS
// Pre-allocation gives much better performance for flat columns
record_reader_->Reserve(records_to_read);
record_reader_->Reset();
while (records_to_read > 0) {
if (!record_reader_->HasMoreData()) {
break;
}
int64_t records_read = record_reader_->ReadRecords(records_to_read);
records_to_read -= records_read;
if (records_read == 0) {
NextRowGroup();
}
}
RETURN_NOT_OK(TransferColumnData(record_reader_.get(), field_->type(), descr_,
ctx_->pool, out));
return Status::OK();
END_PARQUET_CATCH_EXCEPTIONS
}
const std::shared_ptr<Field> field() override { return field_; }
const ColumnDescriptor* descr() const override { return descr_; }
ReaderType type() const override { return PRIMITIVE; }
private:
void NextRowGroup() {
std::unique_ptr<PageReader> page_reader = input_->NextChunk();
record_reader_->SetPageReader(std::move(page_reader));
}
std::shared_ptr<ReaderContext> ctx_;
std::shared_ptr<Field> field_;
std::unique_ptr<FileColumnIterator> input_;
const ColumnDescriptor* descr_;
std::shared_ptr<RecordReader> record_reader_;
};
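// Reads (possibly nested) list columns by decoding the innermost leaf values
// and reassembling the list structure from the definition and repetition
// levels.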
class NestedListReader : public ColumnReaderImpl {
public:
NestedListReader(const std::shared_ptr<ReaderContext>& ctx,
std::shared_ptr<Field> field, int16_t max_definition_level,
int16_t max_repetition_level,
std::unique_ptr<ColumnReaderImpl> item_reader)
: ctx_(ctx),
field_(field),
max_definition_level_(max_definition_level),
max_repetition_level_(max_repetition_level),
item_reader_(std::move(item_reader)) {}
Status GetDefLevels(const int16_t** data, int64_t* length) override {
return item_reader_->GetDefLevels(data, length);
}
Status GetRepLevels(const int16_t** data, int64_t* length) override {
return item_reader_->GetRepLevels(data, length);
}
Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* out) override {
if (item_reader_->type() == ColumnReaderImpl::STRUCT) {
return Status::Invalid("Mix of struct and list types not yet supported");
}
RETURN_NOT_OK(item_reader_->NextBatch(records_to_read, out));
// ARROW-3762(wesm): If item reader yields a chunked array, we reject as
// this is not yet implemented
if ((*out)->num_chunks() > 1) {
return Status::NotImplemented(
"Nested data conversions not implemented for chunked array outputs");
}
const int16_t* def_levels;
const int16_t* rep_levels;
int64_t num_levels;
RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels));
RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels));
std::shared_ptr<Array> result;
RETURN_NOT_OK(ReconstructNestedList((*out)->chunk(0), field_, max_definition_level_,
max_repetition_level_, def_levels, rep_levels,
num_levels, ctx_->pool, &result));
*out = std::make_shared<ChunkedArray>(result);
return Status::OK();
}
const std::shared_ptr<Field> field() override { return field_; }
const ColumnDescriptor* descr() const override { return nullptr; }
ReaderType type() const override { return LIST; }
private:
std::shared_ptr<ReaderContext> ctx_;
std::shared_ptr<Field> field_;
int16_t max_definition_level_;
int16_t max_repetition_level_;
std::unique_ptr<ColumnReaderImpl> item_reader_;
};
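// Reads struct columns by reading each child column separately and assembling
// a StructArray, deriving the struct's validity bitmap from the children's
// definition levels.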
class PARQUET_NO_EXPORT StructReader : public ColumnReaderImpl {
public:
explicit StructReader(const std::shared_ptr<ReaderContext>& ctx,
const SchemaField& schema_field,
std::shared_ptr<Field> filtered_field,
std::vector<std::unique_ptr<ColumnReaderImpl>>&& children)
: ctx_(ctx),
schema_field_(schema_field),
filtered_field_(filtered_field),
struct_def_level_(schema_field.max_definition_level),
children_(std::move(children)) {}
Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* out) override;
Status GetDefLevels(const int16_t** data, int64_t* length) override;
Status GetRepLevels(const int16_t** data, int64_t* length) override;
const std::shared_ptr<Field> field() override { return filtered_field_; }
const ColumnDescriptor* descr() const override { return nullptr; }
ReaderType type() const override { return STRUCT; }
private:
std::shared_ptr<ReaderContext> ctx_;
SchemaField schema_field_;
std::shared_ptr<Field> filtered_field_;
int16_t struct_def_level_;
std::vector<std::unique_ptr<ColumnReaderImpl>> children_;
std::shared_ptr<ResizableBuffer> def_levels_buffer_;
Status DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap, int64_t* null_count);
};
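// Computes the struct's null bitmap and null count from its definition levels:
// a slot whose def level is below the struct's own def level is null.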
Status StructReader::DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap_out,
int64_t* null_count_out) {
std::shared_ptr<Buffer> null_bitmap;
auto null_count = 0;
const int16_t* def_levels_data;
int64_t def_levels_length;
RETURN_NOT_OK(GetDefLevels(&def_levels_data, &def_levels_length));
RETURN_NOT_OK(AllocateEmptyBitmap(ctx_->pool, def_levels_length, &null_bitmap));
uint8_t* null_bitmap_ptr = null_bitmap->mutable_data();
for (int64_t i = 0; i < def_levels_length; i++) {
if (def_levels_data[i] < struct_def_level_) {
// Mark null
null_count += 1;
} else {
DCHECK_EQ(def_levels_data[i], struct_def_level_);
::arrow::BitUtil::SetBit(null_bitmap_ptr, i);
}
}
*null_count_out = null_count;
*null_bitmap_out = (null_count == 0) ? nullptr : null_bitmap;
return Status::OK();
}
// TODO(itaiin): Consider caching the results of this calculation -
// note that this is only used once for each read for now
Status StructReader::GetDefLevels(const int16_t** data, int64_t* length) {
*data = nullptr;
if (children_.size() == 0) {
// Empty struct
*length = 0;
return Status::OK();
}
// We have at least one child
const int16_t* child_def_levels;
int64_t child_length = 0;
bool found_nullable_child = false;
int16_t* result_levels = nullptr;
int child_index = 0;
while (child_index < static_cast<int>(children_.size())) {
if (!children_[child_index]->field()->nullable()) {
++child_index;
continue;
}
RETURN_NOT_OK(children_[child_index]->GetDefLevels(&child_def_levels, &child_length));
auto size = child_length * sizeof(int16_t);
RETURN_NOT_OK(AllocateResizableBuffer(ctx_->pool, size, &def_levels_buffer_));
// Initialize with the minimal def level
std::memset(def_levels_buffer_->mutable_data(), -1, size);
result_levels = reinterpret_cast<int16_t*>(def_levels_buffer_->mutable_data());
found_nullable_child = true;
break;
}
if (!found_nullable_child) {
*data = nullptr;
*length = 0;
return Status::OK();
}
// Look at the rest of the children.
// When a struct is defined, the def levels of all of its children are at least
// the nesting level, and the struct's def level equals the nesting level.
// When a struct is not defined, the def levels of all of its children are less
// than the nesting level, and the struct's def level equals
// max(children def levels). All other possibilities indicate malformed
// definition data.
for (; child_index < static_cast<int>(children_.size()); ++child_index) {
// Child is non-nullable, and therefore has no definition levels
if (!children_[child_index]->field()->nullable()) {
continue;
}
auto& child = children_[child_index];
int64_t current_child_length;
RETURN_NOT_OK(child->GetDefLevels(&child_def_levels, &current_child_length));
if (child_length != current_child_length) {
std::stringstream ss;
ss << "Parquet struct decoding error. Expected to decode " << child_length
<< " definition levels"
<< " from child field \"" << child->field()->ToString() << "\" in parent \""
<< this->field()->ToString() << "\" but was only able to decode "
<< current_child_length;
return Status::IOError(ss.str());
}
DCHECK_EQ(child_length, current_child_length);
for (int64_t i = 0; i < child_length; i++) {
// Check that value is either uninitialized, or current
// and previous children def levels agree on the struct level
DCHECK((result_levels[i] == -1) || ((result_levels[i] >= struct_def_level_) ==
(child_def_levels[i] >= struct_def_level_)));
result_levels[i] =
std::max(result_levels[i], std::min(child_def_levels[i], struct_def_level_));
}
}
*data = reinterpret_cast<const int16_t*>(def_levels_buffer_->data());
*length = static_cast<int64_t>(child_length);
return Status::OK();
}
Status StructReader::GetRepLevels(const int16_t** data, int64_t* length) {
return Status::NotImplemented("GetRepLevels is not implemented for struct");
}
Status StructReader::NextBatch(int64_t records_to_read,
std::shared_ptr<ChunkedArray>* out) {
std::vector<std::shared_ptr<Array>> children_arrays;
std::shared_ptr<Buffer> null_bitmap;
int64_t null_count;
// Gather children arrays and def levels
for (auto& child : children_) {
if (child->type() == ColumnReaderImpl::LIST) {
return Status::Invalid("Mix of struct and list types not yet supported");
}
std::shared_ptr<ChunkedArray> field;
RETURN_NOT_OK(child->NextBatch(records_to_read, &field));
if (field->num_chunks() > 1) {
return Status::Invalid("Chunked field reads not yet supported with StructArray");
}
children_arrays.push_back(field->chunk(0));
}
RETURN_NOT_OK(DefLevelsToNullArray(&null_bitmap, &null_count));
int64_t struct_length = children_arrays[0]->length();
for (size_t i = 1; i < children_arrays.size(); ++i) {
if (children_arrays[i]->length() != struct_length) {
// TODO(wesm): This should really only occur if the Parquet file is
// malformed. Should this be a DCHECK?
return Status::Invalid("Struct children had different lengths");
}
}
auto result = std::make_shared<StructArray>(field()->type(), struct_length,
children_arrays, null_bitmap, null_count);
*out = std::make_shared<ChunkedArray>(result);
return Status::OK();
}
// ----------------------------------------------------------------------
// File reader implementation
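// Builds the reader tree for one schema field: leaves get a LeafReader, list
// fields a NestedListReader wrapping the reader for the innermost leaf, and
// struct fields a StructReader over the readers of their (non-pruned) children.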
Status SchemaField::GetReader(const std::shared_ptr<ReaderContext>& ctx,
std::unique_ptr<ColumnReaderImpl>* out) const {
auto type_id = this->field->type()->id();
if (this->children.size() == 0) {
std::unique_ptr<FileColumnIterator> input(
ctx->iterator_factory(this->column_index, ctx->reader));
out->reset(new LeafReader(ctx, this->field, std::move(input)));
} else if (type_id == ::arrow::Type::LIST) {
// We can only read lists-of-lists or structs at the moment
auto list_field = this->field;
auto child = &this->children[0];
while (child->field->type()->id() == ::arrow::Type::LIST) {
child = &child->children[0];
}
if (child->field->type()->id() == ::arrow::Type::STRUCT) {
return Status::NotImplemented(
"Reading lists of structs from Parquet files "
"not yet supported: ",
this->field->ToString());
}
if (!ctx->IncludesLeaf(child->column_index)) {
*out = nullptr;
return Status::OK();
}
std::unique_ptr<ColumnReaderImpl> child_reader;
RETURN_NOT_OK(child->GetReader(ctx, &child_reader));
// Use the max definition/repetition level of the leaf here
out->reset(new NestedListReader(ctx, list_field, child->max_definition_level,
child->max_repetition_level,
std::move(child_reader)));
} else if (type_id == ::arrow::Type::STRUCT) {
std::vector<std::shared_ptr<Field>> child_fields;
std::vector<std::unique_ptr<ColumnReaderImpl>> child_readers;
for (const auto& child : this->children) {
if (child.is_leaf() && !ctx->IncludesLeaf(child.column_index)) {
// Excluded leaf
continue;
}
std::unique_ptr<ColumnReaderImpl> child_reader;
RETURN_NOT_OK(child.GetReader(ctx, &child_reader));
if (!child_reader) {
// If all children were pruned, then we do not try to read this field
continue;
}
child_fields.push_back(child.field);
child_readers.emplace_back(std::move(child_reader));
}
if (child_fields.size() == 0) {
*out = nullptr;
return Status::OK();
}
auto filtered_field = ::arrow::field(
this->field->name(), ::arrow::struct_(child_fields), this->field->nullable());
out->reset(new StructReader(ctx, *this, filtered_field, std::move(child_readers)));
} else {
return Status::Invalid("Unsupported nested type: ", this->field->ToString());
}
return Status::OK();
}
Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_group_indices,
const std::vector<int>& column_indices,
std::unique_ptr<RecordBatchReader>* out) {
// Row group indices bounds check
for (auto row_group_index : row_group_indices) {
RETURN_NOT_OK(BoundsCheckRowGroup(row_group_index));
}
return RowGroupRecordBatchReader::Make(row_group_indices, column_indices, this,
reader_properties_.batch_size(), out);
}
Status FileReaderImpl::GetColumn(int i, FileColumnIteratorFactory iterator_factory,
std::unique_ptr<ColumnReader>* out) {
RETURN_NOT_OK(BoundsCheckColumn(i));
auto ctx = std::make_shared<ReaderContext>();
ctx->reader = reader_.get();
ctx->pool = pool_;
// Use the caller-provided iterator factory so that callers such as ReadColumn
// can restrict reading to specific row groups.
ctx->iterator_factory = iterator_factory;
ctx->filter_leaves = false;
std::unique_ptr<ColumnReaderImpl> result;
RETURN_NOT_OK(manifest_.schema_fields[i].GetReader(ctx, &result));
out->reset(result.release());
return Status::OK();
}
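// Reads the selected columns of the selected row groups into a Table. When
// use_threads is enabled, each field is read as a separate task on the CPU
// thread pool.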
Status FileReaderImpl::ReadRowGroups(const std::vector<int>& row_groups,
const std::vector<int>& indices,
std::shared_ptr<Table>* out) {
BEGIN_PARQUET_CATCH_EXCEPTIONS
// We only need to read schema fields which have columns indicated
// in the indices vector
std::vector<int> field_indices;
if (!manifest_.GetFieldIndices(indices, &field_indices)) {
return Status::Invalid("Invalid column index");
}
int num_fields = static_cast<int>(field_indices.size());
std::vector<std::shared_ptr<Field>> fields(num_fields);
std::vector<std::shared_ptr<ChunkedArray>> columns(num_fields);
auto ReadColumnFunc = [&](int i) {
return ReadSchemaField(field_indices[i], indices, row_groups, &fields[i],
&columns[i]);
};
if (reader_properties_.use_threads()) {
std::vector<std::future<Status>> futures;
auto pool = ::arrow::internal::GetCpuThreadPool();
for (int i = 0; i < num_fields; i++) {
futures.push_back(pool->Submit(ReadColumnFunc, i));
}
Status final_status = Status::OK();
for (auto& fut : futures) {
Status st = fut.get();
if (!st.ok()) {
final_status = std::move(st);
}
}
RETURN_NOT_OK(final_status);
} else {
for (int i = 0; i < num_fields; i++) {
RETURN_NOT_OK(ReadColumnFunc(i));
}
}
auto result_schema = ::arrow::schema(fields, manifest_.schema_metadata);
*out = Table::Make(result_schema, columns);
return (*out)->Validate();
END_PARQUET_CATCH_EXCEPTIONS
}
std::shared_ptr<RowGroupReader> FileReaderImpl::RowGroup(int row_group_index) {
return std::make_shared<RowGroupReaderImpl>(this, row_group_index);
}
// ----------------------------------------------------------------------
// Public factory functions
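// A minimal usage sketch of the factories below (assuming `input` is an
// already-opened ::arrow::io::RandomAccessFile; names other than the APIs
// defined in this file are illustrative only):
//
//   std::unique_ptr<FileReader> arrow_reader;
//   RETURN_NOT_OK(OpenFile(input, ::arrow::default_memory_pool(), &arrow_reader));
//   std::shared_ptr<::arrow::Table> table;
//   RETURN_NOT_OK(arrow_reader->ReadTable(&table));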
Status FileReader::Make(::arrow::MemoryPool* pool,
std::unique_ptr<ParquetFileReader> reader,
const ArrowReaderProperties& properties,
std::unique_ptr<FileReader>* out) {
out->reset(new FileReaderImpl(pool, std::move(reader), properties));
return static_cast<FileReaderImpl*>(out->get())->Init();
}
Status FileReader::Make(::arrow::MemoryPool* pool,
std::unique_ptr<ParquetFileReader> reader,
std::unique_ptr<FileReader>* out) {
return Make(pool, std::move(reader), default_arrow_reader_properties(), out);
}
FileReaderBuilder::FileReaderBuilder()
: pool_(::arrow::default_memory_pool()),
properties_(default_arrow_reader_properties()) {}
Status FileReaderBuilder::Open(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
const ReaderProperties& properties,
const std::shared_ptr<FileMetaData>& metadata) {
PARQUET_CATCH_NOT_OK(raw_reader_ = ParquetReader::Open(file, properties, metadata));
return Status::OK();
}
FileReaderBuilder* FileReaderBuilder::memory_pool(::arrow::MemoryPool* pool) {
pool_ = pool;
return this;
}
FileReaderBuilder* FileReaderBuilder::properties(
const ArrowReaderProperties& arg_properties) {
properties_ = arg_properties;
return this;
}
Status FileReaderBuilder::Build(std::unique_ptr<FileReader>* out) {
return FileReader::Make(pool_, std::move(raw_reader_), properties_, out);
}
Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
MemoryPool* pool, std::unique_ptr<FileReader>* reader) {
FileReaderBuilder builder;
RETURN_NOT_OK(builder.Open(file));
return builder.memory_pool(pool)->Build(reader);
}
Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
MemoryPool* pool, const ReaderProperties& props,
const std::shared_ptr<FileMetaData>& metadata,
std::unique_ptr<FileReader>* reader) {
// Deprecated since 0.15.0
FileReaderBuilder builder;
RETURN_NOT_OK(builder.Open(file, props, metadata));
return builder.memory_pool(pool)->Build(reader);
}
Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
MemoryPool* pool, const ArrowReaderProperties& properties,
std::unique_ptr<FileReader>* reader) {
// Deprecated since 0.15.0
FileReaderBuilder builder;
RETURN_NOT_OK(builder.Open(file));
return builder.memory_pool(pool)->properties(properties)->Build(reader);
}
} // namespace arrow
} // namespace parquet