blob: 69a27cdd9967f4ff360b42c31516a9d1e36f4b06 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "parquet/arrow/reader_internal.h"
#include <algorithm>
#include <climits>
#include <cstdint>
#include <cstring>
#include <memory>
#include <string>
#include <type_traits>
#include <unordered_map>
#include <vector>
#include <boost/algorithm/string/predicate.hpp>
#include "arrow/array.h"
#include "arrow/builder.h"
#include "arrow/compute/kernel.h"
#include "arrow/extension_type.h"
#include "arrow/io/memory.h"
#include "arrow/ipc/reader.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/base64.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/int_util.h"
#include "arrow/util/logging.h"
#include "arrow/util/ubsan.h"
#include "parquet/arrow/reader.h"
#include "parquet/column_reader.h"
#include "parquet/platform.h"
#include "parquet/properties.h"
#include "parquet/schema.h"
#include "parquet/types.h"
using arrow::Array;
using arrow::BooleanArray;
using arrow::ChunkedArray;
using arrow::DataType;
using arrow::Field;
using arrow::Int32Array;
using arrow::ListArray;
using arrow::MemoryPool;
using arrow::ResizableBuffer;
using arrow::Status;
using arrow::StructArray;
using arrow::Table;
using arrow::TimestampArray;
using arrow::compute::Datum;
using ::arrow::BitUtil::FromBigEndian;
using ::arrow::internal::checked_cast;
using ::arrow::internal::SafeLeftShift;
using ::arrow::util::SafeLoadAs;
using parquet::internal::RecordReader;
using parquet::schema::GroupNode;
using parquet::schema::Node;
using parquet::schema::PrimitiveNode;
using ParquetType = parquet::Type;
namespace parquet {
namespace arrow {
// Shorthand for the concrete Arrow array class that ::arrow::TypeTraits
// associates with the Arrow type class `ArrowType`.
template <typename ArrowType>
using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
// ----------------------------------------------------------------------
// Schema logic
/// Build the Arrow decimal type matching the precision and scale of a Parquet
/// DECIMAL logical type. Always succeeds.
static Status MakeArrowDecimal(const LogicalType& logical_type,
                               std::shared_ptr<DataType>* out) {
  const auto& decimal_type = checked_cast<const DecimalLogicalType&>(logical_type);
  const auto precision = decimal_type.precision();
  const auto scale = decimal_type.scale();
  *out = ::arrow::decimal(precision, scale);
  return Status::OK();
}
/// Map a Parquet INT logical type stored in a 32-bit physical column to the
/// Arrow integer type of the same width and signedness. Only 8-, 16- and
/// 32-bit widths are accepted; any other width yields a TypeError.
static Status MakeArrowInt(const LogicalType& logical_type,
                           std::shared_ptr<DataType>* out) {
  const auto& int_type = checked_cast<const IntLogicalType&>(logical_type);
  const bool is_signed = int_type.is_signed();
  const auto width = int_type.bit_width();
  if (width == 8) {
    *out = is_signed ? ::arrow::int8() : ::arrow::uint8();
  } else if (width == 16) {
    *out = is_signed ? ::arrow::int16() : ::arrow::uint16();
  } else if (width == 32) {
    *out = is_signed ? ::arrow::int32() : ::arrow::uint32();
  } else {
    return Status::TypeError(logical_type.ToString(),
                             " can not annotate physical type Int32");
  }
  return Status::OK();
}
/// Map a Parquet INT logical type stored in a 64-bit physical column to
/// int64/uint64. Any bit width other than 64 yields a TypeError.
static Status MakeArrowInt64(const LogicalType& logical_type,
                             std::shared_ptr<DataType>* out) {
  const auto& int_type = checked_cast<const IntLogicalType&>(logical_type);
  if (int_type.bit_width() != 64) {
    return Status::TypeError(logical_type.ToString(),
                             " can not annotate physical type Int64");
  }
  *out = int_type.is_signed() ? ::arrow::int64() : ::arrow::uint64();
  return Status::OK();
}
/// Map a Parquet TIME logical type on a 32-bit column to Arrow time32. Only
/// millisecond resolution is accepted; other units yield a TypeError.
static Status MakeArrowTime32(const LogicalType& logical_type,
                              std::shared_ptr<DataType>* out) {
  const auto& time_type = checked_cast<const TimeLogicalType&>(logical_type);
  if (time_type.time_unit() != LogicalType::TimeUnit::MILLIS) {
    return Status::TypeError(logical_type.ToString(),
                             " can not annotate physical type Time32");
  }
  *out = ::arrow::time32(::arrow::TimeUnit::MILLI);
  return Status::OK();
}
/// Map a Parquet TIME logical type on a 64-bit column to Arrow time64. Only
/// microsecond and nanosecond units are accepted; others yield a TypeError.
static Status MakeArrowTime64(const LogicalType& logical_type,
                              std::shared_ptr<DataType>* out) {
  const auto& time_type = checked_cast<const TimeLogicalType&>(logical_type);
  const auto unit = time_type.time_unit();
  if (unit == LogicalType::TimeUnit::MICROS) {
    *out = ::arrow::time64(::arrow::TimeUnit::MICRO);
    return Status::OK();
  }
  if (unit == LogicalType::TimeUnit::NANOS) {
    *out = ::arrow::time64(::arrow::TimeUnit::NANO);
    return Status::OK();
  }
  return Status::TypeError(logical_type.ToString(),
                           " can not annotate physical type Time64");
}
/// Map a Parquet TIMESTAMP logical type to an Arrow timestamp of the same unit.
/// The result carries the "UTC" time zone only when the annotation marks the
/// values as adjusted to UTC and it was not derived from a ConvertedType.
/// An unknown time unit yields a TypeError.
static Status MakeArrowTimestamp(const LogicalType& logical_type,
                                 std::shared_ptr<DataType>* out) {
  const auto& ts_type = checked_cast<const TimestampLogicalType&>(logical_type);
  const bool utc_normalized =
      !ts_type.is_from_converted_type() && ts_type.is_adjusted_to_utc();

  ::arrow::TimeUnit::type arrow_unit;
  switch (ts_type.time_unit()) {
    case LogicalType::TimeUnit::MILLIS:
      arrow_unit = ::arrow::TimeUnit::MILLI;
      break;
    case LogicalType::TimeUnit::MICROS:
      arrow_unit = ::arrow::TimeUnit::MICRO;
      break;
    case LogicalType::TimeUnit::NANOS:
      arrow_unit = ::arrow::TimeUnit::NANO;
      break;
    default:
      return Status::TypeError("Unrecognized time unit in timestamp logical_type: ",
                               logical_type.ToString());
  }
  *out = utc_normalized ? ::arrow::timestamp(arrow_unit, "UTC")
                        : ::arrow::timestamp(arrow_unit);
  return Status::OK();
}
/// Resolve the Arrow type for a BYTE_ARRAY column from its logical annotation:
/// STRING becomes utf8, DECIMAL becomes decimal, and unannotated / ENUM / JSON /
/// BSON columns become plain binary. Other annotations are NotImplemented.
static Status FromByteArray(const LogicalType& logical_type,
                            std::shared_ptr<DataType>* out) {
  const auto kind = logical_type.type();
  if (kind == LogicalType::Type::STRING) {
    *out = ::arrow::utf8();
    return Status::OK();
  }
  if (kind == LogicalType::Type::DECIMAL) {
    RETURN_NOT_OK(MakeArrowDecimal(logical_type, out));
    return Status::OK();
  }
  if (kind == LogicalType::Type::NONE || kind == LogicalType::Type::ENUM ||
      kind == LogicalType::Type::JSON || kind == LogicalType::Type::BSON) {
    *out = ::arrow::binary();
    return Status::OK();
  }
  return Status::NotImplemented("Unhandled logical logical_type ",
                                logical_type.ToString(), " for binary array");
}
/// Resolve the Arrow type for a FIXED_LEN_BYTE_ARRAY column: DECIMAL becomes
/// decimal; unannotated, INTERVAL and UUID columns become
/// fixed_size_binary(physical_length). Other annotations are NotImplemented.
static Status FromFLBA(const LogicalType& logical_type, int32_t physical_length,
                       std::shared_ptr<DataType>* out) {
  const auto kind = logical_type.type();
  if (kind == LogicalType::Type::DECIMAL) {
    RETURN_NOT_OK(MakeArrowDecimal(logical_type, out));
    return Status::OK();
  }
  const bool is_raw_bytes = kind == LogicalType::Type::NONE ||
                            kind == LogicalType::Type::INTERVAL ||
                            kind == LogicalType::Type::UUID;
  if (is_raw_bytes) {
    *out = ::arrow::fixed_size_binary(physical_length);
    return Status::OK();
  }
  return Status::NotImplemented("Unhandled logical logical_type ",
                                logical_type.ToString(),
                                " for fixed-length binary array");
}
/// Resolve the Arrow type for an INT32 column from its logical annotation
/// (INT, DATE, TIME, DECIMAL, or none). Other annotations are NotImplemented.
static Status FromInt32(const LogicalType& logical_type, std::shared_ptr<DataType>* out) {
  switch (logical_type.type()) {
    case LogicalType::Type::NONE:
      *out = ::arrow::int32();
      return Status::OK();
    case LogicalType::Type::DATE:
      *out = ::arrow::date32();
      return Status::OK();
    case LogicalType::Type::INT:
      RETURN_NOT_OK(MakeArrowInt(logical_type, out));
      return Status::OK();
    case LogicalType::Type::TIME:
      RETURN_NOT_OK(MakeArrowTime32(logical_type, out));
      return Status::OK();
    case LogicalType::Type::DECIMAL:
      RETURN_NOT_OK(MakeArrowDecimal(logical_type, out));
      return Status::OK();
    default:
      return Status::NotImplemented("Unhandled logical type ", logical_type.ToString(),
                                    " for INT32");
  }
}
/// Resolve the Arrow type for an INT64 column from its logical annotation
/// (INT, DECIMAL, TIMESTAMP, TIME, or none). Other annotations are NotImplemented.
static Status FromInt64(const LogicalType& logical_type, std::shared_ptr<DataType>* out) {
  switch (logical_type.type()) {
    case LogicalType::Type::NONE:
      *out = ::arrow::int64();
      return Status::OK();
    case LogicalType::Type::INT:
      RETURN_NOT_OK(MakeArrowInt64(logical_type, out));
      return Status::OK();
    case LogicalType::Type::TIME:
      RETURN_NOT_OK(MakeArrowTime64(logical_type, out));
      return Status::OK();
    case LogicalType::Type::TIMESTAMP:
      RETURN_NOT_OK(MakeArrowTimestamp(logical_type, out));
      return Status::OK();
    case LogicalType::Type::DECIMAL:
      RETURN_NOT_OK(MakeArrowDecimal(logical_type, out));
      return Status::OK();
    default:
      return Status::NotImplemented("Unhandled logical type ", logical_type.ToString(),
                                    " for INT64");
  }
}
/// Compute the (non-dictionary) Arrow type for a Parquet leaf column from its
/// physical type and logical annotation.
///
/// Columns whose logical annotation is invalid or NULL map to Arrow's null type.
/// An unknown physical type yields an IOError.
Status GetPrimitiveType(const schema::PrimitiveNode& primitive,
                        std::shared_ptr<DataType>* out) {
  const std::shared_ptr<const LogicalType>& annotation = primitive.logical_type();
  if (annotation->is_invalid() || annotation->is_null()) {
    *out = ::arrow::null();
    return Status::OK();
  }
  const LogicalType& logical = *annotation;
  switch (primitive.physical_type()) {
    case ParquetType::BOOLEAN:
      *out = ::arrow::boolean();
      return Status::OK();
    case ParquetType::INT32:
      RETURN_NOT_OK(FromInt32(logical, out));
      return Status::OK();
    case ParquetType::INT64:
      RETURN_NOT_OK(FromInt64(logical, out));
      return Status::OK();
    case ParquetType::INT96:
      // INT96 columns are surfaced as nanosecond timestamps without a time zone
      *out = ::arrow::timestamp(::arrow::TimeUnit::NANO);
      return Status::OK();
    case ParquetType::FLOAT:
      *out = ::arrow::float32();
      return Status::OK();
    case ParquetType::DOUBLE:
      *out = ::arrow::float64();
      return Status::OK();
    case ParquetType::BYTE_ARRAY:
      RETURN_NOT_OK(FromByteArray(logical, out));
      return Status::OK();
    case ParquetType::FIXED_LEN_BYTE_ARRAY:
      RETURN_NOT_OK(FromFLBA(logical, primitive.type_length(), out));
      return Status::OK();
    default:
      // PARQUET-1565: an unknown physical type can show up in a corrupt file
      return Status::IOError("Invalid physical column type: ",
                             TypeToString(primitive.physical_type()));
  }
}
/// State threaded through the recursive Parquet -> Arrow schema conversion.
struct SchemaTreeContext {
  /// Manifest being populated with the leaf index and parent links.
  SchemaManifest* manifest;
  /// Reader options (consulted for per-column dictionary reads).
  ArrowReaderProperties properties;
  /// Parquet schema the fields are converted from (used for column indices).
  const SchemaDescriptor* schema;

