blob: 1aa8c266175e9e8dd7a919be69815f0d8d9bd4d3 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Contains structs and methods to build Parquet schema and schema descriptors.
use std::{collections::HashMap, convert::From, fmt, sync::Arc};
use parquet_format::SchemaElement;
use crate::basic::{
ConvertedType, LogicalType, Repetition, TimeType, TimeUnit, Type as PhysicalType,
};
use crate::errors::{ParquetError, Result};
// ----------------------------------------------------------------------
// Parquet Type definitions
/// Type alias for `Arc<Type>`.
pub type TypePtr = Arc<Type>;
/// Type alias for `Arc<SchemaDescriptor>`.
pub type SchemaDescPtr = Arc<SchemaDescriptor>;
/// Type alias for `Arc<ColumnDescriptor>`.
pub type ColumnDescPtr = Arc<ColumnDescriptor>;
/// Representation of a Parquet type.
/// Used to describe primitive leaf fields and structs, including top-level schema.
/// Note that the top-level schema type is represented using `GroupType` whose
/// repetition is `None`.
#[derive(Clone, Debug, PartialEq)]
pub enum Type {
PrimitiveType {
basic_info: BasicTypeInfo,
physical_type: PhysicalType,
type_length: i32,
scale: i32,
precision: i32,
},
GroupType {
basic_info: BasicTypeInfo,
fields: Vec<TypePtr>,
},
}
impl Type {
/// Creates primitive type builder with provided field name and physical type.
pub fn primitive_type_builder(
name: &str,
physical_type: PhysicalType,
) -> PrimitiveTypeBuilder {
PrimitiveTypeBuilder::new(name, physical_type)
}
/// Creates group type builder with provided column name.
pub fn group_type_builder(name: &str) -> GroupTypeBuilder {
GroupTypeBuilder::new(name)
}
/// Returns [`BasicTypeInfo`] information about the type.
pub fn get_basic_info(&self) -> &BasicTypeInfo {
match *self {
Type::PrimitiveType { ref basic_info, .. } => &basic_info,
Type::GroupType { ref basic_info, .. } => &basic_info,
}
}
/// Returns this type's field name.
pub fn name(&self) -> &str {
self.get_basic_info().name()
}
/// Gets the fields from this group type.
/// Note that this will panic if called on a non-group type.
// TODO: should we return `&[&Type]` here?
pub fn get_fields(&self) -> &[TypePtr] {
match *self {
Type::GroupType { ref fields, .. } => &fields[..],
_ => panic!("Cannot call get_fields() on a non-group type"),
}
}
/// Gets physical type of this primitive type.
/// Note that this will panic if called on a non-primitive type.
pub fn get_physical_type(&self) -> PhysicalType {
match *self {
Type::PrimitiveType {
basic_info: _,
physical_type,
..
} => physical_type,
_ => panic!("Cannot call get_physical_type() on a non-primitive type"),
}
}
/// Gets precision of this primitive type.
/// Note that this will panic if called on a non-primitive type.
pub fn get_precision(&self) -> i32 {
match *self {
Type::PrimitiveType { precision, .. } => precision,
_ => panic!("Cannot call get_precision() on non-primitive type"),
}
}
/// Gets scale of this primitive type.
/// Note that this will panic if called on a non-primitive type.
pub fn get_scale(&self) -> i32 {
match *self {
Type::PrimitiveType { scale, .. } => scale,
_ => panic!("Cannot call get_scale() on non-primitive type"),
}
}
/// Checks if `sub_type` schema is part of current schema.
/// This method can be used to check if projected columns are part of the root schema.
pub fn check_contains(&self, sub_type: &Type) -> bool {
// Names match, and repetitions match or not set for both
let basic_match = self.get_basic_info().name()
== sub_type.get_basic_info().name()
&& (self.is_schema() && sub_type.is_schema()
|| !self.is_schema()
&& !sub_type.is_schema()
&& self.get_basic_info().repetition()
== sub_type.get_basic_info().repetition());
match *self {
Type::PrimitiveType { .. } if basic_match && sub_type.is_primitive() => {
self.get_physical_type() == sub_type.get_physical_type()
}
Type::GroupType { .. } if basic_match && sub_type.is_group() => {
// build hashmap of name -> TypePtr
let mut field_map = HashMap::new();
for field in self.get_fields() {
field_map.insert(field.name(), field);
}
for field in sub_type.get_fields() {
if !field_map
.get(field.name())
.map(|tpe| tpe.check_contains(field))
.unwrap_or(false)
{
return false;
}
}
true
}
_ => false,
}
}
/// Returns `true` if this type is a primitive type, `false` otherwise.
pub fn is_primitive(&self) -> bool {
matches!(*self, Type::PrimitiveType { .. })
}
/// Returns `true` if this type is a group type, `false` otherwise.
pub fn is_group(&self) -> bool {
matches!(*self, Type::GroupType { .. })
}
/// Returns `true` if this type is the top-level schema type (message type).
pub fn is_schema(&self) -> bool {
match *self {
Type::GroupType { ref basic_info, .. } => !basic_info.has_repetition(),
_ => false,
}
}
/// Returns `true` if this type is repeated or optional.
/// If this type doesn't have repetition defined, we still treat it as optional.
pub fn is_optional(&self) -> bool {
self.get_basic_info().has_repetition()
&& self.get_basic_info().repetition() != Repetition::REQUIRED
}
}
/// A builder for primitive types. All attributes are optional
/// except the name and physical type.
/// Note that if not specified explicitly, `Repetition::OPTIONAL` is used.
pub struct PrimitiveTypeBuilder<'a> {
name: &'a str,
repetition: Repetition,
physical_type: PhysicalType,
converted_type: ConvertedType,
logical_type: Option<LogicalType>,
length: i32,
precision: i32,
scale: i32,
id: Option<i32>,
}
impl<'a> PrimitiveTypeBuilder<'a> {
/// Creates new primitive type builder with provided field name and physical type.
pub fn new(name: &'a str, physical_type: PhysicalType) -> Self {
Self {
name,
repetition: Repetition::OPTIONAL,
physical_type,
converted_type: ConvertedType::NONE,
logical_type: None,
length: -1,
precision: -1,
scale: -1,
id: None,
}
}
/// Sets [`Repetition`](crate::basic::Repetition) for this field and returns itself.
pub fn with_repetition(mut self, repetition: Repetition) -> Self {
self.repetition = repetition;
self
}
/// Sets [`ConvertedType`](crate::basic::ConvertedType) for this field and returns itself.
pub fn with_converted_type(mut self, converted_type: ConvertedType) -> Self {
self.converted_type = converted_type;
self
}
/// Sets [`LogicalType`](crate::basic::LogicalType) for this field and returns itself.
/// If only the logical type is populated for a primitive type, the converted type
/// will be automatically populated, and can thus be omitted.
pub fn with_logical_type(mut self, logical_type: Option<LogicalType>) -> Self {
self.logical_type = logical_type;
self
}
/// Sets type length and returns itself.
/// This is only applied to FIXED_LEN_BYTE_ARRAY and INT96 (INTERVAL) types, because
/// they maintain fixed size underlying byte array.
/// By default, value is `0`.
pub fn with_length(mut self, length: i32) -> Self {
self.length = length;
self
}
/// Sets precision for Parquet DECIMAL physical type and returns itself.
/// By default, it equals to `0` and used only for decimal context.
pub fn with_precision(mut self, precision: i32) -> Self {
self.precision = precision;
self
}
/// Sets scale for Parquet DECIMAL physical type and returns itself.
/// By default, it equals to `0` and used only for decimal context.
pub fn with_scale(mut self, scale: i32) -> Self {
self.scale = scale;
self
}
/// Sets optional field id and returns itself.
pub fn with_id(mut self, id: i32) -> Self {
self.id = Some(id);
self
}
/// Creates a new `PrimitiveType` instance from the collected attributes.
/// Returns `Err` in case of any building conditions are not met.
pub fn build(self) -> Result<Type> {
let mut basic_info = BasicTypeInfo {
name: String::from(self.name),
repetition: Some(self.repetition),
converted_type: self.converted_type,
logical_type: self.logical_type.clone(),
id: self.id,
};
// Check length before logical type, since it is used for logical type validation.
if self.physical_type == PhysicalType::FIXED_LEN_BYTE_ARRAY && self.length < 0 {
return Err(general_err!(
"Invalid FIXED_LEN_BYTE_ARRAY length: {}",
self.length
));
}
match &self.logical_type {
Some(logical_type) => {
// If a converted type is populated, check that it is consistent with
// its logical type
if self.converted_type != ConvertedType::NONE {
if ConvertedType::from(self.logical_type.clone())
!= self.converted_type
{
return Err(general_err!(
"Logical type {:?} is imcompatible with converted type {}",
logical_type,
self.converted_type
));
}
} else {
// Populate the converted type for backwards compatibility
basic_info.converted_type = self.logical_type.clone().into();
}
// Check that logical type and physical type are compatible
match (logical_type, self.physical_type) {
(LogicalType::MAP(_), _) | (LogicalType::LIST(_), _) => {
return Err(general_err!(
"{:?} cannot be applied to a primitive type",
logical_type
));
}
(LogicalType::ENUM(_), PhysicalType::BYTE_ARRAY) => {}
(LogicalType::DECIMAL(t), _) => {
// Check that scale and precision are consistent with legacy values
if t.scale != self.scale {
return Err(general_err!(
"DECIMAL logical type scale {} must match self.scale {}",
t.scale,
self.scale
));
}
if t.precision != self.precision {
return Err(general_err!(
"DECIMAL logical type precision {} must match self.precision {}",
t.precision,
self.precision
));
}
self.check_decimal_precision_scale()?;
}
(LogicalType::DATE(_), PhysicalType::INT32) => {}
(
LogicalType::TIME(TimeType {
unit: TimeUnit::MILLIS(_),
..
}),
PhysicalType::INT32,
) => {}
(LogicalType::TIME(t), PhysicalType::INT64) => {
if t.unit == TimeUnit::MILLIS(Default::default()) {
return Err(general_err!(
"Cannot use millisecond unit on INT64 type"
));
}
}
(LogicalType::TIMESTAMP(_), PhysicalType::INT64) => {}
(LogicalType::INTEGER(t), PhysicalType::INT32)
if t.bit_width <= 32 => {}
(LogicalType::INTEGER(t), PhysicalType::INT64)
if t.bit_width == 64 => {}
// Null type
(LogicalType::UNKNOWN(_), PhysicalType::INT32) => {}
(LogicalType::STRING(_), PhysicalType::BYTE_ARRAY) => {}
(LogicalType::JSON(_), PhysicalType::BYTE_ARRAY) => {}
(LogicalType::BSON(_), PhysicalType::BYTE_ARRAY) => {}
(LogicalType::UUID(_), PhysicalType::FIXED_LEN_BYTE_ARRAY) => {}
(a, b) => {
return Err(general_err!(
"Cannot annotate {:?} from {} fields",
a,
b
))
}
}
}
None => {}
}
match self.converted_type {
ConvertedType::NONE => {}
ConvertedType::UTF8 | ConvertedType::BSON | ConvertedType::JSON => {
if self.physical_type != PhysicalType::BYTE_ARRAY {
return Err(general_err!(
"{} can only annotate BYTE_ARRAY fields",
self.converted_type
));
}
}
ConvertedType::DECIMAL => {
self.check_decimal_precision_scale()?;
}
ConvertedType::DATE
| ConvertedType::TIME_MILLIS
| ConvertedType::UINT_8
| ConvertedType::UINT_16
| ConvertedType::UINT_32
| ConvertedType::INT_8
| ConvertedType::INT_16
| ConvertedType::INT_32 => {
if self.physical_type != PhysicalType::INT32 {
return Err(general_err!(
"{} can only annotate INT32",
self.converted_type
));
}
}
ConvertedType::TIME_MICROS
| ConvertedType::TIMESTAMP_MILLIS
| ConvertedType::TIMESTAMP_MICROS
| ConvertedType::UINT_64
| ConvertedType::INT_64 => {
if self.physical_type != PhysicalType::INT64 {
return Err(general_err!(
"{} can only annotate INT64",
self.converted_type
));
}
}
ConvertedType::INTERVAL => {
if self.physical_type != PhysicalType::FIXED_LEN_BYTE_ARRAY
|| self.length != 12
{
return Err(general_err!(
"INTERVAL can only annotate FIXED_LEN_BYTE_ARRAY(12)"
));
}
}
ConvertedType::ENUM => {
if self.physical_type != PhysicalType::BYTE_ARRAY {
return Err(general_err!("ENUM can only annotate BYTE_ARRAY fields"));
}
}
_ => {
return Err(general_err!(
"{} cannot be applied to a primitive type",
self.converted_type
));
}
}
Ok(Type::PrimitiveType {
basic_info,
physical_type: self.physical_type,
type_length: self.length,
scale: self.scale,
precision: self.precision,
})
}
#[inline]
fn check_decimal_precision_scale(&self) -> Result<()> {
match self.physical_type {
PhysicalType::INT32
| PhysicalType::INT64
| PhysicalType::BYTE_ARRAY
| PhysicalType::FIXED_LEN_BYTE_ARRAY => (),
_ => {
return Err(general_err!(
"DECIMAL can only annotate INT32, INT64, BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY"
));
}
}
// Precision is required and must be a non-zero positive integer.
if self.precision < 1 {
return Err(general_err!(
"Invalid DECIMAL precision: {}",
self.precision
));
}
// Scale must be zero or a positive integer less than the precision.
if self.scale < 0 {
return Err(general_err!("Invalid DECIMAL scale: {}", self.scale));
}
if self.scale >= self.precision {
return Err(general_err!(
"Invalid DECIMAL: scale ({}) cannot be greater than or equal to precision \
({})",
self.scale,
self.precision
));
}
// Check precision and scale based on physical type limitations.
match self.physical_type {
PhysicalType::INT32 => {
if self.precision > 9 {
return Err(general_err!(
"Cannot represent INT32 as DECIMAL with precision {}",
self.precision
));
}
}
PhysicalType::INT64 => {
if self.precision > 18 {
return Err(general_err!(
"Cannot represent INT64 as DECIMAL with precision {}",
self.precision
));
}
}
PhysicalType::FIXED_LEN_BYTE_ARRAY => {
let max_precision =
(2f64.powi(8 * self.length - 1) - 1f64).log10().floor() as i32;
if self.precision > max_precision {
return Err(general_err!(
"Cannot represent FIXED_LEN_BYTE_ARRAY as DECIMAL with length {} and \
precision {}. The max precision can only be {}",
self.length,
self.precision,
max_precision
));
}
}
_ => (), // For BYTE_ARRAY precision is not limited
}
Ok(())
}
}
/// A builder for group types. All attributes are optional except the name.
/// Note that if not specified explicitly, `None` is used as the repetition of the group,
/// which means it is a root (message) type.
pub struct GroupTypeBuilder<'a> {
name: &'a str,
repetition: Option<Repetition>,
converted_type: ConvertedType,
logical_type: Option<LogicalType>,
fields: Vec<TypePtr>,
id: Option<i32>,
}
impl<'a> GroupTypeBuilder<'a> {
/// Creates new group type builder with provided field name.
pub fn new(name: &'a str) -> Self {
Self {
name,
repetition: None,
converted_type: ConvertedType::NONE,
logical_type: None,
fields: Vec::new(),
id: None,
}
}
/// Sets [`Repetition`](crate::basic::Repetition) for this field and returns itself.
pub fn with_repetition(mut self, repetition: Repetition) -> Self {
self.repetition = Some(repetition);
self
}
/// Sets [`ConvertedType`](crate::basic::ConvertedType) for this field and returns itself.
pub fn with_converted_type(mut self, converted_type: ConvertedType) -> Self {
self.converted_type = converted_type;
self
}
/// Sets [`LogicalType`](crate::basic::LogicalType) for this field and returns itself.
pub fn with_logical_type(mut self, logical_type: Option<LogicalType>) -> Self {
self.logical_type = logical_type;
self
}
/// Sets a list of fields that should be child nodes of this field.
/// Returns updated self.
pub fn with_fields(mut self, fields: &mut Vec<TypePtr>) -> Self {
self.fields.append(fields);
self
}
/// Sets optional field id and returns itself.
pub fn with_id(mut self, id: i32) -> Self {
self.id = Some(id);
self
}
/// Creates a new `GroupType` instance from the gathered attributes.
pub fn build(self) -> Result<Type> {
let mut basic_info = BasicTypeInfo {
name: String::from(self.name),
repetition: self.repetition,
converted_type: self.converted_type,
logical_type: self.logical_type.clone(),
id: self.id,
};
// Populate the converted type if only the logical type is populated
if self.logical_type.is_some() && self.converted_type == ConvertedType::NONE {
basic_info.converted_type = self.logical_type.into();
}
Ok(Type::GroupType {
basic_info,
fields: self.fields,
})
}
}
/// Basic type info. This contains information such as the name of the type,
/// the repetition level, the logical type and the kind of the type (group, primitive).
#[derive(Clone, Debug, PartialEq)]
pub struct BasicTypeInfo {
name: String,
repetition: Option<Repetition>,
converted_type: ConvertedType,
logical_type: Option<LogicalType>,
id: Option<i32>,
}
impl BasicTypeInfo {
/// Returns field name.
pub fn name(&self) -> &str {
&self.name
}
/// Returns `true` if type has repetition field set, `false` otherwise.
/// This is mostly applied to group type, because primitive type always has
/// repetition set.
pub fn has_repetition(&self) -> bool {
self.repetition.is_some()
}
/// Returns [`Repetition`](crate::basic::Repetition) value for the type.
pub fn repetition(&self) -> Repetition {
assert!(self.repetition.is_some());
self.repetition.unwrap()
}
/// Returns [`ConvertedType`](crate::basic::ConvertedType) value for the type.
pub fn converted_type(&self) -> ConvertedType {
self.converted_type
}
/// Returns [`LogicalType`](crate::basic::LogicalType) value for the type.
pub fn logical_type(&self) -> Option<LogicalType> {
// Unlike ConvertedType, LogicalType cannot implement Copy, thus we clone it
self.logical_type.clone()
}
/// Returns `true` if id is set, `false` otherwise.
pub fn has_id(&self) -> bool {
self.id.is_some()
}
/// Returns id value for the type.
pub fn id(&self) -> i32 {
assert!(self.id.is_some());
self.id.unwrap()
}
}
// ----------------------------------------------------------------------
// Parquet descriptor definitions
/// Represents a path in a nested schema
#[derive(Clone, PartialEq, Debug, Eq, Hash)]
pub struct ColumnPath {
parts: Vec<String>,
}
impl ColumnPath {
/// Creates new column path from vector of field names.
pub fn new(parts: Vec<String>) -> Self {
ColumnPath { parts }
}
/// Returns string representation of this column path.
/// ```rust
/// use parquet::schema::types::ColumnPath;
///
/// let path = ColumnPath::new(vec!["a".to_string(), "b".to_string(), "c".to_string()]);
/// assert_eq!(&path.string(), "a.b.c");
/// ```
pub fn string(&self) -> String {
self.parts.join(".")
}
/// Appends more components to end of column path.
/// ```rust
/// use parquet::schema::types::ColumnPath;
///
/// let mut path = ColumnPath::new(vec!["a".to_string(), "b".to_string(), "c"
/// .to_string()]);
/// assert_eq!(&path.string(), "a.b.c");
///
/// path.append(vec!["d".to_string(), "e".to_string()]);
/// assert_eq!(&path.string(), "a.b.c.d.e");
/// ```
pub fn append(&mut self, mut tail: Vec<String>) {
self.parts.append(&mut tail);
}
pub fn parts(&self) -> &[String] {
&self.parts
}
}
impl fmt::Display for ColumnPath {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{:?}", self.string())
}
}
impl From<Vec<String>> for ColumnPath {
fn from(parts: Vec<String>) -> Self {
ColumnPath { parts }
}
}
impl<'a> From<&'a str> for ColumnPath {
fn from(single_path: &str) -> Self {
let s = String::from(single_path);
ColumnPath::from(s)
}
}
impl From<String> for ColumnPath {
fn from(single_path: String) -> Self {
let v = vec![single_path];
ColumnPath { parts: v }
}
}
impl AsRef<[String]> for ColumnPath {
fn as_ref(&self) -> &[String] {
&self.parts
}
}
/// A descriptor for leaf-level primitive columns.
/// This encapsulates information such as definition and repetition levels and is used to
/// re-assemble nested data.
#[derive(Debug, PartialEq)]
pub struct ColumnDescriptor {
// The "leaf" primitive type of this column
primitive_type: TypePtr,
// The maximum definition level for this column
max_def_level: i16,
// The maximum repetition level for this column
max_rep_level: i16,
// The path of this column. For instance, "a.b.c.d".
path: ColumnPath,
}
impl ColumnDescriptor {
/// Creates new descriptor for leaf-level column.
pub fn new(
primitive_type: TypePtr,
max_def_level: i16,
max_rep_level: i16,
path: ColumnPath,
) -> Self {
Self {
primitive_type,
max_def_level,
max_rep_level,
path,
}
}
/// Returns maximum definition level for this column.
#[inline]
pub fn max_def_level(&self) -> i16 {
self.max_def_level
}
/// Returns maximum repetition level for this column.
#[inline]
pub fn max_rep_level(&self) -> i16 {
self.max_rep_level
}
/// Returns [`ColumnPath`] for this column.
pub fn path(&self) -> &ColumnPath {
&self.path
}
/// Returns self type [`Type`](crate::schema::types::Type) for this leaf column.
pub fn self_type(&self) -> &Type {
self.primitive_type.as_ref()
}
/// Returns self type [`TypePtr`](crate::schema::types::TypePtr) for this leaf
/// column.
pub fn self_type_ptr(&self) -> TypePtr {
self.primitive_type.clone()
}
/// Returns column name.
pub fn name(&self) -> &str {
self.primitive_type.name()
}
/// Returns [`ConvertedType`](crate::basic::ConvertedType) for this column.
pub fn converted_type(&self) -> ConvertedType {
self.primitive_type.get_basic_info().converted_type()
}
/// Returns [`LogicalType`](crate::basic::LogicalType) for this column.
pub fn logical_type(&self) -> Option<LogicalType> {
self.primitive_type.get_basic_info().logical_type()
}
/// Returns physical type for this column.
/// Note that it will panic if called on a non-primitive type.
pub fn physical_type(&self) -> PhysicalType {
match self.primitive_type.as_ref() {
Type::PrimitiveType { physical_type, .. } => *physical_type,
_ => panic!("Expected primitive type!"),
}
}
/// Returns type length for this column.
/// Note that it will panic if called on a non-primitive type.
pub fn type_length(&self) -> i32 {
match self.primitive_type.as_ref() {
Type::PrimitiveType { type_length, .. } => *type_length,
_ => panic!("Expected primitive type!"),
}
}
/// Returns type precision for this column.
/// Note that it will panic if called on a non-primitive type.
pub fn type_precision(&self) -> i32 {
match self.primitive_type.as_ref() {
Type::PrimitiveType { precision, .. } => *precision,
_ => panic!("Expected primitive type!"),
}
}
/// Returns type scale for this column.
/// Note that it will panic if called on a non-primitive type.
pub fn type_scale(&self) -> i32 {
match self.primitive_type.as_ref() {
Type::PrimitiveType { scale, .. } => *scale,
_ => panic!("Expected primitive type!"),
}
}
}
/// A schema descriptor. This encapsulates the top-level schemas for all the columns,
/// as well as all descriptors for all the primitive columns.
pub struct SchemaDescriptor {
// The top-level schema (the "message" type).
// This must be a `GroupType` where each field is a root column type in the schema.
schema: TypePtr,
// All the descriptors for primitive columns in this schema, constructed from
// `schema` in DFS order.
leaves: Vec<ColumnDescPtr>,
// Mapping from a leaf column's index to the root column type that it
// comes from. For instance: the leaf `a.b.c.d` would have a link back to `a`:
// -- a <-----+
// -- -- b |
// -- -- -- c |
// -- -- -- -- d
leaf_to_base: Vec<TypePtr>,
}
impl fmt::Debug for SchemaDescriptor {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// Skip leaves and leaf_to_base as they only a cache information already found in `schema`
f.debug_struct("SchemaDescriptor")
.field("schema", &self.schema)
.finish()
}
}
impl SchemaDescriptor {
/// Creates new schema descriptor from Parquet schema.
pub fn new(tp: TypePtr) -> Self {
assert!(tp.is_group(), "SchemaDescriptor should take a GroupType");
let mut leaves = vec![];
let mut leaf_to_base = Vec::new();
for f in tp.get_fields() {
let mut path = vec![];
build_tree(f, f, 0, 0, &mut leaves, &mut leaf_to_base, &mut path);
}
Self {
schema: tp,
leaves,
leaf_to_base,
}
}
/// Returns [`ColumnDescriptor`] for a field position.
pub fn column(&self, i: usize) -> ColumnDescPtr {
assert!(
i < self.leaves.len(),
"Index out of bound: {} not in [0, {})",
i,
self.leaves.len()
);
self.leaves[i].clone()
}
/// Returns slice of [`ColumnDescriptor`].
pub fn columns(&self) -> &[ColumnDescPtr] {
&self.leaves
}
/// Returns number of leaf-level columns.
pub fn num_columns(&self) -> usize {
self.leaves.len()
}
/// Returns column root [`Type`](crate::schema::types::Type) for a field position.
pub fn get_column_root(&self, i: usize) -> &Type {
let result = self.column_root_of(i);
result.as_ref()
}
/// Returns column root [`Type`](crate::schema::types::Type) pointer for a field
/// position.
pub fn get_column_root_ptr(&self, i: usize) -> TypePtr {
let result = self.column_root_of(i);
result.clone()
}
fn column_root_of(&self, i: usize) -> &Arc<Type> {
assert!(
i < self.leaves.len(),
"Index out of bound: {} not in [0, {})",
i,
self.leaves.len()
);
self.leaf_to_base
.get(i)
.unwrap_or_else(|| panic!("Expected a value for index {} but found None", i))
}
/// Returns schema as [`Type`](crate::schema::types::Type).
pub fn root_schema(&self) -> &Type {
self.schema.as_ref()
}
pub fn root_schema_ptr(&self) -> TypePtr {
self.schema.clone()
}
/// Returns schema name.
pub fn name(&self) -> &str {
self.schema.name()
}
}
fn build_tree<'a>(
tp: &'a TypePtr,
base_tp: &TypePtr,
mut max_rep_level: i16,
mut max_def_level: i16,
leaves: &mut Vec<ColumnDescPtr>,
leaf_to_base: &mut Vec<TypePtr>,
path_so_far: &mut Vec<&'a str>,
) {
assert!(tp.get_basic_info().has_repetition());
path_so_far.push(tp.name());
match tp.get_basic_info().repetition() {
Repetition::OPTIONAL => {
max_def_level += 1;
}
Repetition::REPEATED => {
max_def_level += 1;
max_rep_level += 1;
}
_ => {}
}
match tp.as_ref() {
Type::PrimitiveType { .. } => {
let mut path: Vec<String> = vec![];
path.extend(path_so_far.iter().copied().map(String::from));
leaves.push(Arc::new(ColumnDescriptor::new(
tp.clone(),
max_def_level,
max_rep_level,
ColumnPath::new(path),
)));
leaf_to_base.push(base_tp.clone());
}
Type::GroupType { ref fields, .. } => {
for f in fields {
build_tree(
f,
base_tp,
max_rep_level,
max_def_level,
leaves,
leaf_to_base,
path_so_far,
);
path_so_far.pop();
}
}
}
}
/// Method to convert from Thrift.
pub fn from_thrift(elements: &[SchemaElement]) -> Result<TypePtr> {
let mut index = 0;
let mut schema_nodes = Vec::new();
while index < elements.len() {
let t = from_thrift_helper(elements, index)?;
index = t.0;
schema_nodes.push(t.1);
}
if schema_nodes.len() != 1 {
return Err(general_err!(
"Expected exactly one root node, but found {}",
schema_nodes.len()
));
}
Ok(schema_nodes.remove(0))
}
/// Constructs a new Type from the `elements`, starting at index `index`.
/// The first result is the starting index for the next Type after this one. If it is
/// equal to `elements.len()`, then this Type is the last one.
/// The second result is the result Type.
fn from_thrift_helper(
elements: &[SchemaElement],
index: usize,
) -> Result<(usize, TypePtr)> {
// Whether or not the current node is root (message type).
// There is only one message type node in the schema tree.
let is_root_node = index == 0;
if index > elements.len() {
return Err(general_err!(
"Index out of bound, index = {}, len = {}",
index,
elements.len()
));
}
let element = &elements[index];
let converted_type = ConvertedType::from(element.converted_type);
// LogicalType is only present in v2 Parquet files. ConvertedType is always
// populated, regardless of the version of the file (v1 or v2).
let logical_type = element
.logical_type
.as_ref()
.map(|value| LogicalType::from(value.clone()));
let field_id = elements[index].field_id;
match elements[index].num_children {
// From parquet-format:
// The children count is used to construct the nested relationship.
// This field is not set when the element is a primitive type
// Sometimes parquet-cpp sets num_children field to 0 for primitive types, so we
// have to handle this case too.
None | Some(0) => {
// primitive type
if elements[index].repetition_type.is_none() {
return Err(general_err!(
"Repetition level must be defined for a primitive type"
));
}
let repetition = Repetition::from(elements[index].repetition_type.unwrap());
let physical_type = PhysicalType::from(elements[index].type_.unwrap());
let length = elements[index].type_length.unwrap_or(-1);
let scale = elements[index].scale.unwrap_or(-1);
let precision = elements[index].precision.unwrap_or(-1);
let name = &elements[index].name;
let mut builder = Type::primitive_type_builder(name, physical_type)
.with_repetition(repetition)
.with_converted_type(converted_type)
.with_logical_type(logical_type)
.with_length(length)
.with_precision(precision)
.with_scale(scale);
if let Some(id) = field_id {
builder = builder.with_id(id);
}
Ok((index + 1, Arc::new(builder.build()?)))
}
Some(n) => {
let repetition = elements[index].repetition_type.map(Repetition::from);
let mut fields = vec![];
let mut next_index = index + 1;
for _ in 0..n {
let child_result = from_thrift_helper(elements, next_index as usize)?;
next_index = child_result.0;
fields.push(child_result.1);
}
let mut builder = Type::group_type_builder(&elements[index].name)
.with_converted_type(converted_type)
.with_logical_type(logical_type)
.with_fields(&mut fields);
if let Some(rep) = repetition {
// Sometimes parquet-cpp and parquet-mr set repetition level REQUIRED or
// REPEATED for root node.
//
// We only set repetition for group types that are not top-level message
// type. According to parquet-format:
// Root of the schema does not have a repetition_type.
// All other types must have one.
if !is_root_node {
builder = builder.with_repetition(rep);
}
}
if let Some(id) = field_id {
builder = builder.with_id(id);
}
Ok((next_index, Arc::new(builder.build().unwrap())))
}
}
}
/// Method to convert to Thrift.
pub fn to_thrift(schema: &Type) -> Result<Vec<SchemaElement>> {
if !schema.is_group() {
return Err(general_err!("Root schema must be Group type"));
}
let mut elements: Vec<SchemaElement> = Vec::new();
to_thrift_helper(schema, &mut elements);
Ok(elements)
}
/// Constructs list of `SchemaElement` from the schema using depth-first traversal.
/// Here we assume that schema is always valid and starts with group type.
fn to_thrift_helper(schema: &Type, elements: &mut Vec<SchemaElement>) {
match *schema {
Type::PrimitiveType {
ref basic_info,
physical_type,
type_length,
scale,
precision,
} => {
let element = SchemaElement {
type_: Some(physical_type.into()),
type_length: if type_length >= 0 {
Some(type_length)
} else {
None
},
repetition_type: Some(basic_info.repetition().into()),
name: basic_info.name().to_owned(),
num_children: None,
converted_type: basic_info.converted_type().into(),
scale: if scale >= 0 { Some(scale) } else { None },
precision: if precision >= 0 {
Some(precision)
} else {
None
},
field_id: if basic_info.has_id() {
Some(basic_info.id())
} else {
None
},
logical_type: basic_info.logical_type().map(|value| value.into()),
};
elements.push(element);
}
Type::GroupType {
ref basic_info,
ref fields,
} => {
let repetition = if basic_info.has_repetition() {
Some(basic_info.repetition().into())
} else {
None
};
let element = SchemaElement {
type_: None,
type_length: None,
repetition_type: repetition,
name: basic_info.name().to_owned(),
num_children: Some(fields.len() as i32),
converted_type: basic_info.converted_type().into(),
scale: None,
precision: None,
field_id: if basic_info.has_id() {
Some(basic_info.id())
} else {
None
},
logical_type: basic_info.logical_type().map(|value| value.into()),
};
elements.push(element);
// Add child elements for a group
for field in fields {
to_thrift_helper(field, elements);
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::basic::{DecimalType, IntType};
use crate::schema::parser::parse_message_type;
// TODO: add tests for v2 types
#[test]
fn test_primitive_type() {
let mut result = Type::primitive_type_builder("foo", PhysicalType::INT32)
.with_logical_type(Some(LogicalType::INTEGER(IntType {
bit_width: 32,
is_signed: true,
})))
.with_id(0)
.build();
assert!(result.is_ok());
if let Ok(tp) = result {
assert!(tp.is_primitive());
assert!(!tp.is_group());
let basic_info = tp.get_basic_info();
assert_eq!(basic_info.repetition(), Repetition::OPTIONAL);
assert_eq!(
basic_info.logical_type(),
Some(LogicalType::INTEGER(IntType {
bit_width: 32,
is_signed: true
}))
);
assert_eq!(basic_info.converted_type(), ConvertedType::INT_32);
assert_eq!(basic_info.id(), 0);
match tp {
Type::PrimitiveType { physical_type, .. } => {
assert_eq!(physical_type, PhysicalType::INT32);
}
_ => panic!(),
}
}
// Test illegal inputs with logical type
result = Type::primitive_type_builder("foo", PhysicalType::INT64)
.with_repetition(Repetition::REPEATED)
.with_logical_type(Some(LogicalType::INTEGER(IntType {
is_signed: true,
bit_width: 8,
})))
.build();
assert!(result.is_err());
if let Err(e) = result {
assert_eq!(
format!("{}", e),
"Parquet error: Cannot annotate INTEGER(IntType { bit_width: 8, is_signed: true }) from INT64 fields"
);
}
// Test illegal inputs with converted type
result = Type::primitive_type_builder("foo", PhysicalType::INT64)
.with_repetition(Repetition::REPEATED)
.with_converted_type(ConvertedType::BSON)
.build();
assert!(result.is_err());
if let Err(e) = result {
assert_eq!(
format!("{}", e),
"Parquet error: BSON can only annotate BYTE_ARRAY fields"
);
}
result = Type::primitive_type_builder("foo", PhysicalType::INT96)
.with_repetition(Repetition::REQUIRED)
.with_converted_type(ConvertedType::DECIMAL)
.with_precision(-1)
.with_scale(-1)
.build();
assert!(result.is_err());
if let Err(e) = result {
assert_eq!(
format!("{}", e),
"Parquet error: DECIMAL can only annotate INT32, INT64, BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY"
);
}
result = Type::primitive_type_builder("foo", PhysicalType::BYTE_ARRAY)
.with_repetition(Repetition::REQUIRED)
.with_logical_type(Some(LogicalType::DECIMAL(DecimalType {
scale: 32,
precision: 12,
})))
.with_precision(-1)
.with_scale(-1)
.build();
assert!(result.is_err());
if let Err(e) = result {
assert_eq!(
format!("{}", e),
"Parquet error: DECIMAL logical type scale 32 must match self.scale -1"
);
}
result = Type::primitive_type_builder("foo", PhysicalType::BYTE_ARRAY)
.with_repetition(Repetition::REQUIRED)
.with_converted_type(ConvertedType::DECIMAL)
.with_precision(-1)
.with_scale(-1)
.build();
assert!(result.is_err());
if let Err(e) = result {
assert_eq!(
format!("{}", e),
"Parquet error: Invalid DECIMAL precision: -1"
);
}
result = Type::primitive_type_builder("foo", PhysicalType::BYTE_ARRAY)
.with_repetition(Repetition::REQUIRED)
.with_converted_type(ConvertedType::DECIMAL)
.with_precision(0)
.with_scale(-1)
.build();
assert!(result.is_err());
if let Err(e) = result {
assert_eq!(
format!("{}", e),
"Parquet error: Invalid DECIMAL precision: 0"
);
}
result = Type::primitive_type_builder("foo", PhysicalType::BYTE_ARRAY)
.with_repetition(Repetition::REQUIRED)
.with_converted_type(ConvertedType::DECIMAL)
.with_precision(1)
.with_scale(-1)
.build();
assert!(result.is_err());
if let Err(e) = result {
assert_eq!(format!("{}", e), "Parquet error: Invalid DECIMAL scale: -1");
}
result = Type::primitive_type_builder("foo", PhysicalType::BYTE_ARRAY)
.with_repetition(Repetition::REQUIRED)
.with_converted_type(ConvertedType::DECIMAL)
.with_precision(1)
.with_scale(2)
.build();
assert!(result.is_err());
if let Err(e) = result {
assert_eq!(
format!("{}", e),
"Parquet error: Invalid DECIMAL: scale (2) cannot be greater than or equal to precision (1)"
);
}
result = Type::primitive_type_builder("foo", PhysicalType::INT32)
.with_repetition(Repetition::REQUIRED)
.with_converted_type(ConvertedType::DECIMAL)
.with_precision(18)
.with_scale(2)
.build();
assert!(result.is_err());
if let Err(e) = result {
assert_eq!(
format!("{}", e),
"Parquet error: Cannot represent INT32 as DECIMAL with precision 18"
);
}
result = Type::primitive_type_builder("foo", PhysicalType::INT64)
.with_repetition(Repetition::REQUIRED)
.with_converted_type(ConvertedType::DECIMAL)
.with_precision(32)
.with_scale(2)
.build();
assert!(result.is_err());
if let Err(e) = result {
assert_eq!(
format!("{}", e),
"Parquet error: Cannot represent INT64 as DECIMAL with precision 32"
);
}
result = Type::primitive_type_builder("foo", PhysicalType::FIXED_LEN_BYTE_ARRAY)
.with_repetition(Repetition::REQUIRED)
.with_converted_type(ConvertedType::DECIMAL)
.with_length(5)
.with_precision(12)
.with_scale(2)
.build();
assert!(result.is_err());
if let Err(e) = result {
assert_eq!(
format!("{}", e),
"Parquet error: Cannot represent FIXED_LEN_BYTE_ARRAY as DECIMAL with length 5 and precision 12. The max precision can only be 11"
);
}
result = Type::primitive_type_builder("foo", PhysicalType::INT64)
.with_repetition(Repetition::REQUIRED)
.with_converted_type(ConvertedType::UINT_8)
.build();
assert!(result.is_err());
if let Err(e) = result {
assert_eq!(
format!("{}", e),
"Parquet error: UINT_8 can only annotate INT32"
);
}
result = Type::primitive_type_builder("foo", PhysicalType::INT32)
.with_repetition(Repetition::REQUIRED)
.with_converted_type(ConvertedType::TIME_MICROS)
.build();
assert!(result.is_err());
if let Err(e) = result {
assert_eq!(
format!("{}", e),
"Parquet error: TIME_MICROS can only annotate INT64"
);
}
result = Type::primitive_type_builder("foo", PhysicalType::BYTE_ARRAY)
.with_repetition(Repetition::REQUIRED)
.with_converted_type(ConvertedType::INTERVAL)
.build();
assert!(result.is_err());
if let Err(e) = result {
assert_eq!(
format!("{}", e),
"Parquet error: INTERVAL can only annotate FIXED_LEN_BYTE_ARRAY(12)"
);
}
result = Type::primitive_type_builder("foo", PhysicalType::FIXED_LEN_BYTE_ARRAY)
.with_repetition(Repetition::REQUIRED)
.with_converted_type(ConvertedType::INTERVAL)
.with_length(1)
.build();
assert!(result.is_err());
if let Err(e) = result {
assert_eq!(
format!("{}", e),
"Parquet error: INTERVAL can only annotate FIXED_LEN_BYTE_ARRAY(12)"
);
}
result = Type::primitive_type_builder("foo", PhysicalType::INT32)
.with_repetition(Repetition::REQUIRED)
.with_converted_type(ConvertedType::ENUM)
.build();
assert!(result.is_err());
if let Err(e) = result {
assert_eq!(
format!("{}", e),
"Parquet error: ENUM can only annotate BYTE_ARRAY fields"
);
}
result = Type::primitive_type_builder("foo", PhysicalType::INT32)
.with_repetition(Repetition::REQUIRED)
.with_converted_type(ConvertedType::MAP)
.build();
assert!(result.is_err());
if let Err(e) = result {
assert_eq!(
format!("{}", e),
"Parquet error: MAP cannot be applied to a primitive type"
);
}
result = Type::primitive_type_builder("foo", PhysicalType::FIXED_LEN_BYTE_ARRAY)
.with_repetition(Repetition::REQUIRED)
.with_converted_type(ConvertedType::DECIMAL)
.with_length(-1)
.build();
assert!(result.is_err());
if let Err(e) = result {
assert_eq!(
format!("{}", e),
"Parquet error: Invalid FIXED_LEN_BYTE_ARRAY length: -1"
);
}
}
#[test]
fn test_group_type() {
let f1 = Type::primitive_type_builder("f1", PhysicalType::INT32)
.with_converted_type(ConvertedType::INT_32)
.with_id(0)
.build();
assert!(f1.is_ok());
let f2 = Type::primitive_type_builder("f2", PhysicalType::BYTE_ARRAY)
.with_converted_type(ConvertedType::UTF8)
.with_id(1)
.build();
assert!(f2.is_ok());
let mut fields = vec![];
fields.push(Arc::new(f1.unwrap()));
fields.push(Arc::new(f2.unwrap()));
let result = Type::group_type_builder("foo")
.with_repetition(Repetition::REPEATED)
.with_logical_type(Some(LogicalType::LIST(Default::default())))
.with_fields(&mut fields)
.with_id(1)
.build();
assert!(result.is_ok());
let tp = result.unwrap();
let basic_info = tp.get_basic_info();
assert!(tp.is_group());
assert!(!tp.is_primitive());
assert_eq!(basic_info.repetition(), Repetition::REPEATED);
assert_eq!(
basic_info.logical_type(),
Some(LogicalType::LIST(Default::default()))
);
assert_eq!(basic_info.converted_type(), ConvertedType::LIST);
assert_eq!(basic_info.id(), 1);
assert_eq!(tp.get_fields().len(), 2);
assert_eq!(tp.get_fields()[0].name(), "f1");
assert_eq!(tp.get_fields()[1].name(), "f2");
}
#[test]
fn test_column_descriptor() {
let result = test_column_descriptor_helper();
assert!(
result.is_ok(),
"Expected result to be OK but got err:\n {}",
result.unwrap_err()
);
}
fn test_column_descriptor_helper() -> Result<()> {
let tp = Type::primitive_type_builder("name", PhysicalType::BYTE_ARRAY)
.with_converted_type(ConvertedType::UTF8)
.build()?;
let descr = ColumnDescriptor::new(Arc::new(tp), 4, 1, ColumnPath::from("name"));
assert_eq!(descr.path(), &ColumnPath::from("name"));
assert_eq!(descr.converted_type(), ConvertedType::UTF8);
assert_eq!(descr.physical_type(), PhysicalType::BYTE_ARRAY);
assert_eq!(descr.max_def_level(), 4);
assert_eq!(descr.max_rep_level(), 1);
assert_eq!(descr.name(), "name");
assert_eq!(descr.type_length(), -1);
assert_eq!(descr.type_precision(), -1);
assert_eq!(descr.type_scale(), -1);
Ok(())
}
#[test]
fn test_schema_descriptor() {
let result = test_schema_descriptor_helper();
assert!(
result.is_ok(),
"Expected result to be OK but got err:\n {}",
result.unwrap_err()
);
}
// A helper fn to avoid handling the results from type creation
fn test_schema_descriptor_helper() -> Result<()> {
let mut fields = vec![];
let inta = Type::primitive_type_builder("a", PhysicalType::INT32)
.with_repetition(Repetition::REQUIRED)
.with_converted_type(ConvertedType::INT_32)
.build()?;
fields.push(Arc::new(inta));
let intb = Type::primitive_type_builder("b", PhysicalType::INT64)
.with_converted_type(ConvertedType::INT_64)
.build()?;
fields.push(Arc::new(intb));
let intc = Type::primitive_type_builder("c", PhysicalType::BYTE_ARRAY)
.with_repetition(Repetition::REPEATED)
.with_converted_type(ConvertedType::UTF8)
.build()?;
fields.push(Arc::new(intc));
// 3-level list encoding
let item1 = Type::primitive_type_builder("item1", PhysicalType::INT64)
.with_repetition(Repetition::REQUIRED)
.with_converted_type(ConvertedType::INT_64)
.build()?;
let item2 =
Type::primitive_type_builder("item2", PhysicalType::BOOLEAN).build()?;
let item3 = Type::primitive_type_builder("item3", PhysicalType::INT32)
.with_repetition(Repetition::REPEATED)
.with_converted_type(ConvertedType::INT_32)
.build()?;
let list = Type::group_type_builder("records")
.with_repetition(Repetition::REPEATED)
.with_converted_type(ConvertedType::LIST)
.with_fields(&mut vec![Arc::new(item1), Arc::new(item2), Arc::new(item3)])
.build()?;
let bag = Type::group_type_builder("bag")
.with_repetition(Repetition::OPTIONAL)
.with_fields(&mut vec![Arc::new(list)])
.build()?;
fields.push(Arc::new(bag));
let schema = Type::group_type_builder("schema")
.with_repetition(Repetition::REPEATED)
.with_fields(&mut fields)
.build()?;
let descr = SchemaDescriptor::new(Arc::new(schema));
let nleaves = 6;
assert_eq!(descr.num_columns(), nleaves);
// mdef mrep
// required int32 a 0 0
// optional int64 b 1 0
// repeated byte_array c 1 1
// optional group bag 1 0
// repeated group records 2 1
// required int64 item1 2 1
// optional boolean item2 3 1
// repeated int32 item3 3 2
let ex_max_def_levels = vec![0, 1, 1, 2, 3, 3];
let ex_max_rep_levels = vec![0, 0, 1, 1, 1, 2];
for i in 0..nleaves {
let col = descr.column(i);
assert_eq!(col.max_def_level(), ex_max_def_levels[i], "{}", i);
assert_eq!(col.max_rep_level(), ex_max_rep_levels[i], "{}", i);
}
assert_eq!(descr.column(0).path().string(), "a");
assert_eq!(descr.column(1).path().string(), "b");
assert_eq!(descr.column(2).path().string(), "c");
assert_eq!(descr.column(3).path().string(), "bag.records.item1");
assert_eq!(descr.column(4).path().string(), "bag.records.item2");
assert_eq!(descr.column(5).path().string(), "bag.records.item3");
assert_eq!(descr.get_column_root(0).name(), "a");
assert_eq!(descr.get_column_root(3).name(), "bag");
assert_eq!(descr.get_column_root(4).name(), "bag");
assert_eq!(descr.get_column_root(5).name(), "bag");
Ok(())
}
#[test]
fn test_schema_build_tree_def_rep_levels() {
let message_type = "
message spark_schema {
REQUIRED INT32 a;
OPTIONAL group b {
OPTIONAL INT32 _1;
OPTIONAL INT32 _2;
}
OPTIONAL group c (LIST) {
REPEATED group list {
OPTIONAL INT32 element;
}
}
}
";
let schema = parse_message_type(message_type).expect("should parse schema");
let descr = SchemaDescriptor::new(Arc::new(schema));
// required int32 a
assert_eq!(descr.column(0).max_def_level(), 0);
assert_eq!(descr.column(0).max_rep_level(), 0);
// optional int32 b._1
assert_eq!(descr.column(1).max_def_level(), 2);
assert_eq!(descr.column(1).max_rep_level(), 0);
// optional int32 b._2
assert_eq!(descr.column(2).max_def_level(), 2);
assert_eq!(descr.column(2).max_rep_level(), 0);
// repeated optional int32 c.list.element
assert_eq!(descr.column(3).max_def_level(), 3);
assert_eq!(descr.column(3).max_rep_level(), 1);
}
#[test]
#[should_panic(expected = "Cannot call get_physical_type() on a non-primitive type")]
fn test_get_physical_type_panic() {
let list = Type::group_type_builder("records")
.with_repetition(Repetition::REPEATED)
.build()
.unwrap();
list.get_physical_type();
}
#[test]
fn test_get_physical_type_primitive() {
let f = Type::primitive_type_builder("f", PhysicalType::INT64)
.build()
.unwrap();
assert_eq!(f.get_physical_type(), PhysicalType::INT64);
let f = Type::primitive_type_builder("f", PhysicalType::BYTE_ARRAY)
.build()
.unwrap();
assert_eq!(f.get_physical_type(), PhysicalType::BYTE_ARRAY);
}
#[test]
fn test_check_contains_primitive_primitive() {
// OK
let f1 = Type::primitive_type_builder("f", PhysicalType::INT32)
.build()
.unwrap();
let f2 = Type::primitive_type_builder("f", PhysicalType::INT32)
.build()
.unwrap();
assert!(f1.check_contains(&f2));
// OK: different logical type does not affect check_contains
let f1 = Type::primitive_type_builder("f", PhysicalType::INT32)
.with_converted_type(ConvertedType::UINT_8)
.build()
.unwrap();
let f2 = Type::primitive_type_builder("f", PhysicalType::INT32)
.with_converted_type(ConvertedType::UINT_16)
.build()
.unwrap();
assert!(f1.check_contains(&f2));
// KO: different name
let f1 = Type::primitive_type_builder("f1", PhysicalType::INT32)
.build()
.unwrap();
let f2 = Type::primitive_type_builder("f2", PhysicalType::INT32)
.build()
.unwrap();
assert!(!f1.check_contains(&f2));
// KO: different type
let f1 = Type::primitive_type_builder("f", PhysicalType::INT32)
.build()
.unwrap();
let f2 = Type::primitive_type_builder("f", PhysicalType::INT64)
.build()
.unwrap();
assert!(!f1.check_contains(&f2));
// KO: different repetition
let f1 = Type::primitive_type_builder("f", PhysicalType::INT32)
.with_repetition(Repetition::REQUIRED)
.build()
.unwrap();
let f2 = Type::primitive_type_builder("f", PhysicalType::INT32)
.with_repetition(Repetition::OPTIONAL)
.build()
.unwrap();
assert!(!f1.check_contains(&f2));
}
// function to create a new group type for testing
fn test_new_group_type(name: &str, repetition: Repetition, types: Vec<Type>) -> Type {
let mut fields = Vec::new();
for tpe in types {
fields.push(Arc::new(tpe))
}
Type::group_type_builder(name)
.with_repetition(repetition)
.with_fields(&mut fields)
.build()
.unwrap()
}
#[test]
fn test_check_contains_group_group() {
// OK: should match okay with empty fields
let f1 = Type::group_type_builder("f").build().unwrap();
let f2 = Type::group_type_builder("f").build().unwrap();
assert!(f1.check_contains(&f2));
// OK: fields match
let f1 = test_new_group_type(
"f",
Repetition::REPEATED,
vec![
Type::primitive_type_builder("f1", PhysicalType::INT32)
.build()
.unwrap(),
Type::primitive_type_builder("f2", PhysicalType::INT64)
.build()
.unwrap(),
],
);
let f2 = test_new_group_type(
"f",
Repetition::REPEATED,
vec![
Type::primitive_type_builder("f1", PhysicalType::INT32)
.build()
.unwrap(),
Type::primitive_type_builder("f2", PhysicalType::INT64)
.build()
.unwrap(),
],
);
assert!(f1.check_contains(&f2));
// OK: subset of fields
let f1 = test_new_group_type(
"f",
Repetition::REPEATED,
vec![
Type::primitive_type_builder("f1", PhysicalType::INT32)
.build()
.unwrap(),
Type::primitive_type_builder("f2", PhysicalType::INT64)
.build()
.unwrap(),
],
);
let f2 = test_new_group_type(
"f",
Repetition::REPEATED,
vec![Type::primitive_type_builder("f2", PhysicalType::INT64)
.build()
.unwrap()],
);
assert!(f1.check_contains(&f2));
// KO: different name
let f1 = Type::group_type_builder("f1").build().unwrap();
let f2 = Type::group_type_builder("f2").build().unwrap();
assert!(!f1.check_contains(&f2));
// KO: different repetition
let f1 = Type::group_type_builder("f")
.with_repetition(Repetition::OPTIONAL)
.build()
.unwrap();
let f2 = Type::group_type_builder("f")
.with_repetition(Repetition::REPEATED)
.build()
.unwrap();
assert!(!f1.check_contains(&f2));
// KO: different fields
let f1 = test_new_group_type(
"f",
Repetition::REPEATED,
vec![
Type::primitive_type_builder("f1", PhysicalType::INT32)
.build()
.unwrap(),
Type::primitive_type_builder("f2", PhysicalType::INT64)
.build()
.unwrap(),
],
);
let f2 = test_new_group_type(
"f",
Repetition::REPEATED,
vec![
Type::primitive_type_builder("f1", PhysicalType::INT32)
.build()
.unwrap(),
Type::primitive_type_builder("f2", PhysicalType::BOOLEAN)
.build()
.unwrap(),
],
);
assert!(!f1.check_contains(&f2));
// KO: different fields
let f1 = test_new_group_type(
"f",
Repetition::REPEATED,
vec![
Type::primitive_type_builder("f1", PhysicalType::INT32)
.build()
.unwrap(),
Type::primitive_type_builder("f2", PhysicalType::INT64)
.build()
.unwrap(),
],
);
let f2 = test_new_group_type(
"f",
Repetition::REPEATED,
vec![Type::primitive_type_builder("f3", PhysicalType::INT32)
.build()
.unwrap()],
);
assert!(!f1.check_contains(&f2));
}
#[test]
fn test_check_contains_group_primitive() {
// KO: should not match
let f1 = Type::group_type_builder("f").build().unwrap();
let f2 = Type::primitive_type_builder("f", PhysicalType::INT64)
.build()
.unwrap();
assert!(!f1.check_contains(&f2));
assert!(!f2.check_contains(&f1));
// KO: should not match when primitive field is part of group type
let f1 = test_new_group_type(
"f",
Repetition::REPEATED,
vec![Type::primitive_type_builder("f1", PhysicalType::INT32)
.build()
.unwrap()],
);
let f2 = Type::primitive_type_builder("f1", PhysicalType::INT32)
.build()
.unwrap();
assert!(!f1.check_contains(&f2));
assert!(!f2.check_contains(&f1));
// OK: match nested types
let f1 = test_new_group_type(
"a",
Repetition::REPEATED,
vec![
test_new_group_type(
"b",
Repetition::REPEATED,
vec![Type::primitive_type_builder("c", PhysicalType::INT32)
.build()
.unwrap()],
),
Type::primitive_type_builder("d", PhysicalType::INT64)
.build()
.unwrap(),
Type::primitive_type_builder("e", PhysicalType::BOOLEAN)
.build()
.unwrap(),
],
);
let f2 = test_new_group_type(
"a",
Repetition::REPEATED,
vec![test_new_group_type(
"b",
Repetition::REPEATED,
vec![Type::primitive_type_builder("c", PhysicalType::INT32)
.build()
.unwrap()],
)],
);
assert!(f1.check_contains(&f2)); // should match
assert!(!f2.check_contains(&f1)); // should fail
}
#[test]
fn test_schema_type_thrift_conversion_err() {
let schema = Type::primitive_type_builder("col", PhysicalType::INT32)
.build()
.unwrap();
let thrift_schema = to_thrift(&schema);
assert!(thrift_schema.is_err());
if let Err(e) = thrift_schema {
assert_eq!(
format!("{}", e),
"Parquet error: Root schema must be Group type"
);
}
}
#[test]
fn test_schema_type_thrift_conversion() {
let message_type = "
message conversions {
REQUIRED INT64 id;
OPTIONAL group int_array_Array (LIST) {
REPEATED group list {
OPTIONAL group element (LIST) {
REPEATED group list {
OPTIONAL INT32 element;
}
}
}
}
OPTIONAL group int_map (MAP) {
REPEATED group map (MAP_KEY_VALUE) {
REQUIRED BYTE_ARRAY key (UTF8);
OPTIONAL INT32 value;
}
}
OPTIONAL group int_Map_Array (LIST) {
REPEATED group list {
OPTIONAL group g (MAP) {
REPEATED group map (MAP_KEY_VALUE) {
REQUIRED BYTE_ARRAY key (UTF8);
OPTIONAL group value {
OPTIONAL group H {
OPTIONAL group i (LIST) {
REPEATED group list {
OPTIONAL DOUBLE element;
}
}
}
}
}
}
}
}
OPTIONAL group nested_struct {
OPTIONAL INT32 A;
OPTIONAL group b (LIST) {
REPEATED group list {
REQUIRED FIXED_LEN_BYTE_ARRAY (16) element;
}
}
}
}
";
let expected_schema = parse_message_type(message_type).unwrap();
let thrift_schema = to_thrift(&expected_schema).unwrap();
let result_schema = from_thrift(&thrift_schema).unwrap();
assert_eq!(result_schema, Arc::new(expected_schema));
}
#[test]
fn test_schema_type_thrift_conversion_decimal() {
let message_type = "
message decimals {
OPTIONAL INT32 field0;
OPTIONAL INT64 field1 (DECIMAL (18, 2));
OPTIONAL FIXED_LEN_BYTE_ARRAY (16) field2 (DECIMAL (38, 18));
OPTIONAL BYTE_ARRAY field3 (DECIMAL (9));
}
";
let expected_schema = parse_message_type(message_type).unwrap();
let thrift_schema = to_thrift(&expected_schema).unwrap();
let result_schema = from_thrift(&thrift_schema).unwrap();
assert_eq!(result_schema, Arc::new(expected_schema));
}
// Tests schema conversion from thrift, when num_children is set to Some(0) for a
// primitive type.
#[test]
fn test_schema_from_thrift_with_num_children_set() {
// schema definition written by parquet-cpp version 1.3.2-SNAPSHOT
let message_type = "
message schema {
OPTIONAL BYTE_ARRAY id (UTF8);
OPTIONAL BYTE_ARRAY name (UTF8);
OPTIONAL BYTE_ARRAY message (UTF8);
OPTIONAL INT32 type (UINT_8);
OPTIONAL INT64 author_time (TIMESTAMP_MILLIS);
OPTIONAL INT64 __index_level_0__;
}
";
let expected_schema = parse_message_type(message_type).unwrap();
let mut thrift_schema = to_thrift(&expected_schema).unwrap();
// Change all of None to Some(0)
for mut elem in &mut thrift_schema[..] {
if elem.num_children == None {
elem.num_children = Some(0);
}
}
let result_schema = from_thrift(&thrift_schema).unwrap();
assert_eq!(result_schema, Arc::new(expected_schema));
}
// Sometimes parquet-cpp sets repetition level for the root node, which is against
// the format definition, but we need to handle it by setting it back to None.
#[test]
fn test_schema_from_thrift_root_has_repetition() {
// schema definition written by parquet-cpp version 1.3.2-SNAPSHOT
let message_type = "
message schema {
OPTIONAL BYTE_ARRAY a (UTF8);
OPTIONAL INT32 b (UINT_8);
}
";
let expected_schema = parse_message_type(message_type).unwrap();
let mut thrift_schema = to_thrift(&expected_schema).unwrap();
thrift_schema[0].repetition_type = Some(Repetition::REQUIRED.into());
let result_schema = from_thrift(&thrift_schema).unwrap();
assert_eq!(result_schema, Arc::new(expected_schema));
}
}