blob: f77bf38f9e2adeb45c636763f07b576bfe85ac50 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "parquet/arrow/schema.h"
#include <algorithm>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>
#include "arrow/array.h"
#include "arrow/status.h"
#include "arrow/type.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/logging.h"
#include "parquet/arrow/writer.h"
#include "parquet/exception.h"
#include "parquet/properties.h"
#include "parquet/schema-internal.h"
#include "parquet/types.h"
using arrow::Field;
using arrow::Status;
using arrow::internal::checked_cast;
using ArrowType = arrow::DataType;
using ArrowTypeId = arrow::Type;
using parquet::Repetition;
using parquet::schema::GroupNode;
using parquet::schema::Node;
using parquet::schema::NodePtr;
using parquet::schema::PrimitiveNode;
using ParquetType = parquet::Type;
using parquet::ConvertedType;
using parquet::LogicalType;
namespace parquet {
namespace arrow {
const auto TIMESTAMP_MS = ::arrow::timestamp(::arrow::TimeUnit::MILLI);
const auto TIMESTAMP_US = ::arrow::timestamp(::arrow::TimeUnit::MICRO);
const auto TIMESTAMP_NS = ::arrow::timestamp(::arrow::TimeUnit::NANO);
static Status MakeArrowDecimal(const LogicalType& logical_type,
std::shared_ptr<ArrowType>* out) {
const auto& decimal = checked_cast<const DecimalLogicalType&>(logical_type);
*out = ::arrow::decimal(decimal.precision(), decimal.scale());
return Status::OK();
}
static Status MakeArrowInt(const LogicalType& logical_type,
std::shared_ptr<ArrowType>* out) {
const auto& integer = checked_cast<const IntLogicalType&>(logical_type);
switch (integer.bit_width()) {
case 8:
*out = integer.is_signed() ? ::arrow::int8() : ::arrow::uint8();
break;
case 16:
*out = integer.is_signed() ? ::arrow::int16() : ::arrow::uint16();
break;
case 32:
*out = integer.is_signed() ? ::arrow::int32() : ::arrow::uint32();
break;
default:
return Status::TypeError(logical_type.ToString(),
" can not annotate physical type Int32");
}
return Status::OK();
}
static Status MakeArrowInt64(const LogicalType& logical_type,
std::shared_ptr<ArrowType>* out) {
const auto& integer = checked_cast<const IntLogicalType&>(logical_type);
switch (integer.bit_width()) {
case 64:
*out = integer.is_signed() ? ::arrow::int64() : ::arrow::uint64();
break;
default:
return Status::TypeError(logical_type.ToString(),
" can not annotate physical type Int64");
}
return Status::OK();
}
static Status MakeArrowTime32(const LogicalType& logical_type,
std::shared_ptr<ArrowType>* out) {
const auto& time = checked_cast<const TimeLogicalType&>(logical_type);
switch (time.time_unit()) {
case LogicalType::TimeUnit::MILLIS:
*out = ::arrow::time32(::arrow::TimeUnit::MILLI);
break;
default:
return Status::TypeError(logical_type.ToString(),
" can not annotate physical type Time32");
}
return Status::OK();
}
static Status MakeArrowTime64(const LogicalType& logical_type,
std::shared_ptr<ArrowType>* out) {
const auto& time = checked_cast<const TimeLogicalType&>(logical_type);
switch (time.time_unit()) {
case LogicalType::TimeUnit::MICROS:
*out = ::arrow::time64(::arrow::TimeUnit::MICRO);
break;
case LogicalType::TimeUnit::NANOS:
*out = ::arrow::time64(::arrow::TimeUnit::NANO);
break;
default:
return Status::TypeError(logical_type.ToString(),
" can not annotate physical type Time64");
}
return Status::OK();
}
static Status MakeArrowTimestamp(const LogicalType& logical_type,
std::shared_ptr<ArrowType>* out) {
const auto& timestamp = checked_cast<const TimestampLogicalType&>(logical_type);
const bool utc_normalized =
timestamp.is_from_converted_type() ? false : timestamp.is_adjusted_to_utc();
static const char* utc_timezone = "UTC";
switch (timestamp.time_unit()) {
case LogicalType::TimeUnit::MILLIS:
*out = (utc_normalized ? ::arrow::timestamp(::arrow::TimeUnit::MILLI, utc_timezone)
: ::arrow::timestamp(::arrow::TimeUnit::MILLI));
break;
case LogicalType::TimeUnit::MICROS:
*out = (utc_normalized ? ::arrow::timestamp(::arrow::TimeUnit::MICRO, utc_timezone)
: ::arrow::timestamp(::arrow::TimeUnit::MICRO));
break;
case LogicalType::TimeUnit::NANOS:
*out = (utc_normalized ? ::arrow::timestamp(::arrow::TimeUnit::NANO, utc_timezone)
: ::arrow::timestamp(::arrow::TimeUnit::NANO));
break;
default:
return Status::TypeError("Unrecognized time unit in timestamp logical_type: ",
logical_type.ToString());
}
return Status::OK();
}
static Status FromByteArray(const LogicalType& logical_type,
std::shared_ptr<ArrowType>* out) {
switch (logical_type.type()) {
case LogicalType::Type::STRING:
*out = ::arrow::utf8();
break;
case LogicalType::Type::DECIMAL:
RETURN_NOT_OK(MakeArrowDecimal(logical_type, out));
break;
case LogicalType::Type::NONE:
case LogicalType::Type::ENUM:
case LogicalType::Type::JSON:
case LogicalType::Type::BSON:
*out = ::arrow::binary();
break;
default:
return Status::NotImplemented("Unhandled logical logical_type ",
logical_type.ToString(), " for binary array");
}
return Status::OK();
}
static Status FromFLBA(const LogicalType& logical_type, int32_t physical_length,
std::shared_ptr<ArrowType>* out) {
switch (logical_type.type()) {
case LogicalType::Type::DECIMAL:
RETURN_NOT_OK(MakeArrowDecimal(logical_type, out));
break;
case LogicalType::Type::NONE:
case LogicalType::Type::INTERVAL:
case LogicalType::Type::UUID:
*out = ::arrow::fixed_size_binary(physical_length);
break;
default:
return Status::NotImplemented("Unhandled logical logical_type ",
logical_type.ToString(),
" for fixed-length binary array");
}
return Status::OK();
}
static Status FromInt32(const LogicalType& logical_type,
std::shared_ptr<ArrowType>* out) {
switch (logical_type.type()) {
case LogicalType::Type::INT:
RETURN_NOT_OK(MakeArrowInt(logical_type, out));
break;
case LogicalType::Type::DATE:
*out = ::arrow::date32();
break;
case LogicalType::Type::TIME:
RETURN_NOT_OK(MakeArrowTime32(logical_type, out));
break;
case LogicalType::Type::DECIMAL:
RETURN_NOT_OK(MakeArrowDecimal(logical_type, out));
break;
case LogicalType::Type::NONE:
*out = ::arrow::int32();
break;
default:
return Status::NotImplemented("Unhandled logical type ", logical_type.ToString(),
" for INT32");
}
return Status::OK();
}
static Status FromInt64(const LogicalType& logical_type,
std::shared_ptr<ArrowType>* out) {
switch (logical_type.type()) {
case LogicalType::Type::INT:
RETURN_NOT_OK(MakeArrowInt64(logical_type, out));
break;
case LogicalType::Type::DECIMAL:
RETURN_NOT_OK(MakeArrowDecimal(logical_type, out));
break;
case LogicalType::Type::TIMESTAMP:
RETURN_NOT_OK(MakeArrowTimestamp(logical_type, out));
break;
case LogicalType::Type::TIME:
RETURN_NOT_OK(MakeArrowTime64(logical_type, out));
break;
case LogicalType::Type::NONE:
*out = ::arrow::int64();
break;
default:
return Status::NotImplemented("Unhandled logical type ", logical_type.ToString(),
" for INT64");
}
return Status::OK();
}
Status FromPrimitive(const PrimitiveNode& primitive, std::shared_ptr<ArrowType>* out) {
const std::shared_ptr<const LogicalType>& logical_type = primitive.logical_type();
if (logical_type->is_invalid() || logical_type->is_null()) {
*out = ::arrow::null();
return Status::OK();
}
switch (primitive.physical_type()) {
case ParquetType::BOOLEAN:
*out = ::arrow::boolean();
break;
case ParquetType::INT32:
RETURN_NOT_OK(FromInt32(*logical_type, out));
break;
case ParquetType::INT64:
RETURN_NOT_OK(FromInt64(*logical_type, out));
break;
case ParquetType::INT96:
*out = TIMESTAMP_NS;
break;
case ParquetType::FLOAT:
*out = ::arrow::float32();
break;
case ParquetType::DOUBLE:
*out = ::arrow::float64();
break;
case ParquetType::BYTE_ARRAY:
RETURN_NOT_OK(FromByteArray(*logical_type, out));
break;
case ParquetType::FIXED_LEN_BYTE_ARRAY:
RETURN_NOT_OK(FromFLBA(*logical_type, primitive.type_length(), out));
break;
default: {
// PARQUET-1565: This can occur if the file is corrupt
return Status::IOError("Invalid physical column type: ",
TypeToString(primitive.physical_type()));
}
}
return Status::OK();
}
// Forward declaration
Status NodeToFieldInternal(const Node& node,
const std::unordered_set<const Node*>* included_leaf_nodes,
std::shared_ptr<Field>* out);
/*
* Auxilary function to test if a parquet schema node is a leaf node
* that should be included in a resulting arrow schema
*/
inline bool IsIncludedLeaf(const Node& node,
const std::unordered_set<const Node*>* included_leaf_nodes) {
if (included_leaf_nodes == nullptr) {
return true;
}
auto search = included_leaf_nodes->find(&node);
return (search != included_leaf_nodes->end());
}
Status StructFromGroup(const GroupNode& group,
const std::unordered_set<const Node*>* included_leaf_nodes,
std::shared_ptr<ArrowType>* out) {
std::vector<std::shared_ptr<Field>> fields;
std::shared_ptr<Field> field;
*out = nullptr;
for (int i = 0; i < group.field_count(); i++) {
RETURN_NOT_OK(NodeToFieldInternal(*group.field(i), included_leaf_nodes, &field));
if (field != nullptr) {
fields.push_back(field);
}
}
if (fields.size() > 0) {
*out = std::make_shared<::arrow::StructType>(fields);
}
return Status::OK();
}
Status NodeToList(const GroupNode& group,
const std::unordered_set<const Node*>* included_leaf_nodes,
std::shared_ptr<ArrowType>* out) {
*out = nullptr;
if (group.field_count() == 1) {
// This attempts to resolve the preferred 3-level list encoding.
const Node& list_node = *group.field(0);
if (list_node.is_group() && list_node.is_repeated()) {
const auto& list_group = static_cast<const GroupNode&>(list_node);
// Special case mentioned in the format spec:
// If the name is array or ends in _tuple, this should be a list of struct
// even for single child elements.
if (list_group.field_count() == 1 && !schema::HasStructListName(list_group)) {
// List of primitive type
std::shared_ptr<Field> item_field;
RETURN_NOT_OK(
NodeToFieldInternal(*list_group.field(0), included_leaf_nodes, &item_field));
if (item_field != nullptr) {
*out = ::arrow::list(item_field);
}
} else {
// List of struct
std::shared_ptr<ArrowType> inner_type;
RETURN_NOT_OK(StructFromGroup(list_group, included_leaf_nodes, &inner_type));
if (inner_type != nullptr) {
auto item_field = std::make_shared<Field>(list_node.name(), inner_type, false);
*out = ::arrow::list(item_field);
}
}
} else if (list_node.is_repeated()) {
// repeated primitive node
std::shared_ptr<ArrowType> inner_type;
if (IsIncludedLeaf(static_cast<const Node&>(list_node), included_leaf_nodes)) {
RETURN_NOT_OK(
FromPrimitive(static_cast<const PrimitiveNode&>(list_node), &inner_type));
auto item_field = std::make_shared<Field>(list_node.name(), inner_type, false);
*out = ::arrow::list(item_field);
}
} else {
return Status::NotImplemented(
"Non-repeated groups in a LIST-annotated group are not supported.");
}
} else {
return Status::NotImplemented(
"Only LIST-annotated groups with a single child can be handled.");
}
return Status::OK();
}
Status NodeToField(const Node& node, std::shared_ptr<Field>* out) {
return NodeToFieldInternal(node, nullptr, out);
}
Status NodeToFieldInternal(const Node& node,
const std::unordered_set<const Node*>* included_leaf_nodes,
std::shared_ptr<Field>* out) {
std::shared_ptr<ArrowType> type = nullptr;
bool nullable = !node.is_required();
*out = nullptr;
if (node.is_repeated()) {
// 1-level LIST encoding fields are required
std::shared_ptr<ArrowType> inner_type;
if (node.is_group()) {
RETURN_NOT_OK(StructFromGroup(static_cast<const GroupNode&>(node),
included_leaf_nodes, &inner_type));
} else if (IsIncludedLeaf(node, included_leaf_nodes)) {
RETURN_NOT_OK(FromPrimitive(static_cast<const PrimitiveNode&>(node), &inner_type));
}
if (inner_type != nullptr) {
auto item_field = std::make_shared<Field>(node.name(), inner_type, false);
type = ::arrow::list(item_field);
nullable = false;
}
} else if (node.is_group()) {
const auto& group = static_cast<const GroupNode&>(node);
if (node.logical_type()->is_list()) {
RETURN_NOT_OK(NodeToList(group, included_leaf_nodes, &type));
} else {
RETURN_NOT_OK(StructFromGroup(group, included_leaf_nodes, &type));
}
} else {
// Primitive (leaf) node
if (IsIncludedLeaf(node, included_leaf_nodes)) {
RETURN_NOT_OK(FromPrimitive(static_cast<const PrimitiveNode&>(node), &type));
}
}
if (type != nullptr) {
*out = std::make_shared<Field>(node.name(), type, nullable);
}
return Status::OK();
}
Status FromParquetSchema(
const SchemaDescriptor* parquet_schema,
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
std::shared_ptr<::arrow::Schema>* out) {
const GroupNode& schema_node = *parquet_schema->group_node();
int num_fields = static_cast<int>(schema_node.field_count());
std::vector<std::shared_ptr<Field>> fields(num_fields);
for (int i = 0; i < num_fields; i++) {
RETURN_NOT_OK(NodeToField(*schema_node.field(i), &fields[i]));
}
*out = std::make_shared<::arrow::Schema>(fields, key_value_metadata);
return Status::OK();
}
Status FromParquetSchema(
const SchemaDescriptor* parquet_schema, const std::vector<int>& column_indices,
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
std::shared_ptr<::arrow::Schema>* out) {
// TODO(wesm): Consider adding an arrow::Schema name attribute, which comes
// from the root Parquet node
// Put the right leaf nodes in an unordered set
// Index in column_indices should be unique, duplicate indices are merged into one and
// ordering by its first appearing.
int num_columns = static_cast<int>(column_indices.size());
std::unordered_set<const Node*> top_nodes; // to deduplicate the top nodes
std::vector<const Node*> base_nodes; // to keep the ordering
std::unordered_set<const Node*> included_leaf_nodes(num_columns);
for (int i = 0; i < num_columns; i++) {
const ColumnDescriptor* column_desc = parquet_schema->Column(column_indices[i]);
included_leaf_nodes.insert(column_desc->schema_node().get());
const Node* column_root = parquet_schema->GetColumnRoot(column_indices[i]);
auto insertion = top_nodes.insert(column_root);
if (insertion.second) {
base_nodes.push_back(column_root);
}
}
std::vector<std::shared_ptr<Field>> fields;
std::shared_ptr<Field> field;
for (auto node : base_nodes) {
RETURN_NOT_OK(NodeToFieldInternal(*node, &included_leaf_nodes, &field));
if (field != nullptr) {
fields.push_back(field);
}
}
*out = std::make_shared<::arrow::Schema>(fields, key_value_metadata);
return Status::OK();
}
Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
const std::vector<int>& column_indices,
std::shared_ptr<::arrow::Schema>* out) {
return FromParquetSchema(parquet_schema, column_indices, nullptr, out);
}
Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
std::shared_ptr<::arrow::Schema>* out) {
return FromParquetSchema(parquet_schema, nullptr, out);
}
Status ListToNode(const std::shared_ptr<::arrow::ListType>& type, const std::string& name,
bool nullable, const WriterProperties& properties,
const ArrowWriterProperties& arrow_properties, NodePtr* out) {
Repetition::type repetition = nullable ? Repetition::OPTIONAL : Repetition::REQUIRED;
NodePtr element;
RETURN_NOT_OK(FieldToNode(type->value_field(), properties, arrow_properties, &element));
NodePtr list = GroupNode::Make("list", Repetition::REPEATED, {element});
*out = GroupNode::Make(name, repetition, {list}, LogicalType::List());
return Status::OK();
}
Status StructToNode(const std::shared_ptr<::arrow::StructType>& type,
const std::string& name, bool nullable,
const WriterProperties& properties,
const ArrowWriterProperties& arrow_properties, NodePtr* out) {
Repetition::type repetition = nullable ? Repetition::OPTIONAL : Repetition::REQUIRED;
std::vector<NodePtr> children(type->num_children());
for (int i = 0; i < type->num_children(); i++) {
RETURN_NOT_OK(
FieldToNode(type->child(i), properties, arrow_properties, &children[i]));
}
*out = GroupNode::Make(name, repetition, children);
return Status::OK();
}
static std::shared_ptr<const LogicalType> TimestampLogicalTypeFromArrowTimestamp(
const ::arrow::TimestampType& timestamp_type, ::arrow::TimeUnit::type time_unit) {
const bool utc = !(timestamp_type.timezone().empty());
// ARROW-5878(wesm): for forward compatibility reasons, and because
// there's no other way to signal to old readers that values are
// timestamps, we force the ConvertedType field to be set to the
// corresponding TIMESTAMP_* value. This does cause some ambiguity
// as Parquet readers have not been consistent about the
// interpretation of TIMESTAMP_* values as being UTC-normalized.
switch (time_unit) {
case ::arrow::TimeUnit::MILLI:
return LogicalType::Timestamp(utc, LogicalType::TimeUnit::MILLIS,
/*is_from_converted_type=*/false,
/*force_set_converted_type=*/true);
case ::arrow::TimeUnit::MICRO:
return LogicalType::Timestamp(utc, LogicalType::TimeUnit::MICROS,
/*is_from_converted_type=*/false,
/*force_set_converted_type=*/true);
case ::arrow::TimeUnit::NANO:
return LogicalType::Timestamp(utc, LogicalType::TimeUnit::NANOS);
case ::arrow::TimeUnit::SECOND:
// No equivalent parquet logical type.
break;
}
return LogicalType::None();
}
static Status GetTimestampMetadata(const ::arrow::TimestampType& type,
const WriterProperties& properties,
const ArrowWriterProperties& arrow_properties,
ParquetType::type* physical_type,
std::shared_ptr<const LogicalType>* logical_type) {
const bool coerce = arrow_properties.coerce_timestamps_enabled();
const auto target_unit =
coerce ? arrow_properties.coerce_timestamps_unit() : type.unit();
// The user is explicitly asking for Impala int96 encoding, there is no
// logical type.
if (arrow_properties.support_deprecated_int96_timestamps()) {
*physical_type = ParquetType::INT96;
return Status::OK();
}
*physical_type = ParquetType::INT64;
*logical_type = TimestampLogicalTypeFromArrowTimestamp(type, target_unit);
// The user is explicitly asking for timestamp data to be converted to the
// specified units (target_unit).
if (coerce) {
if (properties.version() == ::parquet::ParquetVersion::PARQUET_1_0) {
switch (target_unit) {
case ::arrow::TimeUnit::MILLI:
case ::arrow::TimeUnit::MICRO:
break;
case ::arrow::TimeUnit::NANO:
case ::arrow::TimeUnit::SECOND:
return Status::NotImplemented(
"For Parquet version 1.0 files, can only coerce Arrow timestamps to "
"milliseconds or microseconds");
}
} else {
switch (target_unit) {
case ::arrow::TimeUnit::MILLI:
case ::arrow::TimeUnit::MICRO:
case ::arrow::TimeUnit::NANO:
break;
case ::arrow::TimeUnit::SECOND:
return Status::NotImplemented(
"For Parquet files, can only coerce Arrow timestamps to milliseconds, "
"microseconds, or nanoseconds");
}
}
return Status::OK();
}
// The user implicitly wants timestamp data to retain its original time units,
// however the ConvertedType field used to indicate logical types for Parquet
// version 1.0 fields does not allow for nanosecond time units and so nanoseconds
// must be coerced to microseconds.
if (properties.version() == ::parquet::ParquetVersion::PARQUET_1_0 &&
type.unit() == ::arrow::TimeUnit::NANO) {
*logical_type =
TimestampLogicalTypeFromArrowTimestamp(type, ::arrow::TimeUnit::MICRO);
return Status::OK();
}
// The user implicitly wants timestamp data to retain its original time units,
// however the Arrow seconds time unit can not be represented (annotated) in
// any version of Parquet and so must be coerced to milliseconds.
if (type.unit() == ::arrow::TimeUnit::SECOND) {
*logical_type =
TimestampLogicalTypeFromArrowTimestamp(type, ::arrow::TimeUnit::MILLI);
return Status::OK();
}
return Status::OK();
}
Status FieldToNode(const std::shared_ptr<Field>& field,
const WriterProperties& properties,
const ArrowWriterProperties& arrow_properties, NodePtr* out) {
std::shared_ptr<const LogicalType> logical_type = LogicalType::None();
ParquetType::type type;
Repetition::type repetition =
field->nullable() ? Repetition::OPTIONAL : Repetition::REQUIRED;
int length = -1;
int precision = -1;
int scale = -1;
switch (field->type()->id()) {
case ArrowTypeId::NA:
type = ParquetType::INT32;
logical_type = LogicalType::Null();
break;
case ArrowTypeId::BOOL:
type = ParquetType::BOOLEAN;
break;
case ArrowTypeId::UINT8:
type = ParquetType::INT32;
logical_type = LogicalType::Int(8, false);
break;
case ArrowTypeId::INT8:
type = ParquetType::INT32;
logical_type = LogicalType::Int(8, true);
break;
case ArrowTypeId::UINT16:
type = ParquetType::INT32;
logical_type = LogicalType::Int(16, false);
break;
case ArrowTypeId::INT16:
type = ParquetType::INT32;
logical_type = LogicalType::Int(16, true);
break;
case ArrowTypeId::UINT32:
if (properties.version() == ::parquet::ParquetVersion::PARQUET_1_0) {
type = ParquetType::INT64;
} else {
type = ParquetType::INT32;
logical_type = LogicalType::Int(32, false);
}
break;
case ArrowTypeId::INT32:
type = ParquetType::INT32;
break;
case ArrowTypeId::UINT64:
type = ParquetType::INT64;
logical_type = LogicalType::Int(64, false);
break;
case ArrowTypeId::INT64:
type = ParquetType::INT64;
break;
case ArrowTypeId::FLOAT:
type = ParquetType::FLOAT;
break;
case ArrowTypeId::DOUBLE:
type = ParquetType::DOUBLE;
break;
case ArrowTypeId::STRING:
type = ParquetType::BYTE_ARRAY;
logical_type = LogicalType::String();
break;
case ArrowTypeId::BINARY:
type = ParquetType::BYTE_ARRAY;
break;
case ArrowTypeId::FIXED_SIZE_BINARY: {
type = ParquetType::FIXED_LEN_BYTE_ARRAY;
const auto& fixed_size_binary_type =
static_cast<const ::arrow::FixedSizeBinaryType&>(*field->type());
length = fixed_size_binary_type.byte_width();
} break;
case ArrowTypeId::DECIMAL: {
type = ParquetType::FIXED_LEN_BYTE_ARRAY;
const auto& decimal_type =
static_cast<const ::arrow::Decimal128Type&>(*field->type());
precision = decimal_type.precision();
scale = decimal_type.scale();
length = DecimalSize(precision);
PARQUET_CATCH_NOT_OK(logical_type = LogicalType::Decimal(precision, scale));
} break;
case ArrowTypeId::DATE32:
type = ParquetType::INT32;
logical_type = LogicalType::Date();
break;
case ArrowTypeId::DATE64:
type = ParquetType::INT32;
logical_type = LogicalType::Date();
break;
case ArrowTypeId::TIMESTAMP:
RETURN_NOT_OK(
GetTimestampMetadata(static_cast<::arrow::TimestampType&>(*field->type()),
properties, arrow_properties, &type, &logical_type));
break;
case ArrowTypeId::TIME32:
type = ParquetType::INT32;
logical_type =
LogicalType::Time(/*is_adjusted_to_utc=*/true, LogicalType::TimeUnit::MILLIS);
break;
case ArrowTypeId::TIME64: {
type = ParquetType::INT64;
auto time_type = static_cast<::arrow::Time64Type*>(field->type().get());
if (time_type->unit() == ::arrow::TimeUnit::NANO) {
logical_type =
LogicalType::Time(/*is_adjusted_to_utc=*/true, LogicalType::TimeUnit::NANOS);
} else {
logical_type =
LogicalType::Time(/*is_adjusted_to_utc=*/true, LogicalType::TimeUnit::MICROS);
}
} break;
case ArrowTypeId::STRUCT: {
auto struct_type = std::static_pointer_cast<::arrow::StructType>(field->type());
return StructToNode(struct_type, field->name(), field->nullable(), properties,
arrow_properties, out);
}
case ArrowTypeId::LIST: {
auto list_type = std::static_pointer_cast<::arrow::ListType>(field->type());
return ListToNode(list_type, field->name(), field->nullable(), properties,
arrow_properties, out);
}
case ArrowTypeId::DICTIONARY: {
// Parquet has no Dictionary type, dictionary-encoded is handled on
// the encoding, not the schema level.
const ::arrow::DictionaryType& dict_type =
static_cast<const ::arrow::DictionaryType&>(*field->type());
std::shared_ptr<::arrow::Field> unpacked_field = ::arrow::field(
field->name(), dict_type.value_type(), field->nullable(), field->metadata());
return FieldToNode(unpacked_field, properties, arrow_properties, out);
}
default: {
// TODO: DENSE_UNION, SPARE_UNION, JSON_SCALAR, DECIMAL_TEXT, VARCHAR
return Status::NotImplemented(
"Unhandled type for Arrow to Parquet schema conversion: ",
field->type()->ToString());
}
}
PARQUET_CATCH_NOT_OK(*out = PrimitiveNode::Make(field->name(), repetition, logical_type,
type, length));
return Status::OK();
}
Status ToParquetSchema(const ::arrow::Schema* arrow_schema,
const WriterProperties& properties,
const ArrowWriterProperties& arrow_properties,
std::shared_ptr<SchemaDescriptor>* out) {
std::vector<NodePtr> nodes(arrow_schema->num_fields());
for (int i = 0; i < arrow_schema->num_fields(); i++) {
RETURN_NOT_OK(
FieldToNode(arrow_schema->field(i), properties, arrow_properties, &nodes[i]));
}
NodePtr schema = GroupNode::Make("schema", Repetition::REQUIRED, nodes);
*out = std::make_shared<::parquet::SchemaDescriptor>();
PARQUET_CATCH_NOT_OK((*out)->Init(schema));
return Status::OK();
}
Status ToParquetSchema(const ::arrow::Schema* arrow_schema,
const WriterProperties& properties,
std::shared_ptr<SchemaDescriptor>* out) {
return ToParquetSchema(arrow_schema, properties, *default_arrow_writer_properties(),
out);
}
/// \brief Compute the number of bytes required to represent a decimal of a
/// given precision. Taken from the Apache Impala codebase. The comments next
/// to the return values are the maximum value that can be represented in 2's
/// complement with the returned number of bytes.
int32_t DecimalSize(int32_t precision) {
DCHECK_GE(precision, 1) << "decimal precision must be greater than or equal to 1, got "
<< precision;
DCHECK_LE(precision, 38) << "decimal precision must be less than or equal to 38, got "
<< precision;
switch (precision) {
case 1:
case 2:
return 1; // 127
case 3:
case 4:
return 2; // 32,767
case 5:
case 6:
return 3; // 8,388,607
case 7:
case 8:
case 9:
return 4; // 2,147,483,427
case 10:
case 11:
return 5; // 549,755,813,887
case 12:
case 13:
case 14:
return 6; // 140,737,488,355,327
case 15:
case 16:
return 7; // 36,028,797,018,963,967
case 17:
case 18:
return 8; // 9,223,372,036,854,775,807
case 19:
case 20:
case 21:
return 9; // 2,361,183,241,434,822,606,847
case 22:
case 23:
return 10; // 604,462,909,807,314,587,353,087
case 24:
case 25:
case 26:
return 11; // 154,742,504,910,672,534,362,390,527
case 27:
case 28:
return 12; // 39,614,081,257,132,168,796,771,975,167
case 29:
case 30:
case 31:
return 13; // 10,141,204,801,825,835,211,973,625,643,007
case 32:
case 33:
return 14; // 2,596,148,429,267,413,814,265,248,164,610,047
case 34:
case 35:
return 15; // 664,613,997,892,457,936,451,903,530,140,172,287
case 36:
case 37:
case 38:
return 16; // 170,141,183,460,469,231,731,687,303,715,884,105,727
default:
break;
}
DCHECK(false);
return -1;
}
} // namespace arrow
} // namespace parquet