  /// Record `parent` as the enclosing field of `child` (parent may be null).
  void LinkParent(const SchemaField* child, const SchemaField* parent) {
    auto& parents = manifest->child_to_parent;
    parents[child] = parent;
  }

  /// Index a leaf field by its Parquet column index.
  void RecordLeaf(const SchemaField* leaf) {
    auto& leaves = manifest->column_index_to_field;
    leaves[leaf->column_index] = leaf;
  }
};
/// Whether a column of Arrow type `type` can be decoded directly into a
/// dictionary array. Currently only BYTE_ARRAY-backed binary/string types can.
bool IsDictionaryReadSupported(const DataType& type) {
  switch (type.id()) {
    case ::arrow::Type::BINARY:
    case ::arrow::Type::STRING:
      return true;
    default:
      return false;
  }
}
/// Resolve the Arrow type of leaf column `column_index`. The result is the
/// plain primitive type, wrapped in dictionary<int32, ...> when the reader
/// properties request a dictionary read for this column and the type supports it.
Status GetTypeForNode(int column_index, const schema::PrimitiveNode& primitive_node,
                      SchemaTreeContext* ctx, std::shared_ptr<DataType>* out) {
  std::shared_ptr<DataType> value_type;
  RETURN_NOT_OK(GetPrimitiveType(primitive_node, &value_type));
  const bool want_dictionary = ctx->properties.read_dictionary(column_index) &&
                               IsDictionaryReadSupported(*value_type);
  *out = want_dictionary ? ::arrow::dictionary(::arrow::int32(), value_type)
                         : value_type;
  return Status::OK();
}
// Forward declarations: NodeToSchemaField and GroupToSchemaField are mutually
// recursive with GroupToStruct and ListToSchemaField, which are defined before
// them below.
Status NodeToSchemaField(const Node& node, int16_t max_def_level, int16_t max_rep_level,
                         SchemaTreeContext* ctx, const SchemaField* parent,
                         SchemaField* out);
Status GroupToSchemaField(const GroupNode& node, int16_t max_def_level,
                          int16_t max_rep_level, SchemaTreeContext* ctx,
                          const SchemaField* parent, SchemaField* out);
/// Fill in `out` as the leaf for Parquet column `column_index`, then register
/// it in the manifest both under `parent` and in the column-index lookup.
Status PopulateLeaf(int column_index, const std::shared_ptr<Field>& field,
                    int16_t max_def_level, int16_t max_rep_level, SchemaTreeContext* ctx,
                    const SchemaField* parent, SchemaField* out) {
  out->column_index = column_index;
  out->field = field;
  out->max_repetition_level = max_rep_level;
  out->max_definition_level = max_def_level;
  ctx->LinkParent(out, parent);
  ctx->RecordLeaf(out);
  return Status::OK();
}
// Special case from the Parquet format spec's list backward-compatibility
// rules: a repeated group named "array", or whose name ends in "_tuple", is
// itself the list element (a struct) even when it has a single child. The spec
// phrases the second rule as "<list name>_tuple"; this suffix check is looser.
bool HasStructListName(const GroupNode& node) {
  const std::string& name = node.name();
  if (name == "array") {
    return true;
  }
  return boost::algorithm::ends_with(name, "_tuple");
}
/// Convert every child of `node` and assemble them into an Arrow struct field
/// stored in `out`. Children are converted with `out` as their parent; `out`
/// itself is not linked to `parent` here.
Status GroupToStruct(const GroupNode& node, int16_t max_def_level, int16_t max_rep_level,
                     SchemaTreeContext* ctx, const SchemaField* parent,
                     SchemaField* out) {
  const int num_children = node.field_count();
  out->children.resize(num_children);
  std::vector<std::shared_ptr<Field>> struct_fields;
  struct_fields.reserve(num_children);
  for (int i = 0; i < num_children; ++i) {
    SchemaField* child = &out->children[i];
    RETURN_NOT_OK(NodeToSchemaField(*node.field(i), max_def_level, max_rep_level, ctx,
                                    out, child));
    struct_fields.push_back(child->field);
  }
  out->max_definition_level = max_def_level;
  out->max_repetition_level = max_rep_level;
  out->field =
      ::arrow::field(node.name(), ::arrow::struct_(struct_fields), node.is_optional());
  return Status::OK();
}
// Convert a LIST-annotated Parquet group into an Arrow list field in `out`.
//
// The group must have exactly one child and that child must be repeated;
// otherwise NotImplemented is returned. A repeated child group is resolved as
// the 3-level encoding (or as a list of structs for multi-field groups and the
// special names matched by HasStructListName). A repeated primitive child is
// the 2-level encoding, and its items are non-nullable.
//
// `max_def_level` / `max_rep_level` are the levels already accumulated for
// `group` itself; both are incremented once here for the repeated child.
// `parent` is the enclosing field (nullptr at top level).
Status ListToSchemaField(const GroupNode& group, int16_t max_def_level,
                         int16_t max_rep_level, SchemaTreeContext* ctx,
                         const SchemaField* parent, SchemaField* out) {
  if (group.field_count() != 1) {
    return Status::NotImplemented(
        "Only LIST-annotated groups with a single child can be handled.");
  }
  out->children.resize(1);
  SchemaField* child_field = &out->children[0];
  // NodeToSchemaField already links `out` to `parent` before dispatching here;
  // repeating it is a harmless map re-assignment.
  ctx->LinkParent(out, parent);
  // The element field is linked explicitly because GroupToStruct (used below
  // for struct elements) does not link the struct it builds to its parent.
  ctx->LinkParent(child_field, out);
  const Node& list_node = *group.field(0);
  if (!list_node.is_repeated()) {
    return Status::NotImplemented(
        "Non-repeated nodes in a LIST-annotated group are not supported.");
  }
  // The repeated child adds one repetition and one definition level.
  ++max_def_level;
  ++max_rep_level;
  if (list_node.is_group()) {
    // Resolve 3-level encoding
    //
    // required/optional group name=whatever {
    //   repeated group name=list {
    //     required/optional TYPE item;
    //   }
    // }
    //
    // yields list<item: TYPE ?nullable> ?nullable
    //
    // We distinguish the special case that we have
    //
    // required/optional group name=whatever {
    //   repeated group name=array or $SOMETHING_tuple {
    //     required/optional TYPE item;
    //   }
    // }
    //
    // In this latter case, the inner type of the list should be a struct
    // rather than a primitive value
    //
    // yields list<item: struct<item: TYPE ?nullable> not null> ?nullable
    const auto& list_group = static_cast<const GroupNode&>(list_node);
    // Special case mentioned in the format spec:
    //   If the name is array or ends in _tuple, this should be a list of struct
    //   even for single child elements.
    if (list_group.field_count() == 1 && !HasStructListName(list_group)) {
      // List of primitive type
      RETURN_NOT_OK(NodeToSchemaField(*list_group.field(0), max_def_level, max_rep_level,
                                      ctx, out, child_field));
    } else {
      RETURN_NOT_OK(
          GroupToStruct(list_group, max_def_level, max_rep_level, ctx, out, child_field));
    }
  } else {
    // Two-level list encoding
    //
    // required/optional group LIST {
    //   repeated TYPE;
    // }
    const auto& primitive_node = static_cast<const PrimitiveNode&>(list_node);
    int column_index = ctx->schema->GetColumnIndex(primitive_node);
    std::shared_ptr<DataType> type;
    RETURN_NOT_OK(GetTypeForNode(column_index, primitive_node, ctx, &type));
    auto item_field = ::arrow::field(list_node.name(), type, /*nullable=*/false);
    RETURN_NOT_OK(PopulateLeaf(column_index, item_field, max_def_level, max_rep_level,
                               ctx, out, child_field));
  }
  out->field = ::arrow::field(group.name(), ::arrow::list(child_field->field),
                              group.is_optional());
  // NOTE(review): these are the levels after the increment above, i.e. those of
  // the repeated child rather than of `group` itself; confirm this is what
  // consumers of SchemaField::max_*_level expect for list fields.
  out->max_definition_level = max_def_level;
  out->max_repetition_level = max_rep_level;
  return Status::OK();
}
/// Convert a Parquet group node into `out`.
///
/// - LIST-annotated groups are handled by ListToSchemaField.
/// - A non-annotated repeated group becomes a non-nullable list of structs.
/// - Any other group becomes a struct.
///
/// `max_def_level` / `max_rep_level` already include this node's own
/// contribution (added by NodeToSchemaField).
Status GroupToSchemaField(const GroupNode& node, int16_t max_def_level,
                          int16_t max_rep_level, SchemaTreeContext* ctx,
                          const SchemaField* parent, SchemaField* out) {
  if (node.logical_type()->is_list()) {
    return ListToSchemaField(node, max_def_level, max_rep_level, ctx, parent, out);
  }
  if (!node.is_repeated()) {
    return GroupToStruct(node, max_def_level, max_rep_level, ctx, parent, out);
  }

  // Simple repeated struct
  //
  // repeated group $NAME {
  //   r/o TYPE[0] f0
  //   r/o TYPE[1] f1
  // }
  out->children.resize(1);
  SchemaField* struct_field = &out->children[0];
  RETURN_NOT_OK(
      GroupToStruct(node, max_def_level, max_rep_level, ctx, out, struct_field));
  // GroupToStruct links the struct's own children to it, but not the struct
  // itself, so register the implicit list element under the list field here.
  // Otherwise the element would be missing from the manifest's child_to_parent.
  ctx->LinkParent(struct_field, out);
  out->field = ::arrow::field(node.name(), ::arrow::list(struct_field->field),
                              node.is_optional());
  out->max_definition_level = max_def_level;
  out->max_repetition_level = max_rep_level;
  return Status::OK();
}
// Convert one Parquet schema node (and, recursively, its descendants) into
// `out`, linking `out` under `parent` in the manifest.
//
// `max_def_level` / `max_rep_level` are the levels of the enclosing node. This
// function first adds this node's own contribution (optional: +1 definition;
// repeated: +1 definition and +1 repetition) and then works with the result.
// Groups are delegated to GroupToSchemaField. A required/optional primitive
// becomes a leaf; a repeated primitive becomes a non-nullable list whose single
// non-nullable item is the leaf.
Status NodeToSchemaField(const Node& node, int16_t max_def_level, int16_t max_rep_level,
                         SchemaTreeContext* ctx, const SchemaField* parent,
                         SchemaField* out) {
  /// Workhorse function for converting a Parquet schema node to an Arrow
  /// type. Handles different conventions for nested data
  if (node.is_optional()) {
    ++max_def_level;
  } else if (node.is_repeated()) {
    // Repeated fields add both a repetition and definition level. This is used
    // to distinguish between an empty list and a list with an item in it.
    ++max_rep_level;
    ++max_def_level;
  }
  ctx->LinkParent(out, parent);
  // Now, walk the schema and populate a SchemaField for each leaf node
  if (node.is_group()) {
    // A nested field, but we don't know what kind yet
    return GroupToSchemaField(static_cast<const GroupNode&>(node), max_def_level,
                              max_rep_level, ctx, parent, out);
  } else {
    // Either a normal flat primitive type, or a list type encoded with 1-level
    // list encoding. Note that the 3-level encoding is the form recommended by
    // the parquet specification, but technically we can have either
    //
    // required/optional $TYPE $FIELD_NAME
    //
    // or
    //
    // repeated $TYPE $FIELD_NAME
    const auto& primitive_node = static_cast<const PrimitiveNode&>(node);
    int column_index = ctx->schema->GetColumnIndex(primitive_node);
    std::shared_ptr<DataType> type;
    RETURN_NOT_OK(GetTypeForNode(column_index, primitive_node, ctx, &type));
    if (node.is_repeated()) {
      // One-level list encoding, e.g.
      // a: repeated int32;
      out->children.resize(1);
      auto child_field = ::arrow::field(node.name(), type, /*nullable=*/false);
      RETURN_NOT_OK(PopulateLeaf(column_index, child_field, max_def_level, max_rep_level,
                                 ctx, out, &out->children[0]));
      out->field = ::arrow::field(node.name(), ::arrow::list(child_field),
                                  /*nullable=*/false);
      // NOTE(review): the list field is given the same levels as its repeated
      // leaf child; the original author flagged this as uncertain ("Is this
      // right?") — confirm against how readers consume list-level levels.
      out->max_definition_level = max_def_level;
      out->max_repetition_level = max_rep_level;
      return Status::OK();
    } else {
      // A normal (required/optional) primitive node
      return PopulateLeaf(column_index,
                          ::arrow::field(node.name(), type, node.is_optional()),
                          max_def_level, max_rep_level, ctx, parent, out);
    }
  }
}
/// Extract the Arrow schema that a writer may have stored (base64-encoded IPC)
/// under the "ARROW:schema" key of the Parquet key/value metadata.
///
/// \param[in] metadata the file's key/value metadata; may be null
/// \param[out] clean_metadata `metadata` without the schema entry; null when
///   `metadata` is null or the schema entry was its only key
/// \param[out] out the deserialized Arrow schema, or null if none was stored
/// \return an error if the stored schema cannot be deserialized
Status GetOriginSchema(const std::shared_ptr<const KeyValueMetadata>& metadata,
                       std::shared_ptr<const KeyValueMetadata>* clean_metadata,
                       std::shared_ptr<::arrow::Schema>* out) {
  if (!metadata) {
    *out = nullptr;
    *clean_metadata = nullptr;
    return Status::OK();
  }

  static const std::string kArrowSchemaKey = "ARROW:schema";
  const int schema_index = metadata->FindKey(kArrowSchemaKey);
  if (schema_index == -1) {
    // No stored schema: pass the metadata through untouched
    *out = nullptr;
    *clean_metadata = metadata;
    return Status::OK();
  }

  // Written with the store_schema option; decode it so read options such as
  // dictionary-encoded fields can be restored.
  const std::string serialized_schema =
      ::arrow::util::base64_decode(metadata->value(schema_index));
  ::arrow::ipc::DictionaryMemo dict_memo;
  ::arrow::io::BufferReader input(std::make_shared<Buffer>(serialized_schema));
  RETURN_NOT_OK(::arrow::ipc::ReadSchema(&input, &dict_memo, out));

  if (metadata->size() <= 1) {
    // The schema was the only entry, so no metadata remains
    *clean_metadata = nullptr;
    return Status::OK();
  }
  auto remaining = ::arrow::key_value_metadata({}, {});
  remaining->reserve(metadata->size() - 1);
  for (int64_t i = 0; i < metadata->size(); ++i) {
    if (i != schema_index) {
      remaining->Append(metadata->key(i), metadata->value(i));
    }
  }
  *clean_metadata = remaining;
  return Status::OK();
}
/// Restore Arrow-specific information lost in the Parquet round trip onto
/// `field`, using the matching field of the stored Arrow schema.
///
/// Restores, where applicable:
/// - the original time zone of a UTC-normalized timestamp,
/// - dictionary encoding for types that support direct dictionary reads,
/// - field metadata, including deserializing a registered extension type.
///
/// \param[in] field the field inferred from the Parquet schema
/// \param[in] origin_field the corresponding field of the stored Arrow schema
/// \param[out] out the adjusted field
/// \return an error if extension type deserialization fails
Status ApplyOriginalMetadata(std::shared_ptr<Field> field, const Field& origin_field,
                             std::shared_ptr<Field>* out) {
  auto origin_type = origin_field.type();

  // Restore time zone, if any. Both types must be checked: the origin schema
  // is deserialized from file metadata and need not match the Parquet schema,
  // so an unchecked static_cast of origin_type could reinterpret an unrelated
  // type as a TimestampType.
  if (field->type()->id() == ::arrow::Type::TIMESTAMP &&
      origin_type->id() == ::arrow::Type::TIMESTAMP) {
    const auto& ts_type = static_cast<const ::arrow::TimestampType&>(*field->type());
    const auto& ts_origin_type = static_cast<const ::arrow::TimestampType&>(*origin_type);

    // If the unit is the same and the data is tz-aware, then set the original
    // time zone, since Parquet has no native storage for timezones
    if (ts_type.unit() == ts_origin_type.unit() && ts_type.timezone() == "UTC" &&
        ts_origin_type.timezone() != "") {
      field = field->WithType(origin_type);
    }
  }

  // Re-apply dictionary encoding when the original was a dictionary and the
  // inferred type can be read as one.
  if (origin_type->id() == ::arrow::Type::DICTIONARY &&
      field->type()->id() != ::arrow::Type::DICTIONARY &&
      IsDictionaryReadSupported(*field->type())) {
    const auto& dict_origin_type =
        static_cast<const ::arrow::DictionaryType&>(*origin_type);
    field = field->WithType(
        ::arrow::dictionary(::arrow::int32(), field->type(), dict_origin_type.ordered()));
  }

  // restore field metadata
  std::shared_ptr<const KeyValueMetadata> field_metadata = origin_field.metadata();
  if (field_metadata != nullptr) {
    field = field->WithMetadata(field_metadata);

    // extension type: only deserialized if the type name is registered
    int name_index = field_metadata->FindKey(::arrow::kExtensionTypeKeyName);
    if (name_index != -1) {
      std::string type_name = field_metadata->value(name_index);
      int data_index = field_metadata->FindKey(::arrow::kExtensionMetadataKeyName);
      std::string type_data = data_index == -1 ? "" : field_metadata->value(data_index);

      std::shared_ptr<::arrow::ExtensionType> ext_type =
          ::arrow::GetExtensionType(type_name);
      if (ext_type != nullptr) {
        std::shared_ptr<DataType> deserialized;
        RETURN_NOT_OK(ext_type->Deserialize(field->type(), type_data, &deserialized));
        field = field->WithType(deserialized);
      }
    }
  }
  *out = field;
  return Status::OK();
}
/// Populate `manifest` with the Arrow-side description of Parquet `schema`:
/// one SchemaField per top-level field (recursively), parent links, the leaf
/// index, and any Arrow schema stored in `metadata`.
///
/// If a stored Arrow schema is present and has the same number of top-level
/// fields, its time zones, dictionary types and field metadata are re-applied
/// to the top-level fields. A stored schema with a different field count cannot
/// be matched positionally and is ignored.
Status BuildSchemaManifest(const SchemaDescriptor* schema,
                           const std::shared_ptr<const KeyValueMetadata>& metadata,
                           const ArrowReaderProperties& properties,
                           SchemaManifest* manifest) {
  RETURN_NOT_OK(
      GetOriginSchema(metadata, &manifest->schema_metadata, &manifest->origin_schema));
  SchemaTreeContext ctx;
  ctx.manifest = manifest;
  ctx.properties = properties;
  ctx.schema = schema;
  const GroupNode& schema_node = *schema->group_node();
  manifest->descr = schema;
  const int num_fields = schema_node.field_count();
  manifest->schema_fields.resize(num_fields);

  // The stored schema is matched to the Parquet schema by top-level index. If
  // the counts differ (foreign writer, stale or corrupt metadata), field(i)
  // below could index past its end, so drop it instead.
  if (manifest->origin_schema != nullptr &&
      manifest->origin_schema->num_fields() != num_fields) {
    manifest->origin_schema.reset();
  }

  for (int i = 0; i < num_fields; ++i) {
    SchemaField* out_field = &manifest->schema_fields[i];
    RETURN_NOT_OK(NodeToSchemaField(*schema_node.field(i), 0, 0, &ctx,
                                    /*parent=*/nullptr, out_field));

    // TODO(wesm): as follow up to ARROW-3246, we should really pass the origin
    // schema (if any) through all functions in the schema reconstruction, but
    // I'm being lazy and just setting dictionary fields at the top level for
    // now
    if (manifest->origin_schema == nullptr) {
      continue;
    }
    auto origin_field = manifest->origin_schema->field(i);
    RETURN_NOT_OK(
        ApplyOriginalMetadata(out_field->field, *origin_field, &out_field->field));
  }
  return Status::OK();
}
/// Convert a Parquet schema into an Arrow schema whose top-level fields come
/// from the schema manifest and whose schema-level metadata is
/// `key_value_metadata` as passed in.
Status FromParquetSchema(
    const SchemaDescriptor* schema, const ArrowReaderProperties& properties,
    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
    std::shared_ptr<::arrow::Schema>* out) {
  SchemaManifest manifest;
  RETURN_NOT_OK(BuildSchemaManifest(schema, key_value_metadata, properties, &manifest));
  std::vector<std::shared_ptr<Field>> fields;
  fields.reserve(manifest.schema_fields.size());
  for (const SchemaField& top_level : manifest.schema_fields) {
    fields.push_back(top_level.field);
  }
  *out = ::arrow::schema(fields, key_value_metadata);
  return Status::OK();
}
/// Convert a Parquet schema into an Arrow schema without any key/value
/// metadata, so no stored Arrow schema is consulted.
Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
                         const ArrowReaderProperties& properties,
                         std::shared_ptr<::arrow::Schema>* out) {
  const std::shared_ptr<const KeyValueMetadata> no_metadata;
  return FromParquetSchema(parquet_schema, properties, no_metadata, out);
}
// ----------------------------------------------------------------------
// Primitive types
/// Copy the reader's decoded Parquet integers into a freshly allocated Arrow
/// buffer, converting each value from ParquetType's C type to ArrowType's, and
/// wrap it with the reader's validity bitmap and null count.
template <typename ArrowType, typename ParquetType>
Status TransferInt(RecordReader* reader, MemoryPool* pool,
                   const std::shared_ptr<DataType>& type, Datum* out) {
  using ArrowCType = typename ArrowType::c_type;
  using ParquetCType = typename ParquetType::c_type;
  const int64_t num_values = reader->values_written();
  std::shared_ptr<Buffer> converted;
  RETURN_NOT_OK(
      ::arrow::AllocateBuffer(pool, num_values * sizeof(ArrowCType), &converted));
  const ParquetCType* src = reinterpret_cast<const ParquetCType*>(reader->values());
  ArrowCType* dst = reinterpret_cast<ArrowCType*>(converted->mutable_data());
  // Element-wise C++ conversion from the Parquet to the Arrow value type
  std::copy_n(src, num_values, dst);
  *out = std::make_shared<ArrayType<ArrowType>>(
      type, num_values, converted, reader->ReleaseIsValid(), reader->null_count());
  return Status::OK();
}
/// Build an array of `type` directly from the reader's validity bitmap and
/// value buffer, taking ownership of both from the reader without copying.
std::shared_ptr<Array> TransferZeroCopy(RecordReader* reader,
                                        const std::shared_ptr<DataType>& type) {
  std::vector<std::shared_ptr<Buffer>> buffers;
  buffers.push_back(reader->ReleaseIsValid());
  buffers.push_back(reader->ReleaseValues());
  const int64_t length = reader->values_written();
  const int64_t null_count = reader->null_count();
  auto array_data =
      std::make_shared<::arrow::ArrayData>(type, length, buffers, null_count);
  return ::arrow::MakeArray(array_data);
}
/// Pack the reader's one-bool-per-value output into an Arrow bit-packed
/// boolean buffer and wrap it with the reader's validity bitmap.
Status TransferBool(RecordReader* reader, MemoryPool* pool, Datum* out) {
  const int64_t num_values = reader->values_written();
  const int64_t num_bytes = BitUtil::BytesForBits(num_values);
  std::shared_ptr<Buffer> bitmap;
  RETURN_NOT_OK(::arrow::AllocateBuffer(pool, num_bytes, &bitmap));

  const bool* flags = reinterpret_cast<const bool*>(reader->values());
  uint8_t* bits = bitmap->mutable_data();
  // Start from all-false and only set the true bits
  std::memset(bits, 0, num_bytes);
  for (int64_t i = 0; i < num_values; ++i) {
    if (flags[i]) ::arrow::BitUtil::SetBit(bits, i);
  }

  *out = std::make_shared<BooleanArray>(num_values, bitmap, reader->ReleaseIsValid(),
                                        reader->null_count());
  return Status::OK();
}
/// Convert the reader's Int96 values to int64 nanoseconds via
/// Int96GetNanoSeconds and wrap them as a TimestampArray of `type`.
Status TransferInt96(RecordReader* reader, MemoryPool* pool,
                     const std::shared_ptr<DataType>& type, Datum* out) {
  const int64_t num_values = reader->values_written();
  const auto* raw = reinterpret_cast<const Int96*>(reader->values());
  std::shared_ptr<Buffer> buffer;
  RETURN_NOT_OK(::arrow::AllocateBuffer(pool, num_values * sizeof(int64_t), &buffer));
  auto* nanos = reinterpret_cast<int64_t*>(buffer->mutable_data());
  for (int64_t i = 0; i < num_values; ++i) {
    // value[2] == 0 happens for null entries: such an Int96 isn't representable
    // as a 64-bit Unix timestamp, so write 0 rather than trigger UBSAN.
    nanos[i] = raw[i].value[2] == 0 ? 0 : Int96GetNanoSeconds(raw[i]);
  }
  *out = std::make_shared<TimestampArray>(type, num_values, buffer,
                                          reader->ReleaseIsValid(), reader->null_count());
  return Status::OK();
}
/// Convert the reader's int32 day counts to int64 milliseconds
/// (days * kMillisecondsPerDay) and wrap them as a Date64Array of `type`.
Status TransferDate64(RecordReader* reader, MemoryPool* pool,
                      const std::shared_ptr<DataType>& type, Datum* out) {
  const int64_t num_values = reader->values_written();
  const int32_t* days = reinterpret_cast<const int32_t*>(reader->values());
  std::shared_ptr<Buffer> buffer;
  RETURN_NOT_OK(::arrow::AllocateBuffer(pool, num_values * sizeof(int64_t), &buffer));
  int64_t* millis = reinterpret_cast<int64_t*>(buffer->mutable_data());
  // Widen to 64 bits before scaling so the product cannot overflow int32
  std::transform(days, days + num_values, millis, [](int32_t day) -> int64_t {
    return static_cast<int64_t>(day) * kMillisecondsPerDay;
  });
  *out = std::make_shared<::arrow::Date64Array>(
      type, num_values, buffer, reader->ReleaseIsValid(), reader->null_count());
  return Status::OK();
}
// ----------------------------------------------------------------------
// Binary, direct to dictionary-encoded
/// Take the dictionary-encoded chunks accumulated by a DictionaryRecordReader,
/// viewing them as `logical_value_type` when their type differs.
///
/// \param[in] reader must be an internal::DictionaryRecordReader (DCHECKed)
/// \param[in] logical_value_type the dictionary type the caller expects
/// \param[out] out the resulting chunked array
Status TransferDictionary(RecordReader* reader,
                          const std::shared_ptr<DataType>& logical_value_type,
                          std::shared_ptr<ChunkedArray>* out) {
  auto dict_reader = dynamic_cast<internal::DictionaryRecordReader*>(reader);
  DCHECK(dict_reader);
  // Keep an owning reference in a local. Calling (*out)->View(..., out) would
  // overwrite what may be the only reference to the ChunkedArray whose member
  // function is still executing.
  std::shared_ptr<ChunkedArray> result = dict_reader->GetResult();
  if (logical_value_type->Equals(*result->type())) {
    *out = result;
    return Status::OK();
  }
  return result->View(logical_value_type, out);
}
/// Collect the chunks produced by a binary record reader as a ChunkedArray of
/// `logical_value_type`. Dictionary reads are delegated to TransferDictionary.
/// If any chunk's type differs from `logical_value_type`, all chunks are viewed
/// as that type.
Status TransferBinary(RecordReader* reader,
                      const std::shared_ptr<DataType>& logical_value_type,
                      std::shared_ptr<ChunkedArray>* out) {
  if (reader->read_dictionary()) {
    auto dict_type = ::arrow::dictionary(::arrow::int32(), logical_value_type);
    return TransferDictionary(reader, dict_type, out);
  }
  auto binary_reader = dynamic_cast<internal::BinaryRecordReader*>(reader);
  DCHECK(binary_reader);
  auto chunks = binary_reader->GetBuilderChunks();

  bool needs_view = false;
  for (const auto& chunk : chunks) {
    if (!chunk->type()->Equals(*logical_value_type)) {
      needs_view = true;
      break;
    }
  }
  if (needs_view) {
    return ChunkedArray(chunks).View(logical_value_type, out);
  }
  *out = std::make_shared<ChunkedArray>(chunks, logical_value_type);
  return Status::OK();
}
// ----------------------------------------------------------------------
// INT32 / INT64 / BYTE_ARRAY / FIXED_LEN_BYTE_ARRAY -> Decimal128
// Interpret bytes[start, stop) as a big-endian unsigned integer of at most 8
// bytes and return it zero-extended to 64 bits.
//
// Multi-byte pieces are read with SafeLoadAs (from arrow/util/ubsan.h; assumed
// to tolerate unaligned input) and byte-swapped with FromBigEndian; odd lengths
// are assembled from a 4/2-byte load plus trailing single bytes.
// A length outside [0, 8] is a programming error: it trips the DCHECKs in debug
// builds and falls through to the default case (UINT64_MAX) in release builds.
static uint64_t BytesToInteger(const uint8_t* bytes, int32_t start, int32_t stop) {
  const int32_t length = stop - start;

  DCHECK_GE(length, 0);
  DCHECK_LE(length, 8);

  switch (length) {
    case 0:
      return 0;
    case 1:
      return bytes[start];
    case 2:
      return FromBigEndian(SafeLoadAs<uint16_t>(bytes + start));
    case 3: {
      // 2 big-endian bytes, then 1 trailing byte
      const uint64_t first_two_bytes = FromBigEndian(SafeLoadAs<uint16_t>(bytes + start));
      const uint64_t last_byte = bytes[stop - 1];
      return first_two_bytes << 8 | last_byte;
    }
    case 4:
      return FromBigEndian(SafeLoadAs<uint32_t>(bytes + start));
    case 5: {
      // 4 big-endian bytes, then 1 trailing byte
      const uint64_t first_four_bytes =
          FromBigEndian(SafeLoadAs<uint32_t>(bytes + start));
      const uint64_t last_byte = bytes[stop - 1];
      return first_four_bytes << 8 | last_byte;
    }
    case 6: {
      // 4 big-endian bytes, then 2 big-endian bytes
      const uint64_t first_four_bytes =
          FromBigEndian(SafeLoadAs<uint32_t>(bytes + start));
      const uint64_t last_two_bytes =
          FromBigEndian(SafeLoadAs<uint16_t>(bytes + start + 4));
      return first_four_bytes << 16 | last_two_bytes;
    }
    case 7: {
      // 4 big-endian bytes, 2 big-endian bytes, then 1 trailing byte
      const uint64_t first_four_bytes =
          FromBigEndian(SafeLoadAs<uint32_t>(bytes + start));
      const uint64_t second_two_bytes =
          FromBigEndian(SafeLoadAs<uint16_t>(bytes + start + 4));
      const uint64_t last_byte = bytes[stop - 1];
      return first_four_bytes << 24 | second_two_bytes << 8 | last_byte;
    }
    case 8:
      return FromBigEndian(SafeLoadAs<uint64_t>(bytes + start));
    default: {
      DCHECK(false);
      return UINT64_MAX;
    }
  }
}
// Inclusive bounds, in bytes, on the big-endian two's-complement input accepted
// by BytesToIntegerPair (a 128-bit decimal occupies at most 16 bytes).
static constexpr int32_t kMinDecimalBytes = 1;
static constexpr int32_t kMaxDecimalBytes = 16;
/// \brief Convert a sequence of big-endian bytes to one int64_t (high bits) and one
/// uint64_t (low bits).
///
/// `bytes[0, length)` is read as a big-endian two's-complement integer of
/// `length` bytes (kMinDecimalBytes..kMaxDecimalBytes, checked only by DCHECK)
/// and sign-extended to 128 bits. The last min(length, 8) bytes form the low
/// word; any preceding bytes form the high word.
static void BytesToIntegerPair(const uint8_t* bytes, const int32_t length,
                               int64_t* out_high, uint64_t* out_low) {
  DCHECK_GE(length, kMinDecimalBytes);
  DCHECK_LE(length, kMaxDecimalBytes);

  // XXX This code is copied from Decimal::FromBigEndian

  int64_t high, low;

  // Bytes are coming in big-endian, so the first byte is the MSB and therefore holds the
  // sign bit.
  const bool is_negative = static_cast<int8_t>(bytes[0]) < 0;

  // 1. Extract the high bytes
  // Stop byte of the high bytes (0 when length <= 8, i.e. there are no high bytes)
  const int32_t high_bits_offset = std::max(0, length - 8);
  const auto high_bits = BytesToInteger(bytes, 0, high_bits_offset);

  if (high_bits_offset == 8) {
    // Avoid undefined shift by 64 below
    high = high_bits;
  } else {
    // Start from all ones for a negative value (sign extension), else zero
    high = -1 * (is_negative && length < kMaxDecimalBytes);
    // Shift left enough bits to make room for the incoming int64_t
    high = SafeLeftShift(high, high_bits_offset * CHAR_BIT);
    // Preserve the upper bits by inplace OR-ing the int64_t
    high |= high_bits;
  }

  // 2. Extract the low bytes
  // Number of low bytes (at most 8)
  const int32_t low_bits_offset = std::min(length, 8);
  const auto low_bits = BytesToInteger(bytes, high_bits_offset, length);

  if (low_bits_offset == 8) {
    // Avoid undefined shift by 64 below
    low = low_bits;
  } else {
    // Sign extend the low bits if necessary (only when the whole value is
    // shorter than 8 bytes)
    low = -1 * (is_negative && length < 8);
    // Shift left enough bits to make room for the incoming int64_t
    low = SafeLeftShift(low, low_bits_offset * CHAR_BIT);
    // Preserve the upper bits by inplace OR-ing the int64_t
    low |= low_bits;
  }

  *out_high = high;
  *out_low = static_cast<uint64_t>(low);
}
/// Decode one big-endian decimal value of `byte_width` bytes into the 16-byte
/// slot at `out_buf`: the unsigned low 64 bits occupy the first 8 bytes and the
/// signed high 64 bits the second 8 bytes (the Decimal128-compatible layout).
static inline void RawBytesToDecimalBytes(const uint8_t* value, int32_t byte_width,
                                          uint8_t* out_buf) {
  uint64_t* low_word = reinterpret_cast<uint64_t*>(out_buf);
  int64_t* high_word = reinterpret_cast<int64_t*>(out_buf + sizeof(uint64_t));
  BytesToIntegerPair(value, byte_width, high_word, low_word);
}
/// Primary template: Decimal128 conversion is not supported for this Parquet
/// physical type. Supported types are handled by the specializations that follow.
template <typename T>
Status ConvertToDecimal128(const Array& /*array*/, const std::shared_ptr<DataType>&,
                           MemoryPool* /*pool*/, std::shared_ptr<Array>*) {
  return Status::NotImplemented("not implemented");
}
/// Convert a FixedSizeBinaryArray of big-endian two's-complement values into a
/// Decimal128Array of `type`, sharing the input's validity bitmap.
///
/// \return Invalid if the fixed-size byte width is outside
///   [kMinDecimalBytes, kMaxDecimalBytes], which BytesToIntegerPair cannot decode
template <>
Status ConvertToDecimal128<FLBAType>(const Array& array,
                                     const std::shared_ptr<DataType>& type,
                                     MemoryPool* pool, std::shared_ptr<Array>* out) {
  const auto& fixed_size_binary_array =
      static_cast<const ::arrow::FixedSizeBinaryArray&>(array);

  // The byte width of each decimal value
  const int32_t type_length =
      static_cast<const ::arrow::Decimal128Type&>(*type).byte_width();

  // number of elements in the entire array
  const int64_t length = fixed_size_binary_array.length();

  // Get the byte width of the values in the FixedSizeBinaryArray. Most of the time
  // this will be different from the decimal array width because we write the minimum
  // number of bytes necessary to represent a given precision
  const int32_t byte_width =
      static_cast<const ::arrow::FixedSizeBinaryType&>(*fixed_size_binary_array.type())
          .byte_width();

  // BytesToIntegerPair only enforces its input range with DCHECKs; in release
  // builds an out-of-range width would lead to an oversized shift (UB) and
  // garbage values, so reject it up front.
  if (byte_width < kMinDecimalBytes || byte_width > kMaxDecimalBytes) {
    return Status::Invalid("Invalid FIXED_LEN_BYTE_ARRAY length for Decimal128: ",
                           byte_width);
  }

  // allocate memory for the decimal array
  std::shared_ptr<Buffer> data;
  RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * type_length, &data));

  // raw bytes that we can write to
  uint8_t* out_ptr = data->mutable_data();

  // convert each FixedSizeBinary value to valid decimal bytes
  const int64_t null_count = fixed_size_binary_array.null_count();
  for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
    if (null_count > 0 && fixed_size_binary_array.IsNull(i)) {
      // Zero null slots so the output never exposes uninitialized heap memory
      std::memset(out_ptr, 0, type_length);
    } else {
      RawBytesToDecimalBytes(fixed_size_binary_array.GetValue(i), byte_width, out_ptr);
    }
  }

  *out = std::make_shared<::arrow::Decimal128Array>(
      type, length, data, fixed_size_binary_array.null_bitmap(), null_count);

  return Status::OK();
}
/// \brief Convert a BinaryArray of Parquet BYTE_ARRAY decimal values into a
/// Decimal128Array.
///
/// Every slot is first zeroed; non-null values are then decoded from their
/// variable-length big-endian bytes. The input's validity bitmap and null count
/// are reused for the output array.
///
/// \return Status::Invalid if any value is longer than the decimal storage.
template <>
Status ConvertToDecimal128<ByteArrayType>(const Array& array,
                                          const std::shared_ptr<DataType>& type,
                                          MemoryPool* pool, std::shared_ptr<Array>* out) {
  const auto& binary_values = static_cast<const ::arrow::BinaryArray&>(array);
  const int64_t num_rows = binary_values.length();
  const int64_t decimal_width =
      static_cast<const ::arrow::Decimal128Type&>(*type).byte_width();

  std::shared_ptr<Buffer> data;
  RETURN_NOT_OK(::arrow::AllocateBuffer(pool, num_rows * decimal_width, &data));

  const int64_t num_nulls = binary_values.null_count();
  const bool has_nulls = num_nulls > 0;
  uint8_t* dest = data->mutable_data();
  for (int64_t row = 0; row < num_rows; ++row) {
    int32_t value_size = 0;
    const uint8_t* value_bytes = binary_values.GetValue(row, &value_size);
    // The size is validated for every slot, null or not
    if (value_size < 0 || value_size > decimal_width) {
      return Status::Invalid("Invalid BYTE_ARRAY size");
    }
    uint64_t* words = reinterpret_cast<uint64_t*>(dest);
    words[0] = 0;
    words[1] = 0;
    // IsNull is only consulted when the array actually contains nulls
    if (!has_nulls || !binary_values.IsNull(row)) {
      RawBytesToDecimalBytes(value_bytes, value_size, dest);
    }
    dest += decimal_width;
  }

  *out = std::make_shared<::arrow::Decimal128Array>(
      type, num_rows, data, binary_values.null_bitmap(), num_nulls);
  return Status::OK();
}
/// \brief Convert the Int32 or Int64 values buffered in a RecordReader into a
/// Decimal128Array.
///
/// The Parquet spec allows systems to write decimals as int32 or int64 when the
/// values are small enough to fit in 4 or 8 bytes, respectively. Each value is
/// sign-extended into two 64-bit words per decimal slot; if the reader tracks
/// validity, its validity bitmap and null count are handed to the output array.
template <typename ParquetIntegerType,
          typename = typename std::enable_if<
              std::is_same<ParquetIntegerType, Int32Type>::value ||
              std::is_same<ParquetIntegerType, Int64Type>::value>::type>
