| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| //! Utilities for converting between IPC types and native Arrow types |
| |
| use crate::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit}; |
| use crate::error::{ArrowError, Result}; |
| use crate::ipc; |
| |
| use flatbuffers::{ |
| FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, WIPOffset, |
| }; |
| use std::collections::{BTreeMap, HashMap}; |
| |
| use DataType::*; |
| |
| /// Serialize a schema in IPC format |
| pub fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder { |
| let mut fbb = FlatBufferBuilder::new(); |
| |
| let root = schema_to_fb_offset(&mut fbb, schema); |
| |
| fbb.finish(root, None); |
| |
| fbb |
| } |
| |
| pub fn schema_to_fb_offset<'a>( |
| fbb: &mut FlatBufferBuilder<'a>, |
| schema: &Schema, |
| ) -> WIPOffset<ipc::Schema<'a>> { |
| let mut fields = vec![]; |
| for field in schema.fields() { |
| let fb_field = build_field(fbb, field); |
| fields.push(fb_field); |
| } |
| |
| let mut custom_metadata = vec![]; |
| for (k, v) in schema.metadata() { |
| let fb_key_name = fbb.create_string(k.as_str()); |
| let fb_val_name = fbb.create_string(v.as_str()); |
| |
| let mut kv_builder = ipc::KeyValueBuilder::new(fbb); |
| kv_builder.add_key(fb_key_name); |
| kv_builder.add_value(fb_val_name); |
| custom_metadata.push(kv_builder.finish()); |
| } |
| |
| let fb_field_list = fbb.create_vector(&fields); |
| let fb_metadata_list = fbb.create_vector(&custom_metadata); |
| |
| let mut builder = ipc::SchemaBuilder::new(fbb); |
| builder.add_fields(fb_field_list); |
| builder.add_custom_metadata(fb_metadata_list); |
| builder.finish() |
| } |
| |
| /// Convert an IPC Field to Arrow Field |
| impl<'a> From<ipc::Field<'a>> for Field { |
| fn from(field: ipc::Field) -> Field { |
| let mut arrow_field = if let Some(dictionary) = field.dictionary() { |
| Field::new_dict( |
| field.name().unwrap(), |
| get_data_type(field, true), |
| field.nullable(), |
| dictionary.id(), |
| dictionary.isOrdered(), |
| ) |
| } else { |
| Field::new( |
| field.name().unwrap(), |
| get_data_type(field, true), |
| field.nullable(), |
| ) |
| }; |
| |
| let mut metadata = None; |
| if let Some(list) = field.custom_metadata() { |
| let mut metadata_map = BTreeMap::default(); |
| for kv in list { |
| if let (Some(k), Some(v)) = (kv.key(), kv.value()) { |
| metadata_map.insert(k.to_string(), v.to_string()); |
| } |
| } |
| metadata = Some(metadata_map); |
| } |
| |
| arrow_field.set_metadata(metadata); |
| arrow_field |
| } |
| } |
| |
| /// Deserialize a Schema table from IPC format to Schema data type |
| pub fn fb_to_schema(fb: ipc::Schema) -> Schema { |
| let mut fields: Vec<Field> = vec![]; |
| let c_fields = fb.fields().unwrap(); |
| let len = c_fields.len(); |
| for i in 0..len { |
| let c_field: ipc::Field = c_fields.get(i); |
| match c_field.type_type() { |
| ipc::Type::Decimal if fb.endianness() == ipc::Endianness::Big => { |
| unimplemented!("Big Endian is not supported for Decimal!") |
| } |
| _ => (), |
| }; |
| fields.push(c_field.into()); |
| } |
| |
| let mut metadata: HashMap<String, String> = HashMap::default(); |
| if let Some(md_fields) = fb.custom_metadata() { |
| let len = md_fields.len(); |
| for i in 0..len { |
| let kv = md_fields.get(i); |
| let k_str = kv.key(); |
| let v_str = kv.value(); |
| if let Some(k) = k_str { |
| if let Some(v) = v_str { |
| metadata.insert(k.to_string(), v.to_string()); |
| } |
| } |
| } |
| } |
| Schema::new_with_metadata(fields, metadata) |
| } |
| |
| /// Deserialize an IPC message into a schema |
| pub fn schema_from_bytes(bytes: &[u8]) -> Result<Schema> { |
| if let Ok(ipc) = ipc::root_as_message(bytes) { |
| if let Some(schema) = ipc.header_as_schema().map(fb_to_schema) { |
| Ok(schema) |
| } else { |
| Err(ArrowError::IoError( |
| "Unable to get head as schema".to_string(), |
| )) |
| } |
| } else { |
| Err(ArrowError::IoError( |
| "Unable to get root as message".to_string(), |
| )) |
| } |
| } |
| |
| /// Get the Arrow data type from the flatbuffer Field table |
| pub(crate) fn get_data_type(field: ipc::Field, may_be_dictionary: bool) -> DataType { |
| if let Some(dictionary) = field.dictionary() { |
| if may_be_dictionary { |
| let int = dictionary.indexType().unwrap(); |
| let index_type = match (int.bitWidth(), int.is_signed()) { |
| (8, true) => DataType::Int8, |
| (8, false) => DataType::UInt8, |
| (16, true) => DataType::Int16, |
| (16, false) => DataType::UInt16, |
| (32, true) => DataType::Int32, |
| (32, false) => DataType::UInt32, |
| (64, true) => DataType::Int64, |
| (64, false) => DataType::UInt64, |
| _ => panic!("Unexpected bitwidth and signed"), |
| }; |
| return DataType::Dictionary( |
| Box::new(index_type), |
| Box::new(get_data_type(field, false)), |
| ); |
| } |
| } |
| |
| match field.type_type() { |
| ipc::Type::Null => DataType::Null, |
| ipc::Type::Bool => DataType::Boolean, |
| ipc::Type::Int => { |
| let int = field.type_as_int().unwrap(); |
| match (int.bitWidth(), int.is_signed()) { |
| (8, true) => DataType::Int8, |
| (8, false) => DataType::UInt8, |
| (16, true) => DataType::Int16, |
| (16, false) => DataType::UInt16, |
| (32, true) => DataType::Int32, |
| (32, false) => DataType::UInt32, |
| (64, true) => DataType::Int64, |
| (64, false) => DataType::UInt64, |
| z => panic!( |
| "Int type with bit width of {} and signed of {} not supported", |
| z.0, z.1 |
| ), |
| } |
| } |
| ipc::Type::Binary => DataType::Binary, |
| ipc::Type::LargeBinary => DataType::LargeBinary, |
| ipc::Type::Utf8 => DataType::Utf8, |
| ipc::Type::LargeUtf8 => DataType::LargeUtf8, |
| ipc::Type::FixedSizeBinary => { |
| let fsb = field.type_as_fixed_size_binary().unwrap(); |
| DataType::FixedSizeBinary(fsb.byteWidth()) |
| } |
| ipc::Type::FloatingPoint => { |
| let float = field.type_as_floating_point().unwrap(); |
| match float.precision() { |
| ipc::Precision::HALF => DataType::Float16, |
| ipc::Precision::SINGLE => DataType::Float32, |
| ipc::Precision::DOUBLE => DataType::Float64, |
| z => panic!("FloatingPoint type with precision of {:?} not supported", z), |
| } |
| } |
| ipc::Type::Date => { |
| let date = field.type_as_date().unwrap(); |
| match date.unit() { |
| ipc::DateUnit::DAY => DataType::Date32, |
| ipc::DateUnit::MILLISECOND => DataType::Date64, |
| z => panic!("Date type with unit of {:?} not supported", z), |
| } |
| } |
| ipc::Type::Time => { |
| let time = field.type_as_time().unwrap(); |
| match (time.bitWidth(), time.unit()) { |
| (32, ipc::TimeUnit::SECOND) => DataType::Time32(TimeUnit::Second), |
| (32, ipc::TimeUnit::MILLISECOND) => { |
| DataType::Time32(TimeUnit::Millisecond) |
| } |
| (64, ipc::TimeUnit::MICROSECOND) => { |
| DataType::Time64(TimeUnit::Microsecond) |
| } |
| (64, ipc::TimeUnit::NANOSECOND) => DataType::Time64(TimeUnit::Nanosecond), |
| z => panic!( |
| "Time type with bit width of {} and unit of {:?} not supported", |
| z.0, z.1 |
| ), |
| } |
| } |
| ipc::Type::Timestamp => { |
| let timestamp = field.type_as_timestamp().unwrap(); |
| let timezone: Option<String> = timestamp.timezone().map(|tz| tz.to_string()); |
| match timestamp.unit() { |
| ipc::TimeUnit::SECOND => DataType::Timestamp(TimeUnit::Second, timezone), |
| ipc::TimeUnit::MILLISECOND => { |
| DataType::Timestamp(TimeUnit::Millisecond, timezone) |
| } |
| ipc::TimeUnit::MICROSECOND => { |
| DataType::Timestamp(TimeUnit::Microsecond, timezone) |
| } |
| ipc::TimeUnit::NANOSECOND => { |
| DataType::Timestamp(TimeUnit::Nanosecond, timezone) |
| } |
| z => panic!("Timestamp type with unit of {:?} not supported", z), |
| } |
| } |
| ipc::Type::Interval => { |
| let interval = field.type_as_interval().unwrap(); |
| match interval.unit() { |
| ipc::IntervalUnit::YEAR_MONTH => { |
| DataType::Interval(IntervalUnit::YearMonth) |
| } |
| ipc::IntervalUnit::DAY_TIME => DataType::Interval(IntervalUnit::DayTime), |
| z => panic!("Interval type with unit of {:?} unsupported", z), |
| } |
| } |
| ipc::Type::Duration => { |
| let duration = field.type_as_duration().unwrap(); |
| match duration.unit() { |
| ipc::TimeUnit::SECOND => DataType::Duration(TimeUnit::Second), |
| ipc::TimeUnit::MILLISECOND => DataType::Duration(TimeUnit::Millisecond), |
| ipc::TimeUnit::MICROSECOND => DataType::Duration(TimeUnit::Microsecond), |
| ipc::TimeUnit::NANOSECOND => DataType::Duration(TimeUnit::Nanosecond), |
| z => panic!("Duration type with unit of {:?} unsupported", z), |
| } |
| } |
| ipc::Type::List => { |
| let children = field.children().unwrap(); |
| if children.len() != 1 { |
| panic!("expect a list to have one child") |
| } |
| DataType::List(Box::new(children.get(0).into())) |
| } |
| ipc::Type::LargeList => { |
| let children = field.children().unwrap(); |
| if children.len() != 1 { |
| panic!("expect a large list to have one child") |
| } |
| DataType::LargeList(Box::new(children.get(0).into())) |
| } |
| ipc::Type::FixedSizeList => { |
| let children = field.children().unwrap(); |
| if children.len() != 1 { |
| panic!("expect a list to have one child") |
| } |
| let fsl = field.type_as_fixed_size_list().unwrap(); |
| DataType::FixedSizeList(Box::new(children.get(0).into()), fsl.listSize()) |
| } |
| ipc::Type::Struct_ => { |
| let mut fields = vec![]; |
| if let Some(children) = field.children() { |
| for i in 0..children.len() { |
| fields.push(children.get(i).into()); |
| } |
| }; |
| |
| DataType::Struct(fields) |
| } |
| ipc::Type::Decimal => { |
| let fsb = field.type_as_decimal().unwrap(); |
| DataType::Decimal(fsb.precision() as usize, fsb.scale() as usize) |
| } |
| t => unimplemented!("Type {:?} not supported", t), |
| } |
| } |
| |
| pub(crate) struct FBFieldType<'b> { |
| pub(crate) type_type: ipc::Type, |
| pub(crate) type_: WIPOffset<UnionWIPOffset>, |
| pub(crate) children: Option<WIPOffset<Vector<'b, ForwardsUOffset<ipc::Field<'b>>>>>, |
| } |
| |
| /// Create an IPC Field from an Arrow Field |
| pub(crate) fn build_field<'a>( |
| fbb: &mut FlatBufferBuilder<'a>, |
| field: &Field, |
| ) -> WIPOffset<ipc::Field<'a>> { |
| // Optional custom metadata. |
| let mut fb_metadata = None; |
| if let Some(metadata) = field.metadata() { |
| if !metadata.is_empty() { |
| let mut kv_vec = vec![]; |
| for (k, v) in metadata { |
| let kv_args = ipc::KeyValueArgs { |
| key: Some(fbb.create_string(k.as_str())), |
| value: Some(fbb.create_string(v.as_str())), |
| }; |
| let kv_offset = ipc::KeyValue::create(fbb, &kv_args); |
| kv_vec.push(kv_offset); |
| } |
| fb_metadata = Some(fbb.create_vector(&kv_vec)); |
| } |
| }; |
| |
| let fb_field_name = fbb.create_string(field.name().as_str()); |
| let field_type = get_fb_field_type(field.data_type(), field.is_nullable(), fbb); |
| |
| let fb_dictionary = if let Dictionary(index_type, _) = field.data_type() { |
| Some(get_fb_dictionary( |
| index_type, |
| field |
| .dict_id() |
| .expect("All Dictionary types have `dict_id`"), |
| field |
| .dict_is_ordered() |
| .expect("All Dictionary types have `dict_is_ordered`"), |
| fbb, |
| )) |
| } else { |
| None |
| }; |
| |
| let mut field_builder = ipc::FieldBuilder::new(fbb); |
| field_builder.add_name(fb_field_name); |
| if let Some(dictionary) = fb_dictionary { |
| field_builder.add_dictionary(dictionary) |
| } |
| field_builder.add_type_type(field_type.type_type); |
| field_builder.add_nullable(field.is_nullable()); |
| match field_type.children { |
| None => {} |
| Some(children) => field_builder.add_children(children), |
| }; |
| field_builder.add_type_(field_type.type_); |
| |
| if let Some(fb_metadata) = fb_metadata { |
| field_builder.add_custom_metadata(fb_metadata); |
| } |
| |
| field_builder.finish() |
| } |
| |
| /// Get the IPC type of a data type |
| pub(crate) fn get_fb_field_type<'a>( |
| data_type: &DataType, |
| is_nullable: bool, |
| fbb: &mut FlatBufferBuilder<'a>, |
| ) -> FBFieldType<'a> { |
| // some IPC implementations expect an empty list for child data, instead of a null value. |
| // An empty field list is thus returned for primitive types |
| let empty_fields: Vec<WIPOffset<ipc::Field>> = vec![]; |
| match data_type { |
| Null => FBFieldType { |
| type_type: ipc::Type::Null, |
| type_: ipc::NullBuilder::new(fbb).finish().as_union_value(), |
| children: Some(fbb.create_vector(&empty_fields[..])), |
| }, |
| Boolean => FBFieldType { |
| type_type: ipc::Type::Bool, |
| type_: ipc::BoolBuilder::new(fbb).finish().as_union_value(), |
| children: Some(fbb.create_vector(&empty_fields[..])), |
| }, |
| UInt8 | UInt16 | UInt32 | UInt64 => { |
| let children = fbb.create_vector(&empty_fields[..]); |
| let mut builder = ipc::IntBuilder::new(fbb); |
| builder.add_is_signed(false); |
| match data_type { |
| UInt8 => builder.add_bitWidth(8), |
| UInt16 => builder.add_bitWidth(16), |
| UInt32 => builder.add_bitWidth(32), |
| UInt64 => builder.add_bitWidth(64), |
| _ => {} |
| }; |
| FBFieldType { |
| type_type: ipc::Type::Int, |
| type_: builder.finish().as_union_value(), |
| children: Some(children), |
| } |
| } |
| Int8 | Int16 | Int32 | Int64 => { |
| let children = fbb.create_vector(&empty_fields[..]); |
| let mut builder = ipc::IntBuilder::new(fbb); |
| builder.add_is_signed(true); |
| match data_type { |
| Int8 => builder.add_bitWidth(8), |
| Int16 => builder.add_bitWidth(16), |
| Int32 => builder.add_bitWidth(32), |
| Int64 => builder.add_bitWidth(64), |
| _ => {} |
| }; |
| FBFieldType { |
| type_type: ipc::Type::Int, |
| type_: builder.finish().as_union_value(), |
| children: Some(children), |
| } |
| } |
| Float16 | Float32 | Float64 => { |
| let children = fbb.create_vector(&empty_fields[..]); |
| let mut builder = ipc::FloatingPointBuilder::new(fbb); |
| match data_type { |
| Float16 => builder.add_precision(ipc::Precision::HALF), |
| Float32 => builder.add_precision(ipc::Precision::SINGLE), |
| Float64 => builder.add_precision(ipc::Precision::DOUBLE), |
| _ => {} |
| }; |
| FBFieldType { |
| type_type: ipc::Type::FloatingPoint, |
| type_: builder.finish().as_union_value(), |
| children: Some(children), |
| } |
| } |
| Binary => FBFieldType { |
| type_type: ipc::Type::Binary, |
| type_: ipc::BinaryBuilder::new(fbb).finish().as_union_value(), |
| children: Some(fbb.create_vector(&empty_fields[..])), |
| }, |
| LargeBinary => FBFieldType { |
| type_type: ipc::Type::LargeBinary, |
| type_: ipc::LargeBinaryBuilder::new(fbb).finish().as_union_value(), |
| children: Some(fbb.create_vector(&empty_fields[..])), |
| }, |
| Utf8 => FBFieldType { |
| type_type: ipc::Type::Utf8, |
| type_: ipc::Utf8Builder::new(fbb).finish().as_union_value(), |
| children: Some(fbb.create_vector(&empty_fields[..])), |
| }, |
| LargeUtf8 => FBFieldType { |
| type_type: ipc::Type::LargeUtf8, |
| type_: ipc::LargeUtf8Builder::new(fbb).finish().as_union_value(), |
| children: Some(fbb.create_vector(&empty_fields[..])), |
| }, |
| FixedSizeBinary(len) => { |
| let mut builder = ipc::FixedSizeBinaryBuilder::new(fbb); |
| builder.add_byteWidth(*len as i32); |
| FBFieldType { |
| type_type: ipc::Type::FixedSizeBinary, |
| type_: builder.finish().as_union_value(), |
| children: Some(fbb.create_vector(&empty_fields[..])), |
| } |
| } |
| Date32 => { |
| let mut builder = ipc::DateBuilder::new(fbb); |
| builder.add_unit(ipc::DateUnit::DAY); |
| FBFieldType { |
| type_type: ipc::Type::Date, |
| type_: builder.finish().as_union_value(), |
| children: Some(fbb.create_vector(&empty_fields[..])), |
| } |
| } |
| Date64 => { |
| let mut builder = ipc::DateBuilder::new(fbb); |
| builder.add_unit(ipc::DateUnit::MILLISECOND); |
| FBFieldType { |
| type_type: ipc::Type::Date, |
| type_: builder.finish().as_union_value(), |
| children: Some(fbb.create_vector(&empty_fields[..])), |
| } |
| } |
| Time32(unit) | Time64(unit) => { |
| let mut builder = ipc::TimeBuilder::new(fbb); |
| match unit { |
| TimeUnit::Second => { |
| builder.add_bitWidth(32); |
| builder.add_unit(ipc::TimeUnit::SECOND); |
| } |
| TimeUnit::Millisecond => { |
| builder.add_bitWidth(32); |
| builder.add_unit(ipc::TimeUnit::MILLISECOND); |
| } |
| TimeUnit::Microsecond => { |
| builder.add_bitWidth(64); |
| builder.add_unit(ipc::TimeUnit::MICROSECOND); |
| } |
| TimeUnit::Nanosecond => { |
| builder.add_bitWidth(64); |
| builder.add_unit(ipc::TimeUnit::NANOSECOND); |
| } |
| } |
| FBFieldType { |
| type_type: ipc::Type::Time, |
| type_: builder.finish().as_union_value(), |
| children: Some(fbb.create_vector(&empty_fields[..])), |
| } |
| } |
| Timestamp(unit, tz) => { |
| let tz = tz.clone().unwrap_or_else(String::new); |
| let tz_str = fbb.create_string(tz.as_str()); |
| let mut builder = ipc::TimestampBuilder::new(fbb); |
| let time_unit = match unit { |
| TimeUnit::Second => ipc::TimeUnit::SECOND, |
| TimeUnit::Millisecond => ipc::TimeUnit::MILLISECOND, |
| TimeUnit::Microsecond => ipc::TimeUnit::MICROSECOND, |
| TimeUnit::Nanosecond => ipc::TimeUnit::NANOSECOND, |
| }; |
| builder.add_unit(time_unit); |
| if !tz.is_empty() { |
| builder.add_timezone(tz_str); |
| } |
| FBFieldType { |
| type_type: ipc::Type::Timestamp, |
| type_: builder.finish().as_union_value(), |
| children: Some(fbb.create_vector(&empty_fields[..])), |
| } |
| } |
| Interval(unit) => { |
| let mut builder = ipc::IntervalBuilder::new(fbb); |
| let interval_unit = match unit { |
| IntervalUnit::YearMonth => ipc::IntervalUnit::YEAR_MONTH, |
| IntervalUnit::DayTime => ipc::IntervalUnit::DAY_TIME, |
| }; |
| builder.add_unit(interval_unit); |
| FBFieldType { |
| type_type: ipc::Type::Interval, |
| type_: builder.finish().as_union_value(), |
| children: Some(fbb.create_vector(&empty_fields[..])), |
| } |
| } |
| Duration(unit) => { |
| let mut builder = ipc::DurationBuilder::new(fbb); |
| let time_unit = match unit { |
| TimeUnit::Second => ipc::TimeUnit::SECOND, |
| TimeUnit::Millisecond => ipc::TimeUnit::MILLISECOND, |
| TimeUnit::Microsecond => ipc::TimeUnit::MICROSECOND, |
| TimeUnit::Nanosecond => ipc::TimeUnit::NANOSECOND, |
| }; |
| builder.add_unit(time_unit); |
| FBFieldType { |
| type_type: ipc::Type::Duration, |
| type_: builder.finish().as_union_value(), |
| children: Some(fbb.create_vector(&empty_fields[..])), |
| } |
| } |
| List(ref list_type) => { |
| let child = build_field(fbb, list_type); |
| FBFieldType { |
| type_type: ipc::Type::List, |
| type_: ipc::ListBuilder::new(fbb).finish().as_union_value(), |
| children: Some(fbb.create_vector(&[child])), |
| } |
| } |
| LargeList(ref list_type) => { |
| let child = build_field(fbb, list_type); |
| FBFieldType { |
| type_type: ipc::Type::LargeList, |
| type_: ipc::LargeListBuilder::new(fbb).finish().as_union_value(), |
| children: Some(fbb.create_vector(&[child])), |
| } |
| } |
| FixedSizeList(ref list_type, len) => { |
| let child = build_field(fbb, list_type); |
| let mut builder = ipc::FixedSizeListBuilder::new(fbb); |
| builder.add_listSize(*len as i32); |
| FBFieldType { |
| type_type: ipc::Type::FixedSizeList, |
| type_: builder.finish().as_union_value(), |
| children: Some(fbb.create_vector(&[child])), |
| } |
| } |
| Struct(fields) => { |
| // struct's fields are children |
| let mut children = vec![]; |
| for field in fields { |
| let inner_types = |
| get_fb_field_type(field.data_type(), field.is_nullable(), fbb); |
| let field_name = fbb.create_string(field.name()); |
| children.push(ipc::Field::create( |
| fbb, |
| &ipc::FieldArgs { |
| name: Some(field_name), |
| nullable: field.is_nullable(), |
| type_type: inner_types.type_type, |
| type_: Some(inner_types.type_), |
| dictionary: None, |
| children: inner_types.children, |
| custom_metadata: None, |
| }, |
| )); |
| } |
| FBFieldType { |
| type_type: ipc::Type::Struct_, |
| type_: ipc::Struct_Builder::new(fbb).finish().as_union_value(), |
| children: Some(fbb.create_vector(&children[..])), |
| } |
| } |
| Dictionary(_, value_type) => { |
| // In this library, the dictionary "type" is a logical construct. Here we |
| // pass through to the value type, as we've already captured the index |
| // type in the DictionaryEncoding metadata in the parent field |
| get_fb_field_type(value_type, is_nullable, fbb) |
| } |
| Decimal(precision, scale) => { |
| let mut builder = ipc::DecimalBuilder::new(fbb); |
| builder.add_precision(*precision as i32); |
| builder.add_scale(*scale as i32); |
| builder.add_bitWidth(128); |
| FBFieldType { |
| type_type: ipc::Type::Decimal, |
| type_: builder.finish().as_union_value(), |
| children: Some(fbb.create_vector(&empty_fields[..])), |
| } |
| } |
| t => unimplemented!("Type {:?} not supported", t), |
| } |
| } |
| |
| /// Create an IPC dictionary encoding |
| pub(crate) fn get_fb_dictionary<'a>( |
| index_type: &DataType, |
| dict_id: i64, |
| dict_is_ordered: bool, |
| fbb: &mut FlatBufferBuilder<'a>, |
| ) -> WIPOffset<ipc::DictionaryEncoding<'a>> { |
| // We assume that the dictionary index type (as an integer) has already been |
| // validated elsewhere, and can safely assume we are dealing with integers |
| let mut index_builder = ipc::IntBuilder::new(fbb); |
| |
| match *index_type { |
| Int8 | Int16 | Int32 | Int64 => index_builder.add_is_signed(true), |
| UInt8 | UInt16 | UInt32 | UInt64 => index_builder.add_is_signed(false), |
| _ => {} |
| } |
| |
| match *index_type { |
| Int8 | UInt8 => index_builder.add_bitWidth(8), |
| Int16 | UInt16 => index_builder.add_bitWidth(16), |
| Int32 | UInt32 => index_builder.add_bitWidth(32), |
| Int64 | UInt64 => index_builder.add_bitWidth(64), |
| _ => {} |
| } |
| |
| let index_builder = index_builder.finish(); |
| |
| let mut builder = ipc::DictionaryEncodingBuilder::new(fbb); |
| builder.add_id(dict_id); |
| builder.add_indexType(index_builder); |
| builder.add_isOrdered(dict_is_ordered); |
| |
| builder.finish() |
| } |
| |
#[cfg(test)]
mod tests {
    use super::*;
    use crate::datatypes::{DataType, Field, Schema};

