| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| //! Data types that connect Parquet physical types with their Rust-specific |
| //! representations. |
| use std::cmp::Ordering; |
| use std::fmt; |
| use std::mem; |
| use std::ops::{Deref, DerefMut}; |
| use std::str::from_utf8; |
| |
| use byteorder::{BigEndian, ByteOrder}; |
| |
| use crate::basic::Type; |
| use crate::column::reader::{ColumnReader, ColumnReaderImpl}; |
| use crate::column::writer::{ColumnWriter, ColumnWriterImpl}; |
| use crate::errors::{ParquetError, Result}; |
| use crate::util::{ |
| bit_util::{from_ne_slice, FromBytes}, |
| memory::{ByteBuffer, ByteBufferPtr}, |
| }; |
| |
| /// Rust representation for logical type INT96, value is backed by an array of `u32`. |
| /// The type only takes 12 bytes, without extra padding. |
| #[derive(Clone, Debug, PartialOrd)] |
| pub struct Int96 { |
| value: Option<[u32; 3]>, |
| } |
| |
| impl Int96 { |
| /// Creates new INT96 type struct with no data set. |
| pub fn new() -> Self { |
| Self { value: None } |
| } |
| |
| /// Returns underlying data as slice of [`u32`]. |
| #[inline] |
| pub fn data(&self) -> &[u32] { |
| self.value |
| .as_ref() |
| .expect("set_data should have been called") |
| } |
| |
| /// Sets data for this INT96 type. |
| #[inline] |
| pub fn set_data(&mut self, elem0: u32, elem1: u32, elem2: u32) { |
| self.value = Some([elem0, elem1, elem2]); |
| } |
| |
| /// Converts this INT96 into an i64 representing the number of MILLISECONDS since Epoch |
| pub fn to_i64(&self) -> i64 { |
| const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588; |
| const SECONDS_PER_DAY: i64 = 86_400; |
| const MILLIS_PER_SECOND: i64 = 1_000; |
| |
| let day = self.data()[2] as i64; |
| let nanoseconds = ((self.data()[1] as i64) << 32) + self.data()[0] as i64; |
| let seconds = (day - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY; |
| |
| seconds * MILLIS_PER_SECOND + nanoseconds / 1_000_000 |
| } |
| } |
| |
| impl Default for Int96 { |
| fn default() -> Self { |
| Self { value: None } |
| } |
| } |
| |
| impl PartialEq for Int96 { |
| fn eq(&self, other: &Int96) -> bool { |
| match (&self.value, &other.value) { |
| (Some(v1), Some(v2)) => v1 == v2, |
| (None, None) => true, |
| _ => false, |
| } |
| } |
| } |
| |
| impl From<Vec<u32>> for Int96 { |
| fn from(buf: Vec<u32>) -> Self { |
| assert_eq!(buf.len(), 3); |
| let mut result = Self::new(); |
| result.set_data(buf[0], buf[1], buf[2]); |
| result |
| } |
| } |
| |
| impl fmt::Display for Int96 { |
| #[cold] |
| fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
| write!(f, "{:?}", self.data()) |
| } |
| } |
| |
| /// Rust representation for BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY Parquet physical types. |
| /// Value is backed by a byte buffer. |
| #[derive(Clone, Debug)] |
| pub struct ByteArray { |
| data: Option<ByteBufferPtr>, |
| } |
| |
| impl PartialOrd for ByteArray { |
| fn partial_cmp(&self, other: &ByteArray) -> Option<Ordering> { |
| if self.data.is_some() && other.data.is_some() { |
| match self.len().cmp(&other.len()) { |
| Ordering::Greater => Some(Ordering::Greater), |
| Ordering::Less => Some(Ordering::Less), |
| Ordering::Equal => { |
| for (v1, v2) in self.data().iter().zip(other.data().iter()) { |
| match v1.cmp(v2) { |
| Ordering::Greater => return Some(Ordering::Greater), |
| Ordering::Less => return Some(Ordering::Less), |
| _ => {} |
| } |
| } |
| Some(Ordering::Equal) |
| } |
| } |
| } else { |
| None |
| } |
| } |
| } |
| |
| impl ByteArray { |
| /// Creates new byte array with no data set. |
| #[inline] |
| pub fn new() -> Self { |
| ByteArray { data: None } |
| } |
| |
| /// Gets length of the underlying byte buffer. |
| #[inline] |
| pub fn len(&self) -> usize { |
| assert!(self.data.is_some()); |
| self.data.as_ref().unwrap().len() |
| } |
| |
| /// Checks if the underlying buffer is empty. |
| #[inline] |
| pub fn is_empty(&self) -> bool { |
| self.len() == 0 |
| } |
| |
| /// Returns slice of data. |
| #[inline] |
| pub fn data(&self) -> &[u8] { |
| self.data |
| .as_ref() |
| .expect("set_data should have been called") |
| .as_ref() |
| } |
| |
| /// Set data from another byte buffer. |
| #[inline] |
| pub fn set_data(&mut self, data: ByteBufferPtr) { |
| self.data = Some(data); |
| } |
| |
| /// Returns `ByteArray` instance with slice of values for a data. |
| #[inline] |
| pub fn slice(&self, start: usize, len: usize) -> Self { |
| Self::from( |
| self.data |
| .as_ref() |
| .expect("set_data should have been called") |
| .range(start, len), |
| ) |
| } |
| |
| pub fn as_utf8(&self) -> Result<&str> { |
| self.data |
| .as_ref() |
| .map(|ptr| ptr.as_ref()) |
| .ok_or_else(|| general_err!("Can't convert empty byte array to utf8")) |
| .and_then(|bytes| from_utf8(bytes).map_err(|e| e.into())) |
| } |
| } |
| |
| impl From<Vec<u8>> for ByteArray { |
| fn from(buf: Vec<u8>) -> ByteArray { |
| Self { |
| data: Some(ByteBufferPtr::new(buf)), |
| } |
| } |
| } |
| |
| impl<'a> From<&'a str> for ByteArray { |
| fn from(s: &'a str) -> ByteArray { |
| let mut v = Vec::new(); |
| v.extend_from_slice(s.as_bytes()); |
| Self { |
| data: Some(ByteBufferPtr::new(v)), |
| } |
| } |
| } |
| |
| impl From<ByteBufferPtr> for ByteArray { |
| fn from(ptr: ByteBufferPtr) -> ByteArray { |
| Self { data: Some(ptr) } |
| } |
| } |
| |
| impl From<ByteBuffer> for ByteArray { |
| fn from(mut buf: ByteBuffer) -> ByteArray { |
| Self { |
| data: Some(buf.consume()), |
| } |
| } |
| } |
| |
| impl Default for ByteArray { |
| fn default() -> Self { |
| ByteArray { data: None } |
| } |
| } |
| |
| impl PartialEq for ByteArray { |
| fn eq(&self, other: &ByteArray) -> bool { |
| match (&self.data, &other.data) { |
| (Some(d1), Some(d2)) => d1.as_ref() == d2.as_ref(), |
| (None, None) => true, |
| _ => false, |
| } |
| } |
| } |
| |
| impl fmt::Display for ByteArray { |
| fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
| write!(f, "{:?}", self.data()) |
| } |
| } |
| |
| /// Wrapper type for performance reasons, this represents `FIXED_LEN_BYTE_ARRAY` but in all other |
| /// considerations behaves the same as `ByteArray` |
| /// |
| /// # Performance notes: |
| /// This type is a little unfortunate, without it the compiler generates code that takes quite a |
| /// big hit on the CPU pipeline. Essentially the previous version stalls awaiting the result of |
| /// `T::get_physical_type() == Type::FIXED_LEN_BYTE_ARRAY`. |
| /// |
| /// Its debatable if this is wanted, it is out of spec for what parquet documents as its base |
| /// types, although there are code paths in the Rust (and potentially the C++) versions that |
| /// warrant this. |
| /// |
| /// With this wrapper type the compiler generates more targetted code paths matching the higher |
| /// level logical types, removing the data-hazard from all decoding and encoding paths. |
| #[repr(transparent)] |
| #[derive(Clone, Debug, Default)] |
| pub struct FixedLenByteArray(ByteArray); |
| |
| impl PartialEq for FixedLenByteArray { |
| fn eq(&self, other: &FixedLenByteArray) -> bool { |
| self.0.eq(&other.0) |
| } |
| } |
| |
| impl PartialEq<ByteArray> for FixedLenByteArray { |
| fn eq(&self, other: &ByteArray) -> bool { |
| self.0.eq(other) |
| } |
| } |
| |
| impl PartialEq<FixedLenByteArray> for ByteArray { |
| fn eq(&self, other: &FixedLenByteArray) -> bool { |
| self.eq(&other.0) |
| } |
| } |
| |
| impl fmt::Display for FixedLenByteArray { |
| fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
| self.0.fmt(f) |
| } |
| } |
| |
| impl PartialOrd for FixedLenByteArray { |
| fn partial_cmp(&self, other: &FixedLenByteArray) -> Option<Ordering> { |
| self.0.partial_cmp(&other.0) |
| } |
| } |
| |
| impl PartialOrd<FixedLenByteArray> for ByteArray { |
| fn partial_cmp(&self, other: &FixedLenByteArray) -> Option<Ordering> { |
| self.partial_cmp(&other.0) |
| } |
| } |
| |
| impl PartialOrd<ByteArray> for FixedLenByteArray { |
| fn partial_cmp(&self, other: &ByteArray) -> Option<Ordering> { |
| self.0.partial_cmp(other) |
| } |
| } |
| |
| impl Deref for FixedLenByteArray { |
| type Target = ByteArray; |
| |
| fn deref(&self) -> &Self::Target { |
| &self.0 |
| } |
| } |
| |
| impl DerefMut for FixedLenByteArray { |
| fn deref_mut(&mut self) -> &mut Self::Target { |
| &mut self.0 |
| } |
| } |
| |
| impl From<ByteArray> for FixedLenByteArray { |
| fn from(other: ByteArray) -> Self { |
| Self(other) |
| } |
| } |
| |
| impl From<FixedLenByteArray> for ByteArray { |
| fn from(other: FixedLenByteArray) -> Self { |
| other.0 |
| } |
| } |
| |
| /// Rust representation for Decimal values. |
| /// |
| /// This is not a representation of Parquet physical type, but rather a wrapper for |
| /// DECIMAL logical type, and serves as container for raw parts of decimal values: |
| /// unscaled value in bytes, precision and scale. |
| #[derive(Clone, Debug)] |
| pub enum Decimal { |
| /// Decimal backed by `i32`. |
| Int32 { |
| value: [u8; 4], |
| precision: i32, |
| scale: i32, |
| }, |
| /// Decimal backed by `i64`. |
| Int64 { |
| value: [u8; 8], |
| precision: i32, |
| scale: i32, |
| }, |
| /// Decimal backed by byte array. |
| Bytes { |
| value: ByteArray, |
| precision: i32, |
| scale: i32, |
| }, |
| } |
| |
| impl Decimal { |
| /// Creates new decimal value from `i32`. |
| pub fn from_i32(value: i32, precision: i32, scale: i32) -> Self { |
| let mut bytes = [0; 4]; |
| BigEndian::write_i32(&mut bytes, value); |
| Decimal::Int32 { |
| value: bytes, |
| precision, |
| scale, |
| } |
| } |
| |
| /// Creates new decimal value from `i64`. |
| pub fn from_i64(value: i64, precision: i32, scale: i32) -> Self { |
| let mut bytes = [0; 8]; |
| BigEndian::write_i64(&mut bytes, value); |
| Decimal::Int64 { |
| value: bytes, |
| precision, |
| scale, |
| } |
| } |
| |
| /// Creates new decimal value from `ByteArray`. |
| pub fn from_bytes(value: ByteArray, precision: i32, scale: i32) -> Self { |
| Decimal::Bytes { |
| value, |
| precision, |
| scale, |
| } |
| } |
| |
| /// Returns bytes of unscaled value. |
| pub fn data(&self) -> &[u8] { |
| match *self { |
| Decimal::Int32 { ref value, .. } => value, |
| Decimal::Int64 { ref value, .. } => value, |
| Decimal::Bytes { ref value, .. } => value.data(), |
| } |
| } |
| |
| /// Returns decimal precision. |
| pub fn precision(&self) -> i32 { |
| match *self { |
| Decimal::Int32 { precision, .. } => precision, |
| Decimal::Int64 { precision, .. } => precision, |
| Decimal::Bytes { precision, .. } => precision, |
| } |
| } |
| |
| /// Returns decimal scale. |
| pub fn scale(&self) -> i32 { |
| match *self { |
| Decimal::Int32 { scale, .. } => scale, |
| Decimal::Int64 { scale, .. } => scale, |
| Decimal::Bytes { scale, .. } => scale, |
| } |
| } |
| } |
| |
| impl Default for Decimal { |
| fn default() -> Self { |
| Self::from_i32(0, 0, 0) |
| } |
| } |
| |
| impl PartialEq for Decimal { |
| fn eq(&self, other: &Decimal) -> bool { |
| self.precision() == other.precision() |
| && self.scale() == other.scale() |
| && self.data() == other.data() |
| } |
| } |
| |
| /// Converts an instance of data type to a slice of bytes as `u8`. |
| pub trait AsBytes { |
| /// Returns slice of bytes for this data type. |
| fn as_bytes(&self) -> &[u8]; |
| } |
| |
| /// Converts an slice of a data type to a slice of bytes. |
| pub trait SliceAsBytes: Sized { |
| /// Returns slice of bytes for a slice of this data type. |
| fn slice_as_bytes(self_: &[Self]) -> &[u8]; |
| /// Return the internal representation as a mutable slice |
| /// |
| /// # Safety |
| /// If modified you are _required_ to ensure the internal representation |
| /// is valid and correct for the actual raw data |
| unsafe fn slice_as_bytes_mut(self_: &mut [Self]) -> &mut [u8]; |
| } |
| |
| impl AsBytes for [u8] { |
| fn as_bytes(&self) -> &[u8] { |
| self |
| } |
| } |
| |
| macro_rules! gen_as_bytes { |
| ($source_ty:ident) => { |
| impl AsBytes for $source_ty { |
| #[allow(clippy::size_of_in_element_count)] |
| fn as_bytes(&self) -> &[u8] { |
| unsafe { |
| std::slice::from_raw_parts( |
| self as *const $source_ty as *const u8, |
| std::mem::size_of::<$source_ty>(), |
| ) |
| } |
| } |
| } |
| |
| impl SliceAsBytes for $source_ty { |
| #[inline] |
| #[allow(clippy::size_of_in_element_count)] |
| fn slice_as_bytes(self_: &[Self]) -> &[u8] { |
| unsafe { |
| std::slice::from_raw_parts( |
| self_.as_ptr() as *const u8, |
| std::mem::size_of::<$source_ty>() * self_.len(), |
| ) |
| } |
| } |
| |
| #[inline] |
| #[allow(clippy::size_of_in_element_count)] |
| unsafe fn slice_as_bytes_mut(self_: &mut [Self]) -> &mut [u8] { |
| std::slice::from_raw_parts_mut( |
| self_.as_mut_ptr() as *mut u8, |
| std::mem::size_of::<$source_ty>() * self_.len(), |
| ) |
| } |
| } |
| }; |
| } |
| |
| gen_as_bytes!(i8); |
| gen_as_bytes!(i16); |
| gen_as_bytes!(i32); |
| gen_as_bytes!(i64); |
| gen_as_bytes!(u8); |
| gen_as_bytes!(u16); |
| gen_as_bytes!(u32); |
| gen_as_bytes!(u64); |
| gen_as_bytes!(f32); |
| gen_as_bytes!(f64); |
| |
| macro_rules! unimplemented_slice_as_bytes { |
| ($ty: ty) => { |
| impl SliceAsBytes for $ty { |
| fn slice_as_bytes(_self: &[Self]) -> &[u8] { |
| unimplemented!() |
| } |
| |
| unsafe fn slice_as_bytes_mut(_self: &mut [Self]) -> &mut [u8] { |
| unimplemented!() |
| } |
| } |
| }; |
| } |
| |
| // TODO - Can Int96 and bool be implemented in these terms? |
| unimplemented_slice_as_bytes!(Int96); |
| unimplemented_slice_as_bytes!(bool); |
| unimplemented_slice_as_bytes!(ByteArray); |
| unimplemented_slice_as_bytes!(FixedLenByteArray); |
| |
| impl AsBytes for bool { |
| fn as_bytes(&self) -> &[u8] { |
| unsafe { std::slice::from_raw_parts(self as *const bool as *const u8, 1) } |
| } |
| } |
| |
| impl AsBytes for Int96 { |
| fn as_bytes(&self) -> &[u8] { |
| unsafe { |
| std::slice::from_raw_parts(self.data() as *const [u32] as *const u8, 12) |
| } |
| } |
| } |
| |
| impl AsBytes for ByteArray { |
| fn as_bytes(&self) -> &[u8] { |
| self.data() |
| } |
| } |
| |
| impl AsBytes for FixedLenByteArray { |
| fn as_bytes(&self) -> &[u8] { |
| self.data() |
| } |
| } |
| |
| impl AsBytes for Decimal { |
| fn as_bytes(&self) -> &[u8] { |
| self.data() |
| } |
| } |
| |
| impl AsBytes for Vec<u8> { |
| fn as_bytes(&self) -> &[u8] { |
| self.as_slice() |
| } |
| } |
| |
| impl<'a> AsBytes for &'a str { |
| fn as_bytes(&self) -> &[u8] { |
| (self as &str).as_bytes() |
| } |
| } |
| |
| impl AsBytes for str { |
| fn as_bytes(&self) -> &[u8] { |
| (self as &str).as_bytes() |
| } |
| } |
| |
| pub(crate) mod private { |
| use crate::encodings::decoding::PlainDecoderDetails; |
| use crate::util::bit_util::{BitReader, BitWriter}; |
| use crate::util::memory::ByteBufferPtr; |
| |
| use byteorder::ByteOrder; |
| use std::convert::TryInto; |
| |
| use super::{ParquetError, Result, SliceAsBytes}; |
| |
| pub type BitIndex = u64; |
| |
| /// Sealed trait to start to remove specialisation from implementations |
| /// |
| /// This is done to force the associated value type to be unimplementable outside of this |
| /// crate, and thus hint to the type system (and end user) traits are public for the contract |
| /// and not for extension. |
| pub trait ParquetValueType: |
| std::cmp::PartialEq |
| + std::fmt::Debug |
| + std::fmt::Display |
| + std::default::Default |
| + std::clone::Clone |
| + super::AsBytes |
| + super::FromBytes |
| + super::SliceAsBytes |
| + PartialOrd |
| { |
| /// Encode the value directly from a higher level encoder |
| fn encode<W: std::io::Write>( |
| values: &[Self], |
| writer: &mut W, |
| bit_writer: &mut BitWriter, |
| ) -> Result<()>; |
| |
| /// Establish the data that will be decoded in a buffer |
| fn set_data( |
| decoder: &mut PlainDecoderDetails, |
| data: ByteBufferPtr, |
| num_values: usize, |
| ); |
| |
| /// Decode the value from a given buffer for a higher level decoder |
| fn decode( |
| buffer: &mut [Self], |
| decoder: &mut PlainDecoderDetails, |
| ) -> Result<usize>; |
| |
| /// Return the encoded size for a type |
| fn dict_encoding_size(&self) -> (usize, usize) { |
| (std::mem::size_of::<Self>(), 1) |
| } |
| |
| /// Return the value as i64 if possible |
| /// |
| /// This is essentially the same as `std::convert::TryInto<i64>` but can |
| /// implemented for `f32` and `f64`, types that would fail orphan rules |
| fn as_i64(&self) -> Result<i64> { |
| Err(general_err!("Type cannot be converted to i64")) |
| } |
| |
| /// Return the value as u64 if possible |
| /// |
| /// This is essentially the same as `std::convert::TryInto<u64>` but can |
| /// implemented for `f32` and `f64`, types that would fail orphan rules |
| fn as_u64(&self) -> Result<u64> { |
| self.as_i64() |
| .map_err(|_| general_err!("Type cannot be converted to u64")) |
| .map(|x| x as u64) |
| } |
| |
| /// Return the value as an Any to allow for downcasts without transmutation |
| fn as_any(&self) -> &dyn std::any::Any; |
| |
| /// Return the value as an mutable Any to allow for downcasts without transmutation |
| fn as_mut_any(&mut self) -> &mut dyn std::any::Any; |
| } |
| |
| impl ParquetValueType for bool { |
| #[inline] |
| fn encode<W: std::io::Write>( |
| values: &[Self], |
| _: &mut W, |
| bit_writer: &mut BitWriter, |
| ) -> Result<()> { |
| if bit_writer.bytes_written() + values.len() / 8 >= bit_writer.capacity() { |
| bit_writer.extend(256); |
| } |
| for value in values { |
| if !bit_writer.put_value(*value as u64, 1) { |
| return Err(ParquetError::EOF( |
| "unable to put boolean value".to_string(), |
| )); |
| } |
| } |
| Ok(()) |
| } |
| |
| #[inline] |
| fn set_data( |
| decoder: &mut PlainDecoderDetails, |
| data: ByteBufferPtr, |
| num_values: usize, |
| ) { |
| decoder.bit_reader.replace(BitReader::new(data)); |
| decoder.num_values = num_values; |
| } |
| |
| #[inline] |
| fn decode( |
| buffer: &mut [Self], |
| decoder: &mut PlainDecoderDetails, |
| ) -> Result<usize> { |
| let bit_reader = decoder.bit_reader.as_mut().unwrap(); |
| let num_values = std::cmp::min(buffer.len(), decoder.num_values); |
| let values_read = bit_reader.get_batch(&mut buffer[..num_values], 1); |
| decoder.num_values -= values_read; |
| Ok(values_read) |
| } |
| |
| #[inline] |
| fn as_i64(&self) -> Result<i64> { |
| Ok(*self as i64) |
| } |
| |
| #[inline] |
| fn as_any(&self) -> &dyn std::any::Any { |
| self |
| } |
| |
| #[inline] |
| fn as_mut_any(&mut self) -> &mut dyn std::any::Any { |
| self |
| } |
| } |
| |
| /// Hopelessly unsafe function that emulates `num::as_ne_bytes` |
| /// |
| /// It is not recommended to use this outside of this private module as, while it |
| /// _should_ work for primitive values, it is little better than a transmutation |
| /// and can act as a backdoor into mis-interpreting types as arbitary byte slices |
| #[inline] |
| fn as_raw<'a, T>(value: *const T) -> &'a [u8] { |
| unsafe { |
| let value = value as *const u8; |
| std::slice::from_raw_parts(value, std::mem::size_of::<T>()) |
| } |
| } |
| |
| macro_rules! impl_from_raw { |
| ($ty: ty, $self: ident => $as_i64: block) => { |
| impl ParquetValueType for $ty { |
| #[inline] |
| fn encode<W: std::io::Write>(values: &[Self], writer: &mut W, _: &mut BitWriter) -> Result<()> { |
| let raw = unsafe { |
| std::slice::from_raw_parts( |
| values.as_ptr() as *const u8, |
| std::mem::size_of::<$ty>() * values.len(), |
| ) |
| }; |
| writer.write_all(raw)?; |
| |
| Ok(()) |
| } |
| |
| #[inline] |
| fn set_data(decoder: &mut PlainDecoderDetails, data: ByteBufferPtr, num_values: usize) { |
| decoder.data.replace(data); |
| decoder.start = 0; |
| decoder.num_values = num_values; |
| } |
| |
| #[inline] |
| fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> { |
| let data = decoder.data.as_ref().expect("set_data should have been called"); |
| let num_values = std::cmp::min(buffer.len(), decoder.num_values); |
| let bytes_left = data.len() - decoder.start; |
| let bytes_to_decode = std::mem::size_of::<Self>() * num_values; |
| |
| if bytes_left < bytes_to_decode { |
| return Err(eof_err!("Not enough bytes to decode")); |
| } |
| |
| // SAFETY: Raw types should be as per the standard rust bit-vectors |
| unsafe { |
| let raw_buffer = &mut Self::slice_as_bytes_mut(buffer)[..bytes_to_decode]; |
| raw_buffer.copy_from_slice(data.range(decoder.start, bytes_to_decode).as_ref()); |
| }; |
| decoder.start += bytes_to_decode; |
| decoder.num_values -= num_values; |
| |
| Ok(num_values) |
| } |
| |
| #[inline] |
| fn as_i64(&$self) -> Result<i64> { |
| $as_i64 |
| } |
| |
| #[inline] |
| fn as_any(&self) -> &dyn std::any::Any { |
| self |
| } |
| |
| #[inline] |
| fn as_mut_any(&mut self) -> &mut dyn std::any::Any { |
| self |
| } |
| } |
| } |
| } |
| |
| impl_from_raw!(i32, self => { Ok(*self as i64) }); |
| impl_from_raw!(i64, self => { Ok(*self) }); |
| impl_from_raw!(f32, self => { Err(general_err!("Type cannot be converted to i64")) }); |
| impl_from_raw!(f64, self => { Err(general_err!("Type cannot be converted to i64")) }); |
| |
| impl ParquetValueType for super::Int96 { |
| #[inline] |
| fn encode<W: std::io::Write>( |
| values: &[Self], |
| writer: &mut W, |
| _: &mut BitWriter, |
| ) -> Result<()> { |
| for value in values { |
| let raw = unsafe { |
| std::slice::from_raw_parts( |
| value.data() as *const [u32] as *const u8, |
| 12, |
| ) |
| }; |
| writer.write_all(raw)?; |
| } |
| Ok(()) |
| } |
| |
| #[inline] |
| fn set_data( |
| decoder: &mut PlainDecoderDetails, |
| data: ByteBufferPtr, |
| num_values: usize, |
| ) { |
| decoder.data.replace(data); |
| decoder.start = 0; |
| decoder.num_values = num_values; |
| } |
| |
| #[inline] |
| fn decode( |
| buffer: &mut [Self], |
| decoder: &mut PlainDecoderDetails, |
| ) -> Result<usize> { |
| // TODO - Remove the duplication between this and the general slice method |
| let data = decoder |
| .data |
| .as_ref() |
| .expect("set_data should have been called"); |
| let num_values = std::cmp::min(buffer.len(), decoder.num_values); |
| let bytes_left = data.len() - decoder.start; |
| let bytes_to_decode = 12 * num_values; |
| |
| if bytes_left < bytes_to_decode { |
| return Err(eof_err!("Not enough bytes to decode")); |
| } |
| |
| let data_range = data.range(decoder.start, bytes_to_decode); |
| let bytes: &[u8] = data_range.data(); |
| decoder.start += bytes_to_decode; |
| |
| let mut pos = 0; // position in byte array |
| for i in 0..num_values { |
| let elem0 = byteorder::LittleEndian::read_u32(&bytes[pos..pos + 4]); |
| let elem1 = byteorder::LittleEndian::read_u32(&bytes[pos + 4..pos + 8]); |
| let elem2 = byteorder::LittleEndian::read_u32(&bytes[pos + 8..pos + 12]); |
| |
| buffer[i] |
| .as_mut_any() |
| .downcast_mut::<Self>() |
| .unwrap() |
| .set_data(elem0, elem1, elem2); |
| |
| pos += 12; |
| } |
| decoder.num_values -= num_values; |
| |
| Ok(num_values) |
| } |
| |
| #[inline] |
| fn as_any(&self) -> &dyn std::any::Any { |
| self |
| } |
| |
| #[inline] |
| fn as_mut_any(&mut self) -> &mut dyn std::any::Any { |
| self |
| } |
| } |
| |
| // TODO - Why does macro importing fail? |
| /// Reads `$size` of bytes from `$src`, and reinterprets them as type `$ty`, in |
| /// little-endian order. `$ty` must implement the `Default` trait. Otherwise this won't |
| /// compile. |
| /// This is copied and modified from byteorder crate. |
| macro_rules! read_num_bytes { |
| ($ty:ty, $size:expr, $src:expr) => {{ |
| assert!($size <= $src.len()); |
| let mut buffer = |
| <$ty as $crate::util::bit_util::FromBytes>::Buffer::default(); |
| buffer.as_mut()[..$size].copy_from_slice(&$src[..$size]); |
| <$ty>::from_ne_bytes(buffer) |
| }}; |
| } |
| |
| impl ParquetValueType for super::ByteArray { |
| #[inline] |
| fn encode<W: std::io::Write>( |
| values: &[Self], |
| writer: &mut W, |
| _: &mut BitWriter, |
| ) -> Result<()> { |
| for value in values { |
| let len: u32 = value.len().try_into().unwrap(); |
| writer.write_all(&len.to_ne_bytes())?; |
| let raw = value.data(); |
| writer.write_all(raw)?; |
| } |
| Ok(()) |
| } |
| |
| #[inline] |
| fn set_data( |
| decoder: &mut PlainDecoderDetails, |
| data: ByteBufferPtr, |
| num_values: usize, |
| ) { |
| decoder.data.replace(data); |
| decoder.start = 0; |
| decoder.num_values = num_values; |
| } |
| |
| #[inline] |
| fn decode( |
| buffer: &mut [Self], |
| decoder: &mut PlainDecoderDetails, |
| ) -> Result<usize> { |
| let data = decoder |
| .data |
| .as_mut() |
| .expect("set_data should have been called"); |
| let num_values = std::cmp::min(buffer.len(), decoder.num_values); |
| for i in 0..num_values { |
| let len: usize = |
| read_num_bytes!(u32, 4, data.start_from(decoder.start).as_ref()) |
| as usize; |
| decoder.start += std::mem::size_of::<u32>(); |
| |
| if data.len() < decoder.start + len { |
| return Err(eof_err!("Not enough bytes to decode")); |
| } |
| |
| let val: &mut Self = buffer[i].as_mut_any().downcast_mut().unwrap(); |
| |
| val.set_data(data.range(decoder.start, len)); |
| decoder.start += len; |
| } |
| decoder.num_values -= num_values; |
| |
| Ok(num_values) |
| } |
| |
| #[inline] |
| fn dict_encoding_size(&self) -> (usize, usize) { |
| (std::mem::size_of::<u32>(), self.len()) |
| } |
| |
| #[inline] |
| fn as_any(&self) -> &dyn std::any::Any { |
| self |
| } |
| |
| #[inline] |
| fn as_mut_any(&mut self) -> &mut dyn std::any::Any { |
| self |
| } |
| } |
| |
| impl ParquetValueType for super::FixedLenByteArray { |
| #[inline] |
| fn encode<W: std::io::Write>( |
| values: &[Self], |
| writer: &mut W, |
| _: &mut BitWriter, |
| ) -> Result<()> { |
| for value in values { |
| let raw = value.data(); |
| writer.write_all(raw)?; |
| } |
| Ok(()) |
| } |
| |
| #[inline] |
| fn set_data( |
| decoder: &mut PlainDecoderDetails, |
| data: ByteBufferPtr, |
| num_values: usize, |
| ) { |
| decoder.data.replace(data); |
| decoder.start = 0; |
| decoder.num_values = num_values; |
| } |
| |
| #[inline] |
| fn decode( |
| buffer: &mut [Self], |
| decoder: &mut PlainDecoderDetails, |
| ) -> Result<usize> { |
| assert!(decoder.type_length > 0); |
| |
| let data = decoder |
| .data |
| .as_mut() |
| .expect("set_data should have been called"); |
| let num_values = std::cmp::min(buffer.len(), decoder.num_values); |
| for i in 0..num_values { |
| let len = decoder.type_length as usize; |
| |
| if data.len() < decoder.start + len { |
| return Err(eof_err!("Not enough bytes to decode")); |
| } |
| |
| let val: &mut Self = buffer[i].as_mut_any().downcast_mut().unwrap(); |
| |
| val.set_data(data.range(decoder.start, len)); |
| decoder.start += len; |
| } |
| decoder.num_values -= num_values; |
| |
| Ok(num_values) |
| } |
| |
| #[inline] |
| fn dict_encoding_size(&self) -> (usize, usize) { |
| (std::mem::size_of::<u32>(), self.len()) |
| } |
| |
| #[inline] |
| fn as_any(&self) -> &dyn std::any::Any { |
| self |
| } |
| |
| #[inline] |
| fn as_mut_any(&mut self) -> &mut dyn std::any::Any { |
| self |
| } |
| } |
| } |
| |
| /// Contains the Parquet physical type information as well as the Rust primitive type |
| /// presentation. |
| pub trait DataType: 'static { |
| type T: private::ParquetValueType; |
| |
| /// Returns Parquet physical type. |
| fn get_physical_type() -> Type; |
| |
| /// Returns size in bytes for Rust representation of the physical type. |
| fn get_type_size() -> usize; |
| |
| fn get_column_reader(column_writer: ColumnReader) -> Option<ColumnReaderImpl<Self>> |
| where |
| Self: Sized; |
| |
| fn get_column_writer(column_writer: ColumnWriter) -> Option<ColumnWriterImpl<Self>> |
| where |
| Self: Sized; |
| |
| fn get_column_writer_ref( |
| column_writer: &ColumnWriter, |
| ) -> Option<&ColumnWriterImpl<Self>> |
| where |
| Self: Sized; |
| |
| fn get_column_writer_mut( |
| column_writer: &mut ColumnWriter, |
| ) -> Option<&mut ColumnWriterImpl<Self>> |
| where |
| Self: Sized; |
| } |
| |
| // Workaround bug in specialization |
| pub trait SliceAsBytesDataType: DataType |
| where |
| Self::T: SliceAsBytes, |
| { |
| } |
| |
| impl<T> SliceAsBytesDataType for T |
| where |
| T: DataType, |
| <T as DataType>::T: SliceAsBytes, |
| { |
| } |
| |
| macro_rules! make_type { |
| ($name:ident, $physical_ty:path, $reader_ident: ident, $writer_ident: ident, $native_ty:ty, $size:expr) => { |
| #[derive(Clone)] |
| pub struct $name {} |
| |
| impl DataType for $name { |
| type T = $native_ty; |
| |
| fn get_physical_type() -> Type { |
| $physical_ty |
| } |
| |
| fn get_type_size() -> usize { |
| $size |
| } |
| |
| fn get_column_reader( |
| column_writer: ColumnReader, |
| ) -> Option<ColumnReaderImpl<Self>> { |
| match column_writer { |
| ColumnReader::$reader_ident(w) => Some(w), |
| _ => None, |
| } |
| } |
| |
| fn get_column_writer( |
| column_writer: ColumnWriter, |
| ) -> Option<ColumnWriterImpl<Self>> { |
| match column_writer { |
| ColumnWriter::$writer_ident(w) => Some(w), |
| _ => None, |
| } |
| } |
| |
| fn get_column_writer_ref( |
| column_writer: &ColumnWriter, |
| ) -> Option<&ColumnWriterImpl<Self>> { |
| match column_writer { |
| ColumnWriter::$writer_ident(w) => Some(w), |
| _ => None, |
| } |
| } |
| |
| fn get_column_writer_mut( |
| column_writer: &mut ColumnWriter, |
| ) -> Option<&mut ColumnWriterImpl<Self>> { |
| match column_writer { |
| ColumnWriter::$writer_ident(w) => Some(w), |
| _ => None, |
| } |
| } |
| } |
| }; |
| } |
| |
| // Generate struct definitions for all physical types |
| |
| make_type!( |
| BoolType, |
| Type::BOOLEAN, |
| BoolColumnReader, |
| BoolColumnWriter, |
| bool, |
| 1 |
| ); |
| make_type!( |
| Int32Type, |
| Type::INT32, |
| Int32ColumnReader, |
| Int32ColumnWriter, |
| i32, |
| 4 |
| ); |
| make_type!( |
| Int64Type, |
| Type::INT64, |
| Int64ColumnReader, |
| Int64ColumnWriter, |
| i64, |
| 8 |
| ); |
| make_type!( |
| Int96Type, |
| Type::INT96, |
| Int96ColumnReader, |
| Int96ColumnWriter, |
| Int96, |
| mem::size_of::<Int96>() |
| ); |
| make_type!( |
| FloatType, |
| Type::FLOAT, |
| FloatColumnReader, |
| FloatColumnWriter, |
| f32, |
| 4 |
| ); |
| make_type!( |
| DoubleType, |
| Type::DOUBLE, |
| DoubleColumnReader, |
| DoubleColumnWriter, |
| f64, |
| 8 |
| ); |
| make_type!( |
| ByteArrayType, |
| Type::BYTE_ARRAY, |
| ByteArrayColumnReader, |
| ByteArrayColumnWriter, |
| ByteArray, |
| mem::size_of::<ByteArray>() |
| ); |
| make_type!( |
| FixedLenByteArrayType, |
| Type::FIXED_LEN_BYTE_ARRAY, |
| FixedLenByteArrayColumnReader, |
| FixedLenByteArrayColumnWriter, |
| FixedLenByteArray, |
| mem::size_of::<FixedLenByteArray>() |
| ); |
| |
| impl FromBytes for Int96 { |
| type Buffer = [u8; 12]; |
| fn from_le_bytes(_bs: Self::Buffer) -> Self { |
| unimplemented!() |
| } |
| fn from_be_bytes(_bs: Self::Buffer) -> Self { |
| unimplemented!() |
| } |
| fn from_ne_bytes(bs: Self::Buffer) -> Self { |
| let mut i = Int96::new(); |
| i.set_data( |
| from_ne_slice(&bs[0..4]), |
| from_ne_slice(&bs[4..8]), |
| from_ne_slice(&bs[8..12]), |
| ); |
| i |
| } |
| } |
| |
| // FIXME Needed to satisfy the constraint of many decoding functions but ByteArray does not |
| // appear to actual be converted directly from bytes |
| impl FromBytes for ByteArray { |
| type Buffer = [u8; 8]; |
| fn from_le_bytes(_bs: Self::Buffer) -> Self { |
| unreachable!() |
| } |
| fn from_be_bytes(_bs: Self::Buffer) -> Self { |
| unreachable!() |
| } |
| fn from_ne_bytes(bs: Self::Buffer) -> Self { |
| ByteArray::from(bs.to_vec()) |
| } |
| } |
| |
| impl FromBytes for FixedLenByteArray { |
| type Buffer = [u8; 8]; |
| |
| fn from_le_bytes(_bs: Self::Buffer) -> Self { |
| unreachable!() |
| } |
| fn from_be_bytes(_bs: Self::Buffer) -> Self { |
| unreachable!() |
| } |
| fn from_ne_bytes(bs: Self::Buffer) -> Self { |
| Self(ByteArray::from(bs.to_vec())) |
| } |
| } |
| |
| /// Macro to reduce repetition in making type assertions on the physical type against `T` |
| macro_rules! ensure_phys_ty { |
| ($($ty: pat)|+ , $err: literal) => { |
| match T::get_physical_type() { |
| $($ty => (),)* |
| _ => panic!($err), |
| }; |
| } |
| } |
| |
| #[cfg(test)] |
| #[allow(clippy::float_cmp, clippy::approx_constant)] |
| mod tests { |
| use super::*; |
| |
| #[test] |
| #[allow(clippy::string_lit_as_bytes)] |
| fn test_as_bytes() { |
| assert_eq!(false.as_bytes(), &[0]); |
| assert_eq!(true.as_bytes(), &[1]); |
| assert_eq!(7_i32.as_bytes(), &[7, 0, 0, 0]); |
| assert_eq!(555_i32.as_bytes(), &[43, 2, 0, 0]); |
| assert_eq!(555_u32.as_bytes(), &[43, 2, 0, 0]); |
| assert_eq!(i32::max_value().as_bytes(), &[255, 255, 255, 127]); |
| assert_eq!(i32::min_value().as_bytes(), &[0, 0, 0, 128]); |
| assert_eq!(7_i64.as_bytes(), &[7, 0, 0, 0, 0, 0, 0, 0]); |
| assert_eq!(555_i64.as_bytes(), &[43, 2, 0, 0, 0, 0, 0, 0]); |
| assert_eq!( |
| (i64::max_value()).as_bytes(), |
| &[255, 255, 255, 255, 255, 255, 255, 127] |
| ); |
| assert_eq!((i64::min_value()).as_bytes(), &[0, 0, 0, 0, 0, 0, 0, 128]); |
| assert_eq!(3.14_f32.as_bytes(), &[195, 245, 72, 64]); |
| assert_eq!(3.14_f64.as_bytes(), &[31, 133, 235, 81, 184, 30, 9, 64]); |
| assert_eq!("hello".as_bytes(), &[b'h', b'e', b'l', b'l', b'o']); |
| assert_eq!( |
| Vec::from("hello".as_bytes()).as_bytes(), |
| &[b'h', b'e', b'l', b'l', b'o'] |
| ); |
| |
| // Test Int96 |
| let i96 = Int96::from(vec![1, 2, 3]); |
| assert_eq!(i96.as_bytes(), &[1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0]); |
| |
| // Test ByteArray |
| let ba = ByteArray::from(vec![1, 2, 3]); |
| assert_eq!(ba.as_bytes(), &[1, 2, 3]); |
| |
| // Test Decimal |
| let decimal = Decimal::from_i32(123, 5, 2); |
| assert_eq!(decimal.as_bytes(), &[0, 0, 0, 123]); |
| let decimal = Decimal::from_i64(123, 5, 2); |
| assert_eq!(decimal.as_bytes(), &[0, 0, 0, 0, 0, 0, 0, 123]); |
| let decimal = Decimal::from_bytes(ByteArray::from(vec![1, 2, 3]), 5, 2); |
| assert_eq!(decimal.as_bytes(), &[1, 2, 3]); |
| } |
| |
| #[test] |
| fn test_int96_from() { |
| assert_eq!( |
| Int96::from(vec![1, 12345, 1234567890]).data(), |
| &[1, 12345, 1234567890] |
| ); |
| } |
| |
| #[test] |
| fn test_byte_array_from() { |
| assert_eq!( |
| ByteArray::from(vec![b'A', b'B', b'C']).data(), |
| &[b'A', b'B', b'C'] |
| ); |
| assert_eq!(ByteArray::from("ABC").data(), &[b'A', b'B', b'C']); |
| assert_eq!( |
| ByteArray::from(ByteBufferPtr::new(vec![1u8, 2u8, 3u8, 4u8, 5u8])).data(), |
| &[1u8, 2u8, 3u8, 4u8, 5u8] |
| ); |
| let mut buf = ByteBuffer::new(); |
| buf.set_data(vec![6u8, 7u8, 8u8, 9u8, 10u8]); |
| assert_eq!(ByteArray::from(buf).data(), &[6u8, 7u8, 8u8, 9u8, 10u8]); |
| } |
| |
| #[test] |
| fn test_decimal_partial_eq() { |
| assert_eq!(Decimal::default(), Decimal::from_i32(0, 0, 0)); |
| assert_eq!(Decimal::from_i32(222, 5, 2), Decimal::from_i32(222, 5, 2)); |
| assert_eq!( |
| Decimal::from_bytes(ByteArray::from(vec![0, 0, 0, 3]), 5, 2), |
| Decimal::from_i32(3, 5, 2) |
| ); |
| |
| assert!(Decimal::from_i32(222, 5, 2) != Decimal::from_i32(111, 5, 2)); |
| assert!(Decimal::from_i32(222, 5, 2) != Decimal::from_i32(222, 6, 2)); |
| assert!(Decimal::from_i32(222, 5, 2) != Decimal::from_i32(222, 5, 3)); |
| |
| assert!(Decimal::from_i64(222, 5, 2) != Decimal::from_i32(222, 5, 2)); |
| } |
| |
| #[test] |
| fn test_byte_array_ord() { |
| let ba1 = ByteArray::from(vec![1, 2, 3]); |
| let ba11 = ByteArray::from(vec![1, 2, 3]); |
| let ba2 = ByteArray::from(vec![3, 4]); |
| let ba3 = ByteArray::from(vec![1, 2, 4]); |
| let ba4 = ByteArray::from(vec![]); |
| let ba5 = ByteArray::from(vec![2, 2, 3]); |
| |
| assert!(ba1 > ba2); |
| assert!(ba3 > ba1); |
| assert!(ba1 > ba4); |
| assert_eq!(ba1, ba11); |
| assert!(ba5 > ba1); |
| } |
| } |