// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Provides an API for converting parquet schema to arrow schema and vice versa.
//!
//! The main interfaces for converting parquet schema to arrow schema are
//! `parquet_to_arrow_schema`, `parquet_to_arrow_schema_by_columns` and
//! `parquet_to_arrow_field`.
//!
//! The main interface for converting arrow schema to parquet schema is
//! `arrow_to_parquet_schema`.
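//!
//! A minimal usage sketch (the message type and import paths below are
//! illustrative assumptions, not part of this module):
//!
//! ```ignore
//! use std::sync::Arc;
//! use parquet::arrow::parquet_to_arrow_schema;
//! use parquet::schema::{parser::parse_message_type, types::SchemaDescriptor};
//!
//! let message_type = "
//! message example {
//!     REQUIRED INT32 id;
//!     OPTIONAL BINARY name (UTF8);
//! }
//! ";
//! let parquet_group = parse_message_type(message_type).unwrap();
//! let descriptor = SchemaDescriptor::new(Arc::new(parquet_group));
//! // convert all leaf columns, with no key-value metadata
//! let arrow_schema = parquet_to_arrow_schema(&descriptor, &None).unwrap();
//! ```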
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit};
use arrow::ipc::writer;
use crate::errors::{ParquetError::ArrowError, Result};
use crate::file::{metadata::KeyValue, properties::WriterProperties};
use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type, TypePtr};
use crate::{
basic::{
ConvertedType, DecimalType, IntType, LogicalType, Repetition, TimeType,
TimeUnit as ParquetTimeUnit, TimestampType, Type as PhysicalType,
},
errors::ParquetError,
};
/// Convert Parquet schema to Arrow schema including optional metadata.
/// Attempts to decode any existing Arrow schema metadata, falling back
/// to converting the Parquet schema column-wise.
pub fn parquet_to_arrow_schema(
parquet_schema: &SchemaDescriptor,
key_value_metadata: &Option<Vec<KeyValue>>,
) -> Result<Schema> {
let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
metadata
.remove(super::ARROW_SCHEMA_META_KEY)
.map(|encoded| get_arrow_schema_from_metadata(&encoded))
        .unwrap_or_else(|| {
            parquet_to_arrow_schema_by_columns(
                parquet_schema,
                0..parquet_schema.columns().len(),
                key_value_metadata,
            )
        })
}
/// Convert parquet schema to arrow schema including optional metadata,
/// only preserving some root columns.
/// This is useful if we have columns `a.b`, `a.c.e` and `a.d`,
/// and want `a` with all its child fields
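///
/// A sketch of selecting only the first root column (the `descriptor`
/// variable is an illustrative assumption):
///
/// ```ignore
/// // keeps every leaf column under the first root field, i.e. all of `a`
/// let schema = parquet_to_arrow_schema_by_root_columns(&descriptor, vec![0], &None)?;
/// ```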
pub fn parquet_to_arrow_schema_by_root_columns<T>(
parquet_schema: &SchemaDescriptor,
column_indices: T,
key_value_metadata: &Option<Vec<KeyValue>>,
) -> Result<Schema>
where
T: IntoIterator<Item = usize>,
{
// Reconstruct the index ranges of the parent columns
    // An Arrow struct is represented by one or more Parquet leaf columns, one per
    // child field. Without grouping, requesting fields 1 and 2 could return the
    // struct's columns twice if field 1 is a struct with, say, 3 leaf columns and
    // field 2 is a primitive.
    //
    // The below gathers the parent columns and counts the number of leaf columns
    // in each parent, so that we end up with:
    // - field 1 - columns: [0, 1, 2]
    // - field 2 - columns: [3]
let mut parent_columns = vec![];
let mut curr_name = "";
let mut prev_name = "";
let mut indices = vec![];
(0..(parquet_schema.num_columns())).for_each(|i| {
let p_type = parquet_schema.get_column_root(i);
curr_name = p_type.get_basic_info().name();
if prev_name.is_empty() {
// first index
indices.push(i);
prev_name = curr_name;
        } else if curr_name != prev_name {
            // a new parent column; flush the indices gathered for the previous parent
            parent_columns.push((prev_name.to_string(), indices.clone()));
            prev_name = curr_name;
            indices = vec![i];
} else {
indices.push(i);
}
});
    // push the indices of the last parent column, if any
if !indices.is_empty() {
parent_columns.push((curr_name.to_string(), indices));
}
// gather the required leaf columns
let leaf_columns = column_indices
.into_iter()
.flat_map(|i| parent_columns[i].1.clone());
parquet_to_arrow_schema_by_columns(parquet_schema, leaf_columns, key_value_metadata)
}
/// Convert parquet schema to arrow schema including optional metadata,
/// only preserving some leaf columns.
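///
/// A sketch selecting leaf columns 0 and 1 (the `descriptor` variable is an
/// illustrative assumption):
///
/// ```ignore
/// let schema = parquet_to_arrow_schema_by_columns(&descriptor, vec![0, 1], &None)?;
/// ```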
pub fn parquet_to_arrow_schema_by_columns<T>(
parquet_schema: &SchemaDescriptor,
column_indices: T,
key_value_metadata: &Option<Vec<KeyValue>>,
) -> Result<Schema>
where
T: IntoIterator<Item = usize>,
{
let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
let arrow_schema_metadata = metadata
.remove(super::ARROW_SCHEMA_META_KEY)
.map(|encoded| get_arrow_schema_from_metadata(&encoded))
        .transpose()?;
// add the Arrow metadata to the Parquet metadata
if let Some(arrow_schema) = &arrow_schema_metadata {
arrow_schema.metadata().iter().for_each(|(k, v)| {
metadata.insert(k.clone(), v.clone());
});
}
let mut base_nodes = Vec::new();
let mut base_nodes_set = HashSet::new();
let mut leaves = HashSet::new();
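    // Each selected root is either taken verbatim from the embedded Arrow
    // schema (when present) or converted from the Parquet type.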
enum FieldType<'a> {
Parquet(&'a Type),
Arrow(Field),
}
for c in column_indices {
let column = parquet_schema.column(c);
let name = column.name();
if let Some(field) = arrow_schema_metadata
.as_ref()
.and_then(|schema| schema.field_with_name(name).ok().cloned())
{
base_nodes.push(FieldType::Arrow(field));
} else {
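            // Raw pointers to the schema nodes act as cheap identity keys: the
            // leaves set drives the conversion, and the root set deduplicates
            // parents that own several selected leaves.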
let column = column.self_type() as *const Type;
let root = parquet_schema.get_column_root(c);
let root_raw_ptr = root as *const Type;
leaves.insert(column);
if !base_nodes_set.contains(&root_raw_ptr) {
base_nodes.push(FieldType::Parquet(root));
base_nodes_set.insert(root_raw_ptr);
}
}
}
base_nodes
.into_iter()
.map(|t| match t {
FieldType::Parquet(t) => ParquetTypeConverter::new(t, &leaves).to_field(),
FieldType::Arrow(f) => Ok(Some(f)),
})
.collect::<Result<Vec<Option<Field>>>>()
.map(|result| result.into_iter().flatten().collect::<Vec<Field>>())
.map(|fields| Schema::new_with_metadata(fields, metadata))
}
/// Try to decode the base64-encoded, IPC-serialized Arrow schema stored in
/// Parquet metadata into an Arrow `Schema`
fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Result<Schema> {
let decoded = base64::decode(encoded_meta);
match decoded {
Ok(bytes) => {
            // the legacy IPC format prepends a 4-byte continuation marker and a
            // 4-byte length; skip them if present (guarding against short input)
            let slice = if bytes.len() > 8 && bytes[0..4] == [255u8; 4] {
                &bytes[8..]
            } else {
                bytes.as_slice()
            };
match arrow::ipc::root_as_message(slice) {
Ok(message) => message
.header_as_schema()
.map(arrow::ipc::convert::fb_to_schema)
.ok_or(ArrowError("the message is not Arrow Schema".to_string())),
Err(err) => {
// The flatbuffers implementation returns an error on verification error.
Err(ArrowError(format!(
"Unable to get root as message stored in {}: {:?}",
super::ARROW_SCHEMA_META_KEY,
err
)))
}
}
}
Err(err) => {
// The C++ implementation returns an error if the schema can't be parsed.
Err(ArrowError(format!(
"Unable to decode the encoded schema stored in {}, {:?}",
super::ARROW_SCHEMA_META_KEY,
err
)))
}
}
}
/// Encodes the Arrow schema into the IPC format, and base64 encodes it
fn encode_arrow_schema(schema: &Schema) -> String {
let options = writer::IpcWriteOptions::default();
let data_gen = arrow::ipc::writer::IpcDataGenerator::default();
let mut serialized_schema = data_gen.schema_to_bytes(&schema, &options);
// manually prepending the length to the schema as arrow uses the legacy IPC format
// TODO: change after addressing ARROW-9777
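    // resulting layout: [0xFF, 0xFF, 0xFF, 0xFF][u32 LE message length][flatbuffer bytes]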
let schema_len = serialized_schema.ipc_message.len();
let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]);
len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut());
len_prefix_schema.append(&mut serialized_schema.ipc_message);
base64::encode(&len_prefix_schema)
}
/// Mutates writer metadata by storing the encoded Arrow schema.
/// If there is an existing Arrow schema metadata, it is replaced.
pub(crate) fn add_encoded_arrow_schema_to_metadata(
schema: &Schema,
props: &mut WriterProperties,
) {
let encoded = encode_arrow_schema(schema);
let schema_kv = KeyValue {
key: super::ARROW_SCHEMA_META_KEY.to_string(),
value: Some(encoded),
};
let mut meta = props.key_value_metadata.clone().unwrap_or_default();
// check if ARROW:schema exists, and overwrite it
let schema_meta = meta
.iter()
.enumerate()
.find(|(_, kv)| kv.key.as_str() == super::ARROW_SCHEMA_META_KEY);
    if let Some((i, _)) = schema_meta {
        meta.remove(i);
    }
    meta.push(schema_kv);
props.key_value_metadata = Some(meta);
}
/// Convert arrow schema to parquet schema
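///
/// A minimal sketch (the field names are illustrative):
///
/// ```ignore
/// use arrow::datatypes::{DataType, Field, Schema};
///
/// let schema = Schema::new(vec![
///     Field::new("id", DataType::Int32, false),
///     Field::new("name", DataType::Utf8, true),
/// ]);
/// let parquet_schema = arrow_to_parquet_schema(&schema)?;
/// assert_eq!(parquet_schema.num_columns(), 2);
/// ```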
pub fn arrow_to_parquet_schema(schema: &Schema) -> Result<SchemaDescriptor> {
let fields: Result<Vec<TypePtr>> = schema
.fields()
.iter()
.map(|field| arrow_to_parquet_type(field).map(Arc::new))
.collect();
let group = Type::group_type_builder("arrow_schema")
.with_fields(&mut fields?)
.build()?;
Ok(SchemaDescriptor::new(Arc::new(group)))
}
fn parse_key_value_metadata(
key_value_metadata: &Option<Vec<KeyValue>>,
) -> Option<HashMap<String, String>> {
match key_value_metadata {
Some(key_values) => {
let map: HashMap<String, String> = key_values
.iter()
.filter_map(|kv| {
kv.value
.as_ref()
.map(|value| (kv.key.clone(), value.clone()))
})
.collect();
if map.is_empty() {
None
} else {
Some(map)
}
}
None => None,
}
}
/// Convert parquet column schema to arrow field.
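///
/// A sketch, assuming `parquet_schema` is a `SchemaDescriptor`:
///
/// ```ignore
/// // convert the first leaf column to an Arrow field
/// let field = parquet_to_arrow_field(&parquet_schema.column(0))?;
/// ```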
pub fn parquet_to_arrow_field(parquet_column: &ColumnDescriptor) -> Result<Field> {
let schema = parquet_column.self_type();
let mut leaves = HashSet::new();
leaves.insert(parquet_column.self_type() as *const Type);
ParquetTypeConverter::new(schema, &leaves)
.to_field()
.map(|opt| opt.unwrap())
}
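/// Computes the number of bytes required to store a decimal of the given
/// precision as a FIXED_LEN_BYTE_ARRAY, i.e. `ceil(log2(10^precision) / 8)`.
/// For example, precision 9 requires 4 bytes and precision 18 requires 8 bytes.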
pub fn decimal_length_from_precision(precision: usize) -> usize {
(10.0_f64.powi(precision as i32).log2() / 8.0).ceil() as usize
}
/// Convert an arrow field to a parquet `Type`
fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
let name = field.name().as_str();
let repetition = if field.is_nullable() {
Repetition::OPTIONAL
} else {
Repetition::REQUIRED
};
// create type from field
match field.data_type() {
DataType::Null => Type::primitive_type_builder(name, PhysicalType::INT32)
.with_logical_type(Some(LogicalType::UNKNOWN(Default::default())))
.with_repetition(repetition)
.build(),
DataType::Boolean => Type::primitive_type_builder(name, PhysicalType::BOOLEAN)
.with_repetition(repetition)
.build(),
DataType::Int8 => Type::primitive_type_builder(name, PhysicalType::INT32)
.with_logical_type(Some(LogicalType::INTEGER(IntType {
bit_width: 8,
is_signed: true,
})))
.with_repetition(repetition)
.build(),
DataType::Int16 => Type::primitive_type_builder(name, PhysicalType::INT32)
.with_logical_type(Some(LogicalType::INTEGER(IntType {
bit_width: 16,
is_signed: true,
})))
.with_repetition(repetition)
.build(),
DataType::Int32 => Type::primitive_type_builder(name, PhysicalType::INT32)
.with_repetition(repetition)
.build(),
DataType::Int64 => Type::primitive_type_builder(name, PhysicalType::INT64)
.with_repetition(repetition)
.build(),
DataType::UInt8 => Type::primitive_type_builder(name, PhysicalType::INT32)
.with_logical_type(Some(LogicalType::INTEGER(IntType {
bit_width: 8,
is_signed: false,
})))
.with_repetition(repetition)
.build(),
DataType::UInt16 => Type::primitive_type_builder(name, PhysicalType::INT32)
.with_logical_type(Some(LogicalType::INTEGER(IntType {
bit_width: 16,
is_signed: false,
})))
.with_repetition(repetition)
.build(),
DataType::UInt32 => Type::primitive_type_builder(name, PhysicalType::INT32)
.with_logical_type(Some(LogicalType::INTEGER(IntType {
bit_width: 32,
is_signed: false,
})))
.with_repetition(repetition)
.build(),
DataType::UInt64 => Type::primitive_type_builder(name, PhysicalType::INT64)
.with_logical_type(Some(LogicalType::INTEGER(IntType {
bit_width: 64,
is_signed: false,
})))
.with_repetition(repetition)
.build(),
DataType::Float16 => Err(ArrowError("Float16 arrays not supported".to_string())),
DataType::Float32 => Type::primitive_type_builder(name, PhysicalType::FLOAT)
.with_repetition(repetition)
.build(),
DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE)
.with_repetition(repetition)
.build(),
DataType::Timestamp(time_unit, zone) => Type::primitive_type_builder(
name,
PhysicalType::INT64,
)
.with_logical_type(Some(LogicalType::TIMESTAMP(TimestampType {
is_adjusted_to_u_t_c: matches!(zone, Some(z) if !z.as_str().is_empty()),
unit: match time_unit {
TimeUnit::Second => ParquetTimeUnit::MILLIS(Default::default()),
TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
},
})))
.with_repetition(repetition)
.build(),
DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32)
.with_logical_type(Some(LogicalType::DATE(Default::default())))
.with_repetition(repetition)
.build(),
// date64 is cast to date32
DataType::Date64 => Type::primitive_type_builder(name, PhysicalType::INT32)
.with_logical_type(Some(LogicalType::DATE(Default::default())))
.with_repetition(repetition)
.build(),
DataType::Time32(_) => Type::primitive_type_builder(name, PhysicalType::INT32)
.with_logical_type(Some(LogicalType::TIME(TimeType {
is_adjusted_to_u_t_c: false,
unit: ParquetTimeUnit::MILLIS(Default::default()),
})))
.with_repetition(repetition)
.build(),
DataType::Time64(unit) => Type::primitive_type_builder(name, PhysicalType::INT64)
.with_logical_type(Some(LogicalType::TIME(TimeType {
is_adjusted_to_u_t_c: false,
unit: match unit {
TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
u => unreachable!("Invalid unit for Time64: {:?}", u),
},
})))
.with_repetition(repetition)
.build(),
DataType::Duration(_) => Err(ArrowError(
"Converting Duration to parquet not supported".to_string(),
)),
DataType::Interval(_) => {
Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
.with_converted_type(ConvertedType::INTERVAL)
.with_repetition(repetition)
.with_length(12)
.build()
}
DataType::Binary | DataType::LargeBinary => {
Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
.with_repetition(repetition)
.build()
}
DataType::FixedSizeBinary(length) => {
Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
.with_repetition(repetition)
.with_length(*length)
.build()
}
DataType::Decimal(precision, scale) => {
// Decimal precision determines the Parquet physical type to use.
// TODO(ARROW-12018): Enable the below after ARROW-10818 Decimal support
//
// let (physical_type, length) = if *precision > 1 && *precision <= 9 {
// (PhysicalType::INT32, -1)
// } else if *precision <= 18 {
// (PhysicalType::INT64, -1)
// } else {
// (
// PhysicalType::FIXED_LEN_BYTE_ARRAY,
// decimal_length_from_precision(*precision) as i32,
// )
// };
Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
.with_repetition(repetition)
.with_length(decimal_length_from_precision(*precision) as i32)
.with_logical_type(Some(LogicalType::DECIMAL(DecimalType {
scale: *scale as i32,
precision: *precision as i32,
})))
.with_precision(*precision as i32)
.with_scale(*scale as i32)
.build()
}
DataType::Utf8 | DataType::LargeUtf8 => {
Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
.with_logical_type(Some(LogicalType::STRING(Default::default())))
.with_repetition(repetition)
.build()
}
DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => {
Type::group_type_builder(name)
.with_fields(&mut vec![Arc::new(
Type::group_type_builder("list")
.with_fields(&mut vec![Arc::new(arrow_to_parquet_type(f)?)])
.with_repetition(Repetition::REPEATED)
.build()?,
)])
.with_logical_type(Some(LogicalType::LIST(Default::default())))
.with_repetition(repetition)
.build()
}
DataType::Struct(fields) => {
if fields.is_empty() {
return Err(ArrowError(
"Parquet does not support writing empty structs".to_string(),
));
}
// recursively convert children to types/nodes
let fields: Result<Vec<TypePtr>> = fields
.iter()
.map(|f| arrow_to_parquet_type(f).map(Arc::new))
.collect();
Type::group_type_builder(name)
.with_fields(&mut fields?)
.with_repetition(repetition)
.build()
}
DataType::Union(_) => unimplemented!("See ARROW-8817."),
DataType::Dictionary(_, ref value) => {
// Dictionary encoding not handled at the schema level
let dict_field = Field::new(name, *value.clone(), field.is_nullable());
arrow_to_parquet_type(&dict_field)
}
}
}
/// Groups the state and methods used to convert a parquet schema into an
/// arrow schema.
struct ParquetTypeConverter<'a> {
schema: &'a Type,
    /// The leaf columns to be included in the conversion to an arrow schema.
columns_to_convert: &'a HashSet<*const Type>,
}
impl<'a> ParquetTypeConverter<'a> {
fn new(schema: &'a Type, columns_to_convert: &'a HashSet<*const Type>) -> Self {
Self {
schema,
columns_to_convert,
}
}
fn clone_with_schema(&self, other: &'a Type) -> Self {
Self {
schema: other,
columns_to_convert: self.columns_to_convert,
}
}
}
impl ParquetTypeConverter<'_> {
// Public interfaces.
/// Converts parquet schema to arrow data type.
///
/// This function discards schema name.
///
    /// If this schema is a primitive type and not included in the leaves, the result
    /// is `Ok(None)`.
    ///
    /// If this schema is a group type and none of its children is included in the
    /// conversion, the result is `Ok(None)`.
fn to_data_type(&self) -> Result<Option<DataType>> {
match self.schema {
Type::PrimitiveType { .. } => self.to_primitive_type(),
Type::GroupType { .. } => self.to_group_type(),
}
}
/// Converts parquet schema to arrow field.
///
/// This method is roughly the same as
    /// [`to_data_type`](`ParquetTypeConverter::to_data_type`), except it preserves the
    /// schema name.
fn to_field(&self) -> Result<Option<Field>> {
self.to_data_type().map(|opt| {
opt.map(|dt| Field::new(self.schema.name(), dt, self.is_nullable()))
})
}
// Utility functions.
/// Checks whether this schema is nullable.
fn is_nullable(&self) -> bool {
let basic_info = self.schema.get_basic_info();
if basic_info.has_repetition() {
match basic_info.repetition() {
Repetition::OPTIONAL => true,
Repetition::REPEATED => true,
Repetition::REQUIRED => false,
}
} else {
false
}
}
fn is_repeated(&self) -> bool {
let basic_info = self.schema.get_basic_info();
basic_info.has_repetition() && basic_info.repetition() == Repetition::REPEATED
}
fn is_self_included(&self) -> bool {
self.columns_to_convert
.contains(&(self.schema as *const Type))
}
// Functions for primitive types.
/// Entry point for converting parquet primitive type to arrow type.
///
/// This function takes care of repetition.
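    ///
    /// A `REPEATED` primitive is wrapped in an Arrow `List` whose item field
    /// reuses the column name.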
fn to_primitive_type(&self) -> Result<Option<DataType>> {
if self.is_self_included() {
self.to_primitive_type_inner().map(|dt| {
if self.is_repeated() {
Some(DataType::List(Box::new(Field::new(
self.schema.name(),
dt,
self.is_nullable(),
))))
} else {
Some(dt)
}
})
} else {
Ok(None)
}
}
    /// Converts a parquet primitive type to an arrow data type.
fn to_primitive_type_inner(&self) -> Result<DataType> {
match self.schema.get_physical_type() {
PhysicalType::BOOLEAN => Ok(DataType::Boolean),
PhysicalType::INT32 => self.from_int32(),
PhysicalType::INT64 => self.from_int64(),
PhysicalType::INT96 => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)),
PhysicalType::FLOAT => Ok(DataType::Float32),
PhysicalType::DOUBLE => Ok(DataType::Float64),
PhysicalType::BYTE_ARRAY => self.from_byte_array(),
PhysicalType::FIXED_LEN_BYTE_ARRAY => self.from_fixed_len_byte_array(),
}
}
fn from_int32(&self) -> Result<DataType> {
match (
self.schema.get_basic_info().logical_type(),
self.schema.get_basic_info().converted_type(),
) {
(None, ConvertedType::NONE) => Ok(DataType::Int32),
(Some(LogicalType::INTEGER(t)), _) => match (t.bit_width, t.is_signed) {
(8, true) => Ok(DataType::Int8),
(16, true) => Ok(DataType::Int16),
(32, true) => Ok(DataType::Int32),
(8, false) => Ok(DataType::UInt8),
(16, false) => Ok(DataType::UInt16),
(32, false) => Ok(DataType::UInt32),
_ => Err(ArrowError(format!(
"Cannot create INT32 physical type from {:?}",
t
))),
},
(Some(LogicalType::DECIMAL(_)), _) => Ok(self.to_decimal()),
(Some(LogicalType::DATE(_)), _) => Ok(DataType::Date32),
(Some(LogicalType::TIME(t)), _) => match t.unit {
ParquetTimeUnit::MILLIS(_) => Ok(DataType::Time32(TimeUnit::Millisecond)),
_ => Err(ArrowError(format!(
"Cannot create INT32 physical type from {:?}",
t.unit
))),
},
(None, ConvertedType::UINT_8) => Ok(DataType::UInt8),
(None, ConvertedType::UINT_16) => Ok(DataType::UInt16),
(None, ConvertedType::UINT_32) => Ok(DataType::UInt32),
(None, ConvertedType::INT_8) => Ok(DataType::Int8),
(None, ConvertedType::INT_16) => Ok(DataType::Int16),
(None, ConvertedType::INT_32) => Ok(DataType::Int32),
(None, ConvertedType::DATE) => Ok(DataType::Date32),
(None, ConvertedType::TIME_MILLIS) => {
Ok(DataType::Time32(TimeUnit::Millisecond))
}
(None, ConvertedType::DECIMAL) => Ok(self.to_decimal()),
(logical, converted) => Err(ArrowError(format!(
"Unable to convert parquet INT32 logical type {:?} or converted type {}",
logical, converted
))),
}
}
fn from_int64(&self) -> Result<DataType> {
match (
self.schema.get_basic_info().logical_type(),
self.schema.get_basic_info().converted_type(),
) {
(None, ConvertedType::NONE) => Ok(DataType::Int64),
(Some(LogicalType::INTEGER(t)), _) if t.bit_width == 64 => {
match t.is_signed {
true => Ok(DataType::Int64),
false => Ok(DataType::UInt64),
}
}
(Some(LogicalType::TIME(t)), _) => match t.unit {
ParquetTimeUnit::MILLIS(_) => Err(ArrowError(
"Cannot create INT64 from MILLIS time unit".to_string(),
)),
ParquetTimeUnit::MICROS(_) => Ok(DataType::Time64(TimeUnit::Microsecond)),
ParquetTimeUnit::NANOS(_) => Ok(DataType::Time64(TimeUnit::Nanosecond)),
},
(Some(LogicalType::TIMESTAMP(t)), _) => Ok(DataType::Timestamp(
match t.unit {
ParquetTimeUnit::MILLIS(_) => TimeUnit::Millisecond,
ParquetTimeUnit::MICROS(_) => TimeUnit::Microsecond,
ParquetTimeUnit::NANOS(_) => TimeUnit::Nanosecond,
},
if t.is_adjusted_to_u_t_c {
Some("UTC".to_string())
} else {
None
},
)),
(None, ConvertedType::INT_64) => Ok(DataType::Int64),
(None, ConvertedType::UINT_64) => Ok(DataType::UInt64),
(None, ConvertedType::TIME_MICROS) => {
Ok(DataType::Time64(TimeUnit::Microsecond))
}
(None, ConvertedType::TIMESTAMP_MILLIS) => {
Ok(DataType::Timestamp(TimeUnit::Millisecond, None))
}
(None, ConvertedType::TIMESTAMP_MICROS) => {
Ok(DataType::Timestamp(TimeUnit::Microsecond, None))
}
(Some(LogicalType::DECIMAL(_)), _) => Ok(self.to_decimal()),
(None, ConvertedType::DECIMAL) => Ok(self.to_decimal()),
(logical, converted) => Err(ArrowError(format!(
"Unable to convert parquet INT64 logical type {:?} or converted type {}",
logical, converted
))),
}
}
fn from_fixed_len_byte_array(&self) -> Result<DataType> {
match (
self.schema.get_basic_info().logical_type(),
self.schema.get_basic_info().converted_type(),
) {
(Some(LogicalType::DECIMAL(_)), _) => Ok(self.to_decimal()),
(None, ConvertedType::DECIMAL) => Ok(self.to_decimal()),
(None, ConvertedType::INTERVAL) => {
// There is currently no reliable way of determining which IntervalUnit
// to return. Thus without the original Arrow schema, the results
// would be incorrect if all 12 bytes of the interval are populated
Ok(DataType::Interval(IntervalUnit::DayTime))
}
_ => {
let byte_width = match self.schema {
Type::PrimitiveType {
ref type_length, ..
} => *type_length,
_ => {
return Err(ArrowError(
"Expected a physical type, not a group type".to_string(),
))
}
};
Ok(DataType::FixedSizeBinary(byte_width))
}
}
}
fn to_decimal(&self) -> DataType {
assert!(self.schema.is_primitive());
DataType::Decimal(
self.schema.get_precision() as usize,
self.schema.get_scale() as usize,
)
}
fn from_byte_array(&self) -> Result<DataType> {
match (self.schema.get_basic_info().logical_type(), self.schema.get_basic_info().converted_type()) {
(Some(LogicalType::STRING(_)), _) => Ok(DataType::Utf8),
(Some(LogicalType::JSON(_)), _) => Ok(DataType::Binary),
(Some(LogicalType::BSON(_)), _) => Ok(DataType::Binary),
(Some(LogicalType::ENUM(_)), _) => Ok(DataType::Binary),
(None, ConvertedType::NONE) => Ok(DataType::Binary),
(None, ConvertedType::JSON) => Ok(DataType::Binary),
(None, ConvertedType::BSON) => Ok(DataType::Binary),
(None, ConvertedType::ENUM) => Ok(DataType::Binary),
(None, ConvertedType::UTF8) => Ok(DataType::Utf8),
(logical, converted) => Err(ArrowError(format!(
"Unable to convert parquet BYTE_ARRAY logical type {:?} or converted type {}",
logical, converted
))),
}
}
// Functions for group types.
/// Entry point for converting parquet group type.
///
/// This function takes care of logical type and repetition.
fn to_group_type(&self) -> Result<Option<DataType>> {
if self.is_repeated() {
self.to_struct().map(|opt| {
opt.map(|dt| {
DataType::List(Box::new(Field::new(
self.schema.name(),
dt,
self.is_nullable(),
)))
})
})
} else {
match (
self.schema.get_basic_info().logical_type(),
self.schema.get_basic_info().converted_type(),
) {
(Some(LogicalType::LIST(_)), _) => self.to_list(),
(None, ConvertedType::LIST) => self.to_list(),
_ => self.to_struct(),
}
}
}
/// Converts a parquet group type to arrow struct.
fn to_struct(&self) -> Result<Option<DataType>> {
match self.schema {
            Type::PrimitiveType { .. } => Err(ParquetError::General(format!(
                "{:?} is a primitive type and can't be converted to a struct.",
                self.schema
            ))),
Type::GroupType {
basic_info: _,
fields,
} => fields
.iter()
.map(|field_ptr| self.clone_with_schema(field_ptr).to_field())
.collect::<Result<Vec<Option<Field>>>>()
.map(|result| result.into_iter().flatten().collect::<Vec<Field>>())
.map(|fields| {
if fields.is_empty() {
None
} else {
Some(DataType::Struct(fields))
}
}),
}
}
/// Converts a parquet list to arrow list.
///
/// To fully understand this algorithm, please refer to
/// [parquet doc](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md).
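    ///
    /// The canonical three-level LIST layout handled here is:
    ///
    /// ```text
    /// <list-repetition> group <name> (LIST) {
    ///   repeated group list {
    ///     <element-repetition> <element-type> element;
    ///   }
    /// }
    /// ```
    ///
    /// Legacy encodings (a repeated group named `array` or `<name>_tuple`, or a
    /// repeated primitive element) are special-cased below.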
fn to_list(&self) -> Result<Option<DataType>> {
match self.schema {
            Type::PrimitiveType { .. } => Err(ParquetError::General(format!(
                "{:?} is a primitive type and can't be converted to a list.",
                self.schema
            ))),
Type::GroupType {
basic_info: _,
fields,
} if fields.len() == 1 => {
let list_item = fields.first().unwrap();
let item_converter = self.clone_with_schema(list_item);
let item_type = match list_item.as_ref() {
Type::PrimitiveType { .. } => {
if item_converter.is_repeated() {
item_converter.to_primitive_type_inner().map(Some)
} else {
Err(ArrowError(
"Primitive element type of list must be repeated."
.to_string(),
))
}
}
Type::GroupType {
basic_info: _,
fields,
} => {
if fields.len() > 1 {
item_converter.to_struct()
} else if fields.len() == 1
&& list_item.name() != "array"
&& list_item.name() != format!("{}_tuple", self.schema.name())
{
let nested_item = fields.first().unwrap();
let nested_item_converter =
self.clone_with_schema(nested_item);
nested_item_converter.to_data_type()
} else {
item_converter.to_struct()
}
}
};
// Check that the name of the list child is "list", in which case we
// get the child nullability and name (normally "element") from the nested
// group type.
// Without this step, the child incorrectly inherits the parent's optionality
let (list_item_name, item_is_optional) = match &item_converter.schema {
Type::GroupType { basic_info, fields }
if basic_info.name() == "list" && fields.len() == 1 =>
{
let field = fields.first().unwrap();
(field.name(), field.is_optional())
}
_ => (list_item.name(), list_item.is_optional()),
};
item_type.map(|opt| {
opt.map(|dt| {
DataType::List(Box::new(Field::new(
list_item_name,
dt,
item_is_optional,
)))
})
})
}
_ => Err(ArrowError(
"Group element type of list can only contain one field.".to_string(),
)),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::{collections::HashMap, convert::TryFrom, sync::Arc};
use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
use crate::file::{metadata::KeyValue, reader::SerializedFileReader};
use crate::{
arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader},
schema::{parser::parse_message_type, types::SchemaDescriptor},
util::test_common::get_temp_file,
};
#[test]
fn test_flat_primitives() {
let message_type = "
message test_schema {
REQUIRED BOOLEAN boolean;
REQUIRED INT32 int8 (INT_8);
REQUIRED INT32 int16 (INT_16);
REQUIRED INT32 uint8 (INTEGER(8,false));
REQUIRED INT32 uint16 (INTEGER(16,false));
REQUIRED INT32 int32;
REQUIRED INT64 int64 ;
OPTIONAL DOUBLE double;
OPTIONAL FLOAT float;
OPTIONAL BINARY string (UTF8);
OPTIONAL BINARY string_2 (STRING);
}
";
let parquet_group_type = parse_message_type(message_type).unwrap();
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let converted_arrow_schema =
parquet_to_arrow_schema(&parquet_schema, &None).unwrap();
let arrow_fields = vec![
Field::new("boolean", DataType::Boolean, false),
Field::new("int8", DataType::Int8, false),
Field::new("int16", DataType::Int16, false),
Field::new("uint8", DataType::UInt8, false),
Field::new("uint16", DataType::UInt16, false),
Field::new("int32", DataType::Int32, false),
Field::new("int64", DataType::Int64, false),
Field::new("double", DataType::Float64, true),
Field::new("float", DataType::Float32, true),
Field::new("string", DataType::Utf8, true),
Field::new("string_2", DataType::Utf8, true),
];
assert_eq!(&arrow_fields, converted_arrow_schema.fields());
}
#[test]
fn test_byte_array_fields() {
let message_type = "
message test_schema {
REQUIRED BYTE_ARRAY binary;
REQUIRED FIXED_LEN_BYTE_ARRAY (20) fixed_binary;
}
";
let parquet_group_type = parse_message_type(message_type).unwrap();
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let converted_arrow_schema =
parquet_to_arrow_schema(&parquet_schema, &None).unwrap();
let arrow_fields = vec![
Field::new("binary", DataType::Binary, false),
Field::new("fixed_binary", DataType::FixedSizeBinary(20), false),
];
assert_eq!(&arrow_fields, converted_arrow_schema.fields());
}
#[test]
fn test_duplicate_fields() {
let message_type = "
message test_schema {
REQUIRED BOOLEAN boolean;
REQUIRED INT32 int8 (INT_8);
}
";
let parquet_group_type = parse_message_type(message_type).unwrap();
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let converted_arrow_schema =
parquet_to_arrow_schema(&parquet_schema, &None).unwrap();
let arrow_fields = vec![
Field::new("boolean", DataType::Boolean, false),
Field::new("int8", DataType::Int8, false),
];
assert_eq!(&arrow_fields, converted_arrow_schema.fields());
let converted_arrow_schema = parquet_to_arrow_schema_by_columns(
&parquet_schema,
vec![0usize, 1usize],
&None,
)
.unwrap();
assert_eq!(&arrow_fields, converted_arrow_schema.fields());
}
#[test]
fn test_parquet_lists() {
let mut arrow_fields = Vec::new();
// LIST encoding example taken from parquet-format/LogicalTypes.md
let message_type = "
message test_schema {
REQUIRED GROUP my_list (LIST) {
REPEATED GROUP list {
OPTIONAL BINARY element (UTF8);
}
}
OPTIONAL GROUP my_list (LIST) {
REPEATED GROUP list {
REQUIRED BINARY element (UTF8);
}
}
OPTIONAL GROUP array_of_arrays (LIST) {
REPEATED GROUP list {
REQUIRED GROUP element (LIST) {
REPEATED GROUP list {
REQUIRED INT32 element;
}
}
}
}
OPTIONAL GROUP my_list (LIST) {
REPEATED GROUP element {
REQUIRED BINARY str (UTF8);
}
}
OPTIONAL GROUP my_list (LIST) {
REPEATED INT32 element;
}
OPTIONAL GROUP my_list (LIST) {
REPEATED GROUP element {
REQUIRED BINARY str (UTF8);
REQUIRED INT32 num;
}
}
OPTIONAL GROUP my_list (LIST) {
REPEATED GROUP array {
REQUIRED BINARY str (UTF8);
}
}
OPTIONAL GROUP my_list (LIST) {
REPEATED GROUP my_list_tuple {
REQUIRED BINARY str (UTF8);
}
}
REPEATED INT32 name;
}
";
// // List<String> (list non-null, elements nullable)
// required group my_list (LIST) {
// repeated group list {
// optional binary element (UTF8);
// }
// }
{
arrow_fields.push(Field::new(
"my_list",
DataType::List(Box::new(Field::new("element", DataType::Utf8, true))),
false,
));
}
// // List<String> (list nullable, elements non-null)
// optional group my_list (LIST) {
// repeated group list {
// required binary element (UTF8);
// }
// }
{
arrow_fields.push(Field::new(
"my_list",
DataType::List(Box::new(Field::new("element", DataType::Utf8, false))),
true,
));
}
// Element types can be nested structures. For example, a list of lists:
//
// // List<List<Integer>>
// optional group array_of_arrays (LIST) {
// repeated group list {
// required group element (LIST) {
// repeated group list {
// required int32 element;
// }
// }
// }
// }
{
let arrow_inner_list =
DataType::List(Box::new(Field::new("element", DataType::Int32, false)));
arrow_fields.push(Field::new(
"array_of_arrays",
DataType::List(Box::new(Field::new("element", arrow_inner_list, false))),
true,
));
}
// // List<String> (list nullable, elements non-null)
// optional group my_list (LIST) {
// repeated group element {
// required binary str (UTF8);
// };
// }
{
arrow_fields.push(Field::new(
"my_list",
DataType::List(Box::new(Field::new("element", DataType::Utf8, true))),
true,
));
}
// // List<Integer> (nullable list, non-null elements)
// optional group my_list (LIST) {
// repeated int32 element;
// }
{
arrow_fields.push(Field::new(
"my_list",
DataType::List(Box::new(Field::new("element", DataType::Int32, true))),
true,
));
}
// // List<Tuple<String, Integer>> (nullable list, non-null elements)
// optional group my_list (LIST) {
// repeated group element {
// required binary str (UTF8);
// required int32 num;
// };
// }
{
let arrow_struct = DataType::Struct(vec![
Field::new("str", DataType::Utf8, false),
Field::new("num", DataType::Int32, false),
]);
arrow_fields.push(Field::new(
"my_list",
DataType::List(Box::new(Field::new("element", arrow_struct, true))),
true,
));
}
// // List<OneTuple<String>> (nullable list, non-null elements)
// optional group my_list (LIST) {
// repeated group array {
// required binary str (UTF8);
// };
// }
// Special case: group is named array
{
let arrow_struct =
DataType::Struct(vec![Field::new("str", DataType::Utf8, false)]);
arrow_fields.push(Field::new(
"my_list",
DataType::List(Box::new(Field::new("array", arrow_struct, true))),
true,
));
}
// // List<OneTuple<String>> (nullable list, non-null elements)
// optional group my_list (LIST) {
// repeated group my_list_tuple {
// required binary str (UTF8);
// };
// }
        // Special case: group name ends in _tuple
{
let arrow_struct =
DataType::Struct(vec![Field::new("str", DataType::Utf8, false)]);
arrow_fields.push(Field::new(
"my_list",
DataType::List(Box::new(Field::new("my_list_tuple", arrow_struct, true))),
true,
));
}
        // One-level encoding: only allows required lists with required cells
// repeated value_type name
{
arrow_fields.push(Field::new(
"name",
DataType::List(Box::new(Field::new("name", DataType::Int32, true))),
true,
));
}
let parquet_group_type = parse_message_type(message_type).unwrap();
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let converted_arrow_schema =
parquet_to_arrow_schema(&parquet_schema, &None).unwrap();
let converted_fields = converted_arrow_schema.fields();
assert_eq!(arrow_fields.len(), converted_fields.len());
for i in 0..arrow_fields.len() {
assert_eq!(arrow_fields[i], converted_fields[i]);
}
}
#[test]
fn test_parquet_list_nullable() {
let mut arrow_fields = Vec::new();
let message_type = "
message test_schema {
REQUIRED GROUP my_list1 (LIST) {
REPEATED GROUP list {
OPTIONAL BINARY element (UTF8);
}
}
OPTIONAL GROUP my_list2 (LIST) {
REPEATED GROUP list {
REQUIRED BINARY element (UTF8);
}
}
REQUIRED GROUP my_list3 (LIST) {
REPEATED GROUP list {
REQUIRED BINARY element (UTF8);
}
}
}
";
// // List<String> (list non-null, elements nullable)
// required group my_list1 (LIST) {
// repeated group list {
// optional binary element (UTF8);
// }
// }
{
arrow_fields.push(Field::new(
"my_list1",
DataType::List(Box::new(Field::new("element", DataType::Utf8, true))),
false,
));
}
// // List<String> (list nullable, elements non-null)
// optional group my_list2 (LIST) {
// repeated group list {
// required binary element (UTF8);
// }
// }
{
arrow_fields.push(Field::new(
"my_list2",
DataType::List(Box::new(Field::new("element", DataType::Utf8, false))),
true,
));
}
// // List<String> (list non-null, elements non-null)
// repeated group my_list3 (LIST) {
// repeated group list {
// required binary element (UTF8);
// }
// }
{
arrow_fields.push(Field::new(
"my_list3",
DataType::List(Box::new(Field::new("element", DataType::Utf8, false))),
false,
));
}
let parquet_group_type = parse_message_type(message_type).unwrap();
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let converted_arrow_schema =
parquet_to_arrow_schema(&parquet_schema, &None).unwrap();
let converted_fields = converted_arrow_schema.fields();
assert_eq!(arrow_fields.len(), converted_fields.len());
for i in 0..arrow_fields.len() {
assert_eq!(arrow_fields[i], converted_fields[i]);
}
}
#[test]
fn test_nested_schema() {
let mut arrow_fields = Vec::new();
{
let group1_fields = vec![
Field::new("leaf1", DataType::Boolean, false),
Field::new("leaf2", DataType::Int32, false),
];
let group1_struct =
Field::new("group1", DataType::Struct(group1_fields), false);
arrow_fields.push(group1_struct);
let leaf3_field = Field::new("leaf3", DataType::Int64, false);
arrow_fields.push(leaf3_field);
}
let message_type = "
message test_schema {
REQUIRED GROUP group1 {
REQUIRED BOOLEAN leaf1;
REQUIRED INT32 leaf2;
}
REQUIRED INT64 leaf3;
}
";
let parquet_group_type = parse_message_type(message_type).unwrap();
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let converted_arrow_schema =
parquet_to_arrow_schema(&parquet_schema, &None).unwrap();
let converted_fields = converted_arrow_schema.fields();
assert_eq!(arrow_fields.len(), converted_fields.len());
for i in 0..arrow_fields.len() {
assert_eq!(arrow_fields[i], converted_fields[i]);
}
}
#[test]
fn test_nested_schema_partial() {
let mut arrow_fields = Vec::new();
{
let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)];
let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
arrow_fields.push(group1);
let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)];
let group2 = Field::new("group2", DataType::Struct(group2_fields), false);
arrow_fields.push(group2);
arrow_fields.push(Field::new("leaf5", DataType::Int64, false));
}
let message_type = "
message test_schema {
REQUIRED GROUP group1 {
REQUIRED INT64 leaf1;
REQUIRED INT64 leaf2;
}
REQUIRED GROUP group2 {
REQUIRED INT64 leaf3;
REQUIRED INT64 leaf4;
}
REQUIRED INT64 leaf5;
}
";
let parquet_group_type = parse_message_type(message_type).unwrap();
// Expected partial arrow schema (columns 0, 3, 4):
// required group group1 {
// required int64 leaf1;
// }
// required group group2 {
// required int64 leaf4;
// }
// required int64 leaf5;
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let converted_arrow_schema =
parquet_to_arrow_schema_by_columns(&parquet_schema, vec![0, 3, 4], &None)
.unwrap();
let converted_fields = converted_arrow_schema.fields();
assert_eq!(arrow_fields.len(), converted_fields.len());
for i in 0..arrow_fields.len() {
assert_eq!(arrow_fields[i], converted_fields[i]);
}
}
#[test]
fn test_nested_schema_partial_ordering() {
let mut arrow_fields = Vec::new();
{
let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)];
let group2 = Field::new("group2", DataType::Struct(group2_fields), false);
arrow_fields.push(group2);
arrow_fields.push(Field::new("leaf5", DataType::Int64, false));
let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)];
let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
arrow_fields.push(group1);
}
let message_type = "
message test_schema {
REQUIRED GROUP group1 {
REQUIRED INT64 leaf1;
REQUIRED INT64 leaf2;
}
REQUIRED GROUP group2 {
REQUIRED INT64 leaf3;
REQUIRED INT64 leaf4;
}
REQUIRED INT64 leaf5;
}
";
let parquet_group_type = parse_message_type(message_type).unwrap();
// Expected partial arrow schema (columns 3, 4, 0):
// required group group1 {
// required int64 leaf1;
// }
// required group group2 {
// required int64 leaf4;
// }
// required int64 leaf5;
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let converted_arrow_schema =
parquet_to_arrow_schema_by_columns(&parquet_schema, vec![3, 4, 0], &None)
.unwrap();
let converted_fields = converted_arrow_schema.fields();
assert_eq!(arrow_fields.len(), converted_fields.len());
for i in 0..arrow_fields.len() {
assert_eq!(arrow_fields[i], converted_fields[i]);
}
}
#[test]
fn test_repeated_nested_schema() {
let mut arrow_fields = Vec::new();
{
arrow_fields.push(Field::new("leaf1", DataType::Int32, true));
let inner_group_list = Field::new(
"innerGroup",
DataType::List(Box::new(Field::new(
"innerGroup",
DataType::Struct(vec![Field::new("leaf3", DataType::Int32, true)]),
true,
))),
true,
);
let outer_group_list = Field::new(
"outerGroup",
DataType::List(Box::new(Field::new(
"outerGroup",
DataType::Struct(vec![
Field::new("leaf2", DataType::Int32, true),
inner_group_list,
]),
true,
))),
true,
);
arrow_fields.push(outer_group_list);
}
let message_type = "
message test_schema {
OPTIONAL INT32 leaf1;
REPEATED GROUP outerGroup {
OPTIONAL INT32 leaf2;
REPEATED GROUP innerGroup {
OPTIONAL INT32 leaf3;
}
}
}
";
let parquet_group_type = parse_message_type(message_type).unwrap();
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let converted_arrow_schema =
parquet_to_arrow_schema(&parquet_schema, &None).unwrap();
let converted_fields = converted_arrow_schema.fields();
assert_eq!(arrow_fields.len(), converted_fields.len());
for i in 0..arrow_fields.len() {
assert_eq!(arrow_fields[i], converted_fields[i]);
}
}
#[test]
fn test_column_desc_to_field() {
let message_type = "
message test_schema {
REQUIRED BOOLEAN boolean;
REQUIRED INT32 int8 (INT_8);
REQUIRED INT32 uint8 (INTEGER(8,false));
REQUIRED INT32 int16 (INT_16);
REQUIRED INT32 uint16 (INTEGER(16,false));
REQUIRED INT32 int32;
REQUIRED INT64 int64;
OPTIONAL DOUBLE double;
OPTIONAL FLOAT float;
OPTIONAL BINARY string (UTF8);
REPEATED BOOLEAN bools;
OPTIONAL INT32 date (DATE);
OPTIONAL INT32 time_milli (TIME_MILLIS);
OPTIONAL INT64 time_micro (TIME_MICROS);
OPTIONAL INT64 time_nano (TIME(NANOS,false));
OPTIONAL INT64 ts_milli (TIMESTAMP_MILLIS);
REQUIRED INT64 ts_micro (TIMESTAMP_MICROS);
REQUIRED INT64 ts_nano (TIMESTAMP(NANOS,true));
}
";
let parquet_group_type = parse_message_type(message_type).unwrap();
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let converted_arrow_fields = parquet_schema
.columns()
.iter()
.map(|c| parquet_to_arrow_field(c).unwrap())
.collect::<Vec<Field>>();
let arrow_fields = vec![
Field::new("boolean", DataType::Boolean, false),
Field::new("int8", DataType::Int8, false),
Field::new("uint8", DataType::UInt8, false),
Field::new("int16", DataType::Int16, false),
Field::new("uint16", DataType::UInt16, false),
Field::new("int32", DataType::Int32, false),
Field::new("int64", DataType::Int64, false),
Field::new("double", DataType::Float64, true),
Field::new("float", DataType::Float32, true),
Field::new("string", DataType::Utf8, true),
Field::new(
"bools",
DataType::List(Box::new(Field::new("bools", DataType::Boolean, true))),
true,
),
Field::new("date", DataType::Date32, true),
Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
Field::new("time_nano", DataType::Time64(TimeUnit::Nanosecond), true),
Field::new(
"ts_milli",
DataType::Timestamp(TimeUnit::Millisecond, None),
true,
),
Field::new(
"ts_micro",
DataType::Timestamp(TimeUnit::Microsecond, None),
false,
),
Field::new(
"ts_nano",
DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".to_string())),
false,
),
];
assert_eq!(arrow_fields, converted_arrow_fields);
}
#[test]
fn test_field_to_column_desc() {
let message_type = "
message arrow_schema {
REQUIRED BOOLEAN boolean;
REQUIRED INT32 int8 (INT_8);
REQUIRED INT32 int16 (INTEGER(16,true));
REQUIRED INT32 int32;
REQUIRED INT64 int64;
OPTIONAL DOUBLE double;
OPTIONAL FLOAT float;
OPTIONAL BINARY string (STRING);
OPTIONAL GROUP bools (LIST) {
REPEATED GROUP list {
OPTIONAL BOOLEAN element;
}
}
REQUIRED GROUP bools_non_null (LIST) {
REPEATED GROUP list {
REQUIRED BOOLEAN element;
}
}
OPTIONAL INT32 date (DATE);
OPTIONAL INT32 time_milli (TIME(MILLIS,false));
OPTIONAL INT64 time_micro (TIME_MICROS);
OPTIONAL INT64 ts_milli (TIMESTAMP_MILLIS);
REQUIRED INT64 ts_micro (TIMESTAMP(MICROS,false));
REQUIRED GROUP struct {
REQUIRED BOOLEAN bools;
REQUIRED INT32 uint32 (INTEGER(32,false));
REQUIRED GROUP int32 (LIST) {
REPEATED GROUP list {
OPTIONAL INT32 element;
}
}
}
REQUIRED BINARY dictionary_strings (STRING);
}
";
let parquet_group_type = parse_message_type(message_type).unwrap();
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let arrow_fields = vec![
Field::new("boolean", DataType::Boolean, false),
Field::new("int8", DataType::Int8, false),
Field::new("int16", DataType::Int16, false),
Field::new("int32", DataType::Int32, false),
Field::new("int64", DataType::Int64, false),
Field::new("double", DataType::Float64, true),
Field::new("float", DataType::Float32, true),
Field::new("string", DataType::Utf8, true),
Field::new(
"bools",
DataType::List(Box::new(Field::new("element", DataType::Boolean, true))),
true,
),
Field::new(
"bools_non_null",
DataType::List(Box::new(Field::new("element", DataType::Boolean, false))),
false,
),
Field::new("date", DataType::Date32, true),
Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
Field::new(
"ts_milli",
DataType::Timestamp(TimeUnit::Millisecond, None),
true,
),
Field::new(
"ts_micro",
DataType::Timestamp(TimeUnit::Microsecond, None),
false,
),
Field::new(
"struct",
DataType::Struct(vec![
Field::new("bools", DataType::Boolean, false),
Field::new("uint32", DataType::UInt32, false),
Field::new(
"int32",
DataType::List(Box::new(Field::new(
"element",
DataType::Int32,
true,
))),
false,
),
]),
false,
),
Field::new(
"dictionary_strings",
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
false,
),
];
let arrow_schema = Schema::new(arrow_fields);
let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema).unwrap();
assert_eq!(
parquet_schema.columns().len(),
converted_arrow_schema.columns().len()
);
parquet_schema
.columns()
.iter()
.zip(converted_arrow_schema.columns())
.for_each(|(a, b)| {
// Only check logical type if it's set on the Parquet side.
// This is because the Arrow conversion always sets logical type,
// even if there wasn't originally one.
// This is not an issue, but is an inconvenience for this test.
match a.logical_type() {
Some(_) => {
assert_eq!(a, b)
}
None => {
assert_eq!(a.name(), b.name());
assert_eq!(a.physical_type(), b.physical_type());
assert_eq!(a.converted_type(), b.converted_type());
}
};
});
}
#[test]
#[should_panic(expected = "Parquet does not support writing empty structs")]
fn test_empty_struct_field() {
let arrow_fields = vec![Field::new("struct", DataType::Struct(vec![]), false)];
let arrow_schema = Schema::new(arrow_fields);
let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema);
assert!(converted_arrow_schema.is_err());
converted_arrow_schema.unwrap();
}
#[test]
fn test_metadata() {
let message_type = "
message test_schema {
OPTIONAL BINARY string (STRING);
}
";
let parquet_group_type = parse_message_type(message_type).unwrap();
let mut key_value_metadata: Vec<KeyValue> = Vec::new();
key_value_metadata.push(KeyValue::new("foo".to_owned(), Some("bar".to_owned())));
key_value_metadata.push(KeyValue::new("baz".to_owned(), None));
let mut expected_metadata: HashMap<String, String> = HashMap::new();
expected_metadata.insert("foo".to_owned(), "bar".to_owned());
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let converted_arrow_schema =
parquet_to_arrow_schema(&parquet_schema, &Some(key_value_metadata)).unwrap();
assert_eq!(converted_arrow_schema.metadata(), &expected_metadata);
}
#[test]
fn test_arrow_schema_roundtrip() -> Result<()> {
// This tests the roundtrip of an Arrow schema
// Fields that are commented out fail roundtrip tests or are unsupported by the writer
let metadata: HashMap<String, String> =
[("Key".to_string(), "Value".to_string())]
.iter()
.cloned()
.collect();
let schema = Schema::new_with_metadata(
vec![
Field::new("c1", DataType::Utf8, false),
Field::new("c2", DataType::Binary, false),
Field::new("c3", DataType::FixedSizeBinary(3), false),
Field::new("c4", DataType::Boolean, false),
Field::new("c5", DataType::Date32, false),
Field::new("c6", DataType::Date64, false),
Field::new("c7", DataType::Time32(TimeUnit::Second), false),
Field::new("c8", DataType::Time32(TimeUnit::Millisecond), false),
Field::new("c13", DataType::Time64(TimeUnit::Microsecond), false),
Field::new("c14", DataType::Time64(TimeUnit::Nanosecond), false),
Field::new("c15", DataType::Timestamp(TimeUnit::Second, None), false),
Field::new(
"c16",
DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".to_string())),
false,
),
Field::new(
"c17",
DataType::Timestamp(
TimeUnit::Microsecond,
Some("Africa/Johannesburg".to_string()),
),
false,
),
Field::new(
"c18",
DataType::Timestamp(TimeUnit::Nanosecond, None),
false,
),
Field::new("c19", DataType::Interval(IntervalUnit::DayTime), false),
Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false),
Field::new(
"c21",
DataType::List(Box::new(Field::new("list", DataType::Boolean, true))),
false,
),
// Field::new(
// "c22",
// DataType::FixedSizeList(Box::new(DataType::Boolean), 5),
// false,
// ),
// Field::new(
// "c23",
// DataType::List(Box::new(DataType::LargeList(Box::new(
// DataType::Struct(vec![
// Field::new("a", DataType::Int16, true),
// Field::new("b", DataType::Float64, false),
// ]),
// )))),
// true,
// ),
Field::new(
"c24",
DataType::Struct(vec![
Field::new("a", DataType::Utf8, false),
Field::new("b", DataType::UInt16, false),
]),
false,
),
Field::new("c25", DataType::Interval(IntervalUnit::YearMonth), true),
Field::new("c26", DataType::Interval(IntervalUnit::DayTime), true),
// Field::new("c27", DataType::Duration(TimeUnit::Second), false),
// Field::new("c28", DataType::Duration(TimeUnit::Millisecond), false),
// Field::new("c29", DataType::Duration(TimeUnit::Microsecond), false),
// Field::new("c30", DataType::Duration(TimeUnit::Nanosecond), false),
Field::new_dict(
"c31",
DataType::Dictionary(
Box::new(DataType::Int32),
Box::new(DataType::Utf8),
),
true,
123,
true,
),
Field::new("c32", DataType::LargeBinary, true),
Field::new("c33", DataType::LargeUtf8, true),
// Field::new(
// "c34",
// DataType::LargeList(Box::new(DataType::List(Box::new(
// DataType::Struct(vec![
// Field::new("a", DataType::Int16, true),
// Field::new("b", DataType::Float64, true),
// ]),
// )))),
// true,
// ),
Field::new("c35", DataType::Null, true),
Field::new("c36", DataType::Decimal(2, 1), false),
Field::new("c37", DataType::Decimal(50, 20), false),
Field::new("c38", DataType::Decimal(18, 12), true),
],
metadata,
);
// write to an empty parquet file so that schema is serialized
let file = get_temp_file("test_arrow_schema_roundtrip.parquet", &[]);
let mut writer = ArrowWriter::try_new(
file.try_clone().unwrap(),
Arc::new(schema.clone()),
None,
)?;
writer.close()?;
// read file back
let parquet_reader = SerializedFileReader::try_from(file)?;
let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_reader));
let read_schema = arrow_reader.get_schema()?;
assert_eq!(schema, read_schema);
// read all fields by columns
let partial_read_schema =
arrow_reader.get_schema_by_columns(0..(schema.fields().len()), false)?;
assert_eq!(schema, partial_read_schema);
Ok(())
}
#[test]
#[ignore = "Roundtrip of lists currently fails because we don't check their types correctly in the Arrow schema"]
fn test_arrow_schema_roundtrip_lists() -> Result<()> {
let metadata: HashMap<String, String> =
[("Key".to_string(), "Value".to_string())]
.iter()
.cloned()
.collect();
let schema = Schema::new_with_metadata(
vec![
Field::new(
"c21",
DataType::List(Box::new(Field::new(
"array",
DataType::Boolean,
true,
))),
false,
),
Field::new(
"c22",
DataType::FixedSizeList(
Box::new(Field::new("items", DataType::Boolean, false)),
5,
),
false,
),
Field::new(
"c23",
DataType::List(Box::new(Field::new(
"items",
DataType::LargeList(Box::new(Field::new(
"items",
DataType::Struct(vec![
Field::new("a", DataType::Int16, true),
Field::new("b", DataType::Float64, false),
]),
true,
))),
true,
))),
true,
),
],
metadata,
);
// write to an empty parquet file so that schema is serialized
let file = get_temp_file("test_arrow_schema_roundtrip_lists.parquet", &[]);
let mut writer = ArrowWriter::try_new(
file.try_clone().unwrap(),
Arc::new(schema.clone()),
None,
)?;
writer.close()?;
// read file back
let parquet_reader = SerializedFileReader::try_from(file)?;
let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_reader));
let read_schema = arrow_reader.get_schema()?;
assert_eq!(schema, read_schema);
// read all fields by columns
let partial_read_schema =
arrow_reader.get_schema_by_columns(0..(schema.fields().len()), false)?;
assert_eq!(schema, partial_read_schema);
Ok(())
}
}