static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
                                     const std::shared_ptr<DataType>& type, Datum* out) {
  DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL);

  using ElementType = typename ParquetIntegerType::c_type;
  static_assert(std::is_same<ElementType, int32_t>::value ||
                    std::is_same<ElementType, int64_t>::value,
                "ElementType must be int32_t or int64_t");

  const int64_t num_values = reader->values_written();
  const ElementType* const raw_values =
      reinterpret_cast<const ElementType*>(reader->values());
  const int64_t decimal_width =
      static_cast<const ::arrow::Decimal128Type&>(*type).byte_width();

  std::shared_ptr<Buffer> data;
  RETURN_NOT_OK(::arrow::AllocateBuffer(pool, num_values * decimal_width, &data));

  using ::arrow::BitUtil::FromLittleEndian;
  uint8_t* const base = data->mutable_data();
  for (int64_t idx = 0; idx < num_values; ++idx) {
    // Widening int32_t -> int64_t sign-extends; for int64_t it is a no-op.
    const int64_t widened = static_cast<int64_t>(raw_values[idx]);
    uint64_t* words = reinterpret_cast<uint64_t*>(base + idx * decimal_width);
    // Low word: unchanged on little-endian hosts, byteswapped on big-endian ones.
    words[0] = FromLittleEndian(static_cast<uint64_t>(widened));
    // High word: all ones or all zeros, so byte order does not matter here.
    words[1] = widened < 0 ? ~static_cast<uint64_t>(0) : static_cast<uint64_t>(0);
  }

  if (!reader->nullable_values()) {
    *out = std::make_shared<::arrow::Decimal128Array>(type, num_values, data);
    return Status::OK();
  }
  std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid();
  *out = std::make_shared<::arrow::Decimal128Array>(type, num_values, data, is_valid,
                                                    reader->null_count());
  return Status::OK();
}
/// \brief Convert the binary chunks accumulated by a BinaryRecordReader into a
/// ChunkedArray of Decimal128 values.
///
/// Used for decimal columns stored as BYTE_ARRAY (ParquetType = ByteArrayType) or
/// FIXED_LEN_BYTE_ARRAY (ParquetType = FLBAType). For each chunk we:
/// 1. Take the BinaryArray / FixedSizeBinaryArray built by the RecordReader
/// 2. Allocate a buffer for the arrow::Decimal128Array
/// 3. Convert the big-endian bytes of each entry to two integers representing
///    the high and low bits of the decimal value.
///
/// \return Status::Invalid if `reader` is not a BinaryRecordReader; otherwise the
///   first error from the per-chunk conversion, if any.
template <typename ParquetType>
Status TransferDecimal(RecordReader* reader, MemoryPool* pool,
                       const std::shared_ptr<DataType>& type, Datum* out) {
  DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL);
  auto binary_reader = dynamic_cast<internal::BinaryRecordReader*>(reader);
  // A DCHECK alone compiles away in release builds, where a mismatched reader
  // would then be dereferenced as a null pointer.
  if (binary_reader == nullptr) {
    return Status::Invalid("Decimal column reader must be a BinaryRecordReader");
  }
  ::arrow::ArrayVector chunks = binary_reader->GetBuilderChunks();
  for (auto& chunk : chunks) {
    std::shared_ptr<Array> chunk_as_decimal;
    RETURN_NOT_OK(
        ConvertToDecimal128<ParquetType>(*chunk, type, pool, &chunk_as_decimal));
    // Replace the binary chunk so its memory can be released as we go
    chunk = chunk_as_decimal;
  }
  *out = std::make_shared<ChunkedArray>(chunks, type);
  return Status::OK();
}
/// \brief Read a column whose Arrow type is an ExtensionType.
///
/// The column is first decoded with the extension's storage type via
/// TransferColumnData; each resulting chunk's ArrayData is then copied, retyped
/// as the extension type and turned into an extension array.
///
/// \return the first error from decoding the storage column, if any.
Status TransferExtension(RecordReader* reader, std::shared_ptr<DataType> value_type,
                         const ColumnDescriptor* descr, MemoryPool* pool, Datum* out) {
  std::shared_ptr<ChunkedArray> result;
  auto ext_type = std::static_pointer_cast<::arrow::ExtensionType>(value_type);
  auto storage_type = ext_type->storage_type();
  RETURN_NOT_OK(TransferColumnData(reader, storage_type, descr, pool, &result));

  ::arrow::ArrayVector out_chunks(result->num_chunks());
  for (int i = 0; i < result->num_chunks(); i++) {
    // Copy the ArrayData so the storage chunk itself keeps its original type
    auto ext_data = result->chunk(i)->data()->Copy();
    ext_data->type = ext_type;
    out_chunks[i] = ext_type->MakeArray(ext_data);
  }
  // Pass the type explicitly: a ChunkedArray cannot infer it when there are no
  // chunks (e.g. an empty column).
  *out = std::make_shared<ChunkedArray>(out_chunks, ext_type);
  return Status::OK();
}
// Expands to a `case` label (for use inside TransferColumnData's switch) that
// converts values read through the Parquet Int32Type into ArrowType via
// TransferInt. The expansion refers to the locals `reader`, `pool`, `value_type`
// and `result` by name. The call result is stored in a named Status first
// because the comma in the template argument list would split a direct
// RETURN_NOT_OK(...) argument.
#define TRANSFER_INT32(ENUM, ArrowType)                                       \
  case ::arrow::Type::ENUM: {                                                 \
    const Status transfer_status =                                            \
        TransferInt<ArrowType, Int32Type>(reader, pool, value_type, &result); \
    RETURN_NOT_OK(transfer_status);                                           \
    break;                                                                    \
  }