    /// A schema covering the supported types survives a trip through the
    /// flatbuffer encoding unchanged, including schema and field metadata.
    #[test]
    fn convert_schema_round_trip() {
        let md: HashMap<String, String> = [("Key".to_string(), "value".to_string())]
            .iter()
            .cloned()
            .collect();
        let field_md: BTreeMap<String, String> = [("k".to_string(), "v".to_string())]
            .iter()
            .cloned()
            .collect();
        let schema = Schema::new_with_metadata(
            vec![
                {
                    let mut f = Field::new("uint8", DataType::UInt8, false);
                    f.set_metadata(Some(field_md));
                    f
                },
                Field::new("uint16", DataType::UInt16, true),
                Field::new("uint32", DataType::UInt32, false),
                Field::new("uint64", DataType::UInt64, true),
                Field::new("int8", DataType::Int8, true),
                Field::new("int16", DataType::Int16, false),
                Field::new("int32", DataType::Int32, true),
                Field::new("int64", DataType::Int64, false),
                Field::new("float16", DataType::Float16, true),
                Field::new("float32", DataType::Float32, false),
                Field::new("float64", DataType::Float64, true),
                Field::new("null", DataType::Null, false),
                Field::new("bool", DataType::Boolean, false),
                Field::new("date32", DataType::Date32, false),
                Field::new("date64", DataType::Date64, true),
                Field::new("time32[s]", DataType::Time32(TimeUnit::Second), true),
                Field::new("time32[ms]", DataType::Time32(TimeUnit::Millisecond), false),
                Field::new("time64[us]", DataType::Time64(TimeUnit::Microsecond), false),
                Field::new("time64[ns]", DataType::Time64(TimeUnit::Nanosecond), true),
                Field::new(
                    "timestamp[s]",
                    DataType::Timestamp(TimeUnit::Second, None),
                    false,
                ),
                Field::new(
                    "timestamp[ms]",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    true,
                ),
                Field::new(
                    "timestamp[us]",
                    DataType::Timestamp(
                        TimeUnit::Microsecond,
                        Some("Africa/Johannesburg".to_string()),
                    ),
                    false,
                ),
                Field::new(
                    "timestamp[ns]",
                    DataType::Timestamp(TimeUnit::Nanosecond, None),
                    true,
                ),
                Field::new(
                    "interval[ym]",
                    DataType::Interval(IntervalUnit::YearMonth),
                    true,
                ),
                Field::new(
                    "interval[dt]",
                    DataType::Interval(IntervalUnit::DayTime),
                    true,
                ),
                Field::new("utf8", DataType::Utf8, false),
                Field::new("binary", DataType::Binary, false),
                Field::new(
                    "list[u8]",
                    DataType::List(Box::new(Field::new("item", DataType::UInt8, false))),
                    true,
                ),
                Field::new(
                    "list[struct<float32, int32, bool>]",
                    DataType::List(Box::new(Field::new(
                        "struct",
                        DataType::Struct(vec![
                            // the type now matches the field's name
                            Field::new("float32", DataType::Float32, false),
                            Field::new("int32", DataType::Int32, true),
                            Field::new("bool", DataType::Boolean, true),
                        ]),
                        true,
                    ))),
                    false,
                ),
                Field::new(
                    "struct<int64, list[struct<date32, list[struct<>]>]>",
                    DataType::Struct(vec![
                        Field::new("int64", DataType::Int64, true),
                        Field::new(
                            "list[struct<date32, list[struct<>]>]",
                            DataType::List(Box::new(Field::new(
                                "struct",
                                DataType::Struct(vec![
                                    Field::new("date32", DataType::Date32, true),
                                    Field::new(
                                        "list[struct<>]",
                                        DataType::List(Box::new(Field::new(
                                            "struct",
                                            DataType::Struct(vec![]),
                                            false,
                                        ))),
                                        false,
                                    ),
                                ]),
                                false,
                            ))),
                            false,
                        ),
                    ]),
                    false,
                ),
                Field::new("struct<>", DataType::Struct(vec![]), true),
                Field::new_dict(
                    "dictionary<int32, utf8>",
                    DataType::Dictionary(
                        Box::new(DataType::Int32),
                        Box::new(DataType::Utf8),
                    ),
                    true,
                    123,
                    true,
                ),
                Field::new_dict(
                    "dictionary<uint8, uint32>",
                    DataType::Dictionary(
                        Box::new(DataType::UInt8),
                        Box::new(DataType::UInt32),
                    ),
                    true,
                    123,
                    true,
                ),
                Field::new("decimal<usize, usize>", DataType::Decimal(10, 6), false),
            ],
            md,
        );

        let fb = schema_to_fb(&schema);

        // read back fields
        let ipc = ipc::root_as_schema(fb.finished_data()).unwrap();
        let schema2 = fb_to_schema(ipc);
        assert_eq!(schema, schema2);
    }