// Same as TRANSFER_INT32, but for values read through the Parquet Int64Type.
#define TRANSFER_INT64(ENUM, ArrowType)                                       \
  case ::arrow::Type::ENUM: {                                                 \
    const Status transfer_status =                                            \
        TransferInt<ArrowType, Int64Type>(reader, pool, value_type, &result); \
    RETURN_NOT_OK(transfer_status);                                           \
    break;                                                                    \
  }
/// \brief Materialize the values buffered in `reader` as a ChunkedArray of
/// `value_type`.
///
/// Dispatches on the Arrow type id (and, for DECIMAL and NANO TIMESTAMP columns,
/// also on the Parquet physical type from `descr`) to the matching Transfer*
/// helper. When a helper produces a single Array, it is wrapped in a
/// one-chunk ChunkedArray.
///
/// \return NotImplemented for Arrow types or time units with no conversion;
///   Invalid for a DECIMAL column whose physical type is not INT32, INT64,
///   BYTE_ARRAY or FIXED_LEN_BYTE_ARRAY; otherwise the first helper error.
Status TransferColumnData(internal::RecordReader* reader,
std::shared_ptr<DataType> value_type,
const ColumnDescriptor* descr, MemoryPool* pool,
std::shared_ptr<ChunkedArray>* out) {
// Holds either an Array or a ChunkedArray, depending on which helper ran
Datum result;
std::shared_ptr<ChunkedArray> chunked_result;
// NOTE: the TRANSFER_INT32 / TRANSFER_INT64 macros used below refer to the
// locals `reader`, `pool`, `value_type` and `result` by name.
switch (value_type->id()) {
case ::arrow::Type::DICTIONARY: {
RETURN_NOT_OK(TransferDictionary(reader, value_type, &chunked_result));
result = chunked_result;
} break;
// A null column needs no stored values, only its length
case ::arrow::Type::NA: {
result = std::make_shared<::arrow::NullArray>(reader->values_written());
break;
}
// These types are handed to TransferZeroCopy (the name suggests the reader's
// buffers are reused without a copy -- confirm in its definition)
case ::arrow::Type::INT32:
case ::arrow::Type::INT64:
case ::arrow::Type::FLOAT:
case ::arrow::Type::DOUBLE:
result = TransferZeroCopy(reader, value_type);
break;
case ::arrow::Type::BOOL:
RETURN_NOT_OK(TransferBool(reader, pool, &result));
break;
// Integer/date/time types converted per value from INT32/INT64 storage via TransferInt
TRANSFER_INT32(UINT8, ::arrow::UInt8Type);
TRANSFER_INT32(INT8, ::arrow::Int8Type);
TRANSFER_INT32(UINT16, ::arrow::UInt16Type);
TRANSFER_INT32(INT16, ::arrow::Int16Type);
TRANSFER_INT32(UINT32, ::arrow::UInt32Type);
TRANSFER_INT64(UINT64, ::arrow::UInt64Type);
TRANSFER_INT32(DATE32, ::arrow::Date32Type);
TRANSFER_INT32(TIME32, ::arrow::Time32Type);
TRANSFER_INT64(TIME64, ::arrow::Time64Type);
case ::arrow::Type::DATE64:
RETURN_NOT_OK(TransferDate64(reader, pool, value_type, &result));
break;
// Binary-like types come back from TransferBinary already chunked
case ::arrow::Type::FIXED_SIZE_BINARY:
case ::arrow::Type::BINARY:
case ::arrow::Type::STRING: {
RETURN_NOT_OK(TransferBinary(reader, value_type, &chunked_result));
result = chunked_result;
} break;
// Decimals may be stored in any of four Parquet physical types
case ::arrow::Type::DECIMAL: {
switch (descr->physical_type()) {
case ::parquet::Type::INT32: {
RETURN_NOT_OK(
DecimalIntegerTransfer<Int32Type>(reader, pool, value_type, &result));
} break;
case ::parquet::Type::INT64: {
RETURN_NOT_OK(
DecimalIntegerTransfer<Int64Type>(reader, pool, value_type, &result));
} break;
case ::parquet::Type::BYTE_ARRAY: {
RETURN_NOT_OK(
TransferDecimal<ByteArrayType>(reader, pool, value_type, &result));
} break;
case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
RETURN_NOT_OK(TransferDecimal<FLBAType>(reader, pool, value_type, &result));
} break;
default:
return Status::Invalid(
"Physical type for decimal must be int32, int64, byte array, or fixed "
"length binary");
}
} break;
case ::arrow::Type::TIMESTAMP: {
const ::arrow::TimestampType& timestamp_type =
static_cast<::arrow::TimestampType&>(*value_type);
switch (timestamp_type.unit()) {
case ::arrow::TimeUnit::MILLI:
case ::arrow::TimeUnit::MICRO: {
result = TransferZeroCopy(reader, value_type);
} break;
case ::arrow::TimeUnit::NANO: {
// INT96 storage needs a dedicated conversion; other physical types go
// through TransferZeroCopy
if (descr->physical_type() == ::parquet::Type::INT96) {
RETURN_NOT_OK(TransferInt96(reader, pool, value_type, &result));
} else {
result = TransferZeroCopy(reader, value_type);
}
} break;
default:
return Status::NotImplemented("TimeUnit not supported");
}
} break;
case ::arrow::Type::EXTENSION: {
RETURN_NOT_OK(TransferExtension(reader, value_type, descr, pool, &result));
} break;
default:
return Status::NotImplemented("No support for reading columns of type ",
value_type->ToString());
}
// Normalize the helper's output to a ChunkedArray
DCHECK_NE(result.kind(), Datum::NONE);
if (result.kind() == Datum::ARRAY) {
*out = std::make_shared<ChunkedArray>(result.make_array());
} else if (result.kind() == Datum::CHUNKED_ARRAY) {
*out = result.chunked_array();
} else {
// NOTE(review): in release builds this branch leaves *out unset yet still
// returns OK below.
DCHECK(false) << "Should be impossible";
}
return Status::OK();
}
/// \brief Wrap the flat values array `arr` in ListArrays rebuilt from Parquet
/// definition and repetition levels.
///
/// \param[in] arr the innermost values array
/// \param[in] field the (possibly nested) Arrow field describing the column; only
///   single-child LIST nesting is supported
/// \param[in] max_def_level maximum definition level of the column
/// \param[in] max_rep_level maximum repetition level of the column
/// \param[in] def_levels definition levels, `total_levels` entries
/// \param[in] rep_levels repetition levels, `total_levels` entries
/// \param[in] total_levels number of level entries to process
/// \param[in] pool memory pool for the offset and validity builders
/// \param[out] out the outermost ListArray, or `arr` itself when `field` has no
///   children
/// \return NotImplemented when a nested type has more than one child or is not a
///   LIST; otherwise the first builder error, if any
Status ReconstructNestedList(const std::shared_ptr<Array>& arr,
std::shared_ptr<Field> field, int16_t max_def_level,
int16_t max_rep_level, const int16_t* def_levels,
const int16_t* rep_levels, int64_t total_levels,
::arrow::MemoryPool* pool, std::shared_ptr<Array>* out) {
// Walk downwards to extract nullability
// nullable[k] is the nullability of the field at nesting depth k (k = 0 is the
// outermost field), so it ends up with list_depth + 1 entries.
std::vector<std::string> item_names;
std::vector<bool> nullable;
std::vector<std::shared_ptr<::arrow::Int32Builder>> offset_builders;
std::vector<std::shared_ptr<::arrow::BooleanBuilder>> valid_bits_builders;
nullable.push_back(field->nullable());
while (field->type()->num_children() > 0) {
if (field->type()->num_children() > 1) {
return Status::NotImplemented("Fields with more than one child are not supported.");
} else {
if (field->type()->id() != ::arrow::Type::LIST) {
return Status::NotImplemented("Currently only nesting with Lists is supported.");
}
field = field->type()->child(0);
}
// One offset builder and one validity builder per LIST level
item_names.push_back(field->name());
offset_builders.emplace_back(
std::make_shared<::arrow::Int32Builder>(::arrow::int32(), pool));
valid_bits_builders.emplace_back(
std::make_shared<::arrow::BooleanBuilder>(::arrow::boolean(), pool));
nullable.push_back(field->nullable());
}
int64_t list_depth = offset_builders.size();
// This describes the minimal definition that describes a level that
// reflects a value in the primitive values array.
int16_t values_def_level = max_def_level;
if (nullable[nullable.size() - 1]) {
values_def_level--;
}
// The definition levels that are needed so that a list is declared
// as empty and not null.
// (A nullable list adds one level for "present"; every list adds one more for
// "has at least one element".)
std::vector<int16_t> empty_def_level(list_depth);
int def_level = 0;
for (int i = 0; i < list_depth; i++) {
if (nullable[i]) {
def_level++;
}
empty_def_level[i] = static_cast<int16_t>(def_level);
def_level++;
}
// values_offset counts the level entries that map to a slot in `arr`
int32_t values_offset = 0;
std::vector<int64_t> null_counts(list_depth, 0);
for (int64_t i = 0; i < total_levels; i++) {
int16_t rep_level = rep_levels[i];
// A repetition level below max_rep_level opens a new list entry at depth
// rep_level and at each deeper depth until a null or empty list stops it.
if (rep_level < max_rep_level) {
for (int64_t j = rep_level; j < list_depth; j++) {
// The innermost list points into `arr`; outer lists point into the
// next-deeper list's entries.
if (j == (list_depth - 1)) {
RETURN_NOT_OK(offset_builders[j]->Append(values_offset));
} else {
RETURN_NOT_OK(offset_builders[j]->Append(
static_cast<int32_t>(offset_builders[j + 1]->length())));
}
// Null list at depth j: nothing deeper exists for this entry
if (((empty_def_level[j] - 1) == def_levels[i]) && (nullable[j])) {
RETURN_NOT_OK(valid_bits_builders[j]->Append(false));
null_counts[j]++;
break;
} else {
RETURN_NOT_OK(valid_bits_builders[j]->Append(true));
// Empty but non-null list: nothing deeper to open either
if (empty_def_level[j] == def_levels[i]) {
break;
}
}
}
}
if (def_levels[i] >= values_def_level) {
values_offset++;
}
}
// Add the final offset to all lists
for (int64_t j = 0; j < list_depth; j++) {
if (j == (list_depth - 1)) {
RETURN_NOT_OK(offset_builders[j]->Append(values_offset));
} else {
RETURN_NOT_OK(offset_builders[j]->Append(
static_cast<int32_t>(offset_builders[j + 1]->length())));
}
}
std::vector<std::shared_ptr<Buffer>> offsets;
std::vector<std::shared_ptr<Buffer>> valid_bits;
std::vector<int64_t> list_lengths;
for (int64_t j = 0; j < list_depth; j++) {
// Offsets hold one more entry than there are lists at this depth
list_lengths.push_back(offset_builders[j]->length() - 1);
std::shared_ptr<Array> array;
RETURN_NOT_OK(offset_builders[j]->Finish(&array));
offsets.emplace_back(std::static_pointer_cast<Int32Array>(array)->values());
RETURN_NOT_OK(valid_bits_builders[j]->Finish(&array));
valid_bits.emplace_back(std::static_pointer_cast<BooleanArray>(array)->values());
}
*out = arr;
// TODO(wesm): Use passed-in field
// Wrap from the innermost list outwards, each level taking the previous result
// as its child.
for (int64_t j = list_depth - 1; j >= 0; j--) {
auto list_type =
::arrow::list(::arrow::field(item_names[j], (*out)->type(), nullable[j + 1]));
*out = std::make_shared<::arrow::ListArray>(list_type, list_lengths[j], offsets[j],
*out, valid_bits[j], null_counts[j]);
}
return Status::OK();
}
} // namespace arrow
} // namespace parquet