    /// Messages produced by Python and by Rust for the same schema decode to
    /// the same Arrow schema and agree on their message-level properties.
    #[test]
    fn schema_from_bytes() {
        // bytes of a schema generated from python (0.14.0), saved as an `ipc::Message`.
        // the schema is: Field("field1", DataType::UInt32, false)
        let python_bytes: Vec<u8> = vec![
            16, 0, 0, 0, 0, 0, 10, 0, 12, 0, 6, 0, 5, 0, 8, 0, 10, 0, 0, 0, 0, 1, 3, 0,
            12, 0, 0, 0, 8, 0, 8, 0, 0, 0, 4, 0, 8, 0, 0, 0, 4, 0, 0, 0, 1, 0, 0, 0, 20,
            0, 0, 0, 16, 0, 20, 0, 8, 0, 0, 0, 7, 0, 12, 0, 0, 0, 16, 0, 16, 0, 0, 0, 0,
            0, 0, 2, 32, 0, 0, 0, 20, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 6, 0, 8, 0,
            4, 0, 6, 0, 0, 0, 32, 0, 0, 0, 6, 0, 0, 0, 102, 105, 101, 108, 100, 49, 0, 0,
            0, 0, 0, 0,
        ];
        let ipc = ipc::root_as_message(&python_bytes[..]).unwrap();
        let schema = ipc.header_as_schema().unwrap();

        // a message generated from Rust, same as the Python one
        let rust_bytes: Vec<u8> = vec![
            16, 0, 0, 0, 0, 0, 10, 0, 14, 0, 12, 0, 11, 0, 4, 0, 10, 0, 0, 0, 20, 0, 0,
            0, 0, 0, 0, 1, 3, 0, 10, 0, 12, 0, 0, 0, 8, 0, 4, 0, 10, 0, 0, 0, 8, 0, 0, 0,
            8, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 16, 0, 0, 0, 12, 0, 18, 0, 12, 0, 0, 0,
            11, 0, 4, 0, 12, 0, 0, 0, 20, 0, 0, 0, 0, 0, 0, 2, 20, 0, 0, 0, 0, 0, 6, 0,
            8, 0, 4, 0, 6, 0, 0, 0, 32, 0, 0, 0, 6, 0, 0, 0, 102, 105, 101, 108, 100, 49,
            0, 0,
        ];
        let ipc2 = ipc::root_as_message(&rust_bytes[..]).unwrap();
        // Read the second schema from the second message, so that the
        // comparison below really involves both buffers.
        let schema2 = ipc2.header_as_schema().unwrap();

        // The two buffers are laid out differently, so the decoded Arrow
        // schemas are compared rather than the raw flatbuffer tables.
        let expected = Schema::new(vec![Field::new("field1", DataType::UInt32, false)]);
        assert_eq!(expected, fb_to_schema(schema));
        assert_eq!(expected, fb_to_schema(schema2));

        // the public entry point decodes both messages to the same schema
        assert_eq!(expected, super::schema_from_bytes(&python_bytes[..]).unwrap());
        assert_eq!(expected, super::schema_from_bytes(&rust_bytes[..]).unwrap());

        assert_eq!(ipc.version(), ipc2.version());
        assert_eq!(ipc.header_type(), ipc2.header_type());
        assert_eq!(ipc.bodyLength(), ipc2.bodyLength());
        assert!(ipc.custom_metadata().is_none());
        assert!(ipc2.custom_metadata().is_none());
    }
}