// blob: 1672be9c04622412ea53cf360f8de87f3c89c016
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use crate::data_type::{ByteArray, DataType, FixedLenByteArray, Int96};
// TODO: clean up imports (best done when there are few moving parts)
use arrow::array::{
Array, ArrayRef, BinaryBuilder, DecimalBuilder, FixedSizeBinaryBuilder,
IntervalDayTimeArray, IntervalDayTimeBuilder, IntervalYearMonthArray,
IntervalYearMonthBuilder, LargeBinaryBuilder, LargeStringBuilder, PrimitiveBuilder,
PrimitiveDictionaryBuilder, StringBuilder, StringDictionaryBuilder,
};
use arrow::compute::cast;
use std::convert::{From, TryInto};
use std::sync::Arc;
use crate::errors::Result;
use arrow::datatypes::{ArrowDictionaryKeyType, ArrowPrimitiveType};
use arrow::array::{
BinaryArray, DecimalArray, DictionaryArray, FixedSizeBinaryArray, LargeBinaryArray,
LargeStringArray, PrimitiveArray, StringArray, TimestampNanosecondArray,
};
use std::marker::PhantomData;
use crate::data_type::Int32Type as ParquetInt32Type;
use arrow::datatypes::Int32Type;
/// A fallible conversion from a source value of type `S` into a value of type `T`.
///
/// In this file, implementations turn buffered Parquet values (for example
/// `Vec<Option<ByteArray>>` or `Vec<Option<FixedLenByteArray>>`) into concrete Arrow
/// arrays (string, binary, decimal, interval, timestamp, dictionary, ...). The
/// [`ArrayRefConverter`] wrapper adapts any such converter so it yields an [`ArrayRef`].
pub trait Converter<S, T> {
    /// Converts `source` (taken by value) into `T`.
    ///
    /// This method converts the record reader's buffered content into an Arrow array.
    /// It consumes the record reader's data but does not reset the record reader's
    /// state; that remains the caller's responsibility.
    ///
    /// # Errors
    ///
    /// Returns an error if the conversion fails (e.g. invalid UTF-8 or an Arrow
    /// builder/cast error in the implementations below).
    fn convert(&self, source: S) -> Result<T>;
}
/// Converts Parquet `FIXED_LEN_BYTE_ARRAY` values into an Arrow
/// [`FixedSizeBinaryArray`] whose elements are `byte_width` bytes wide.
pub struct FixedSizeArrayConverter {
    byte_width: i32,
}

impl FixedSizeArrayConverter {
    /// Creates a converter that produces elements of `byte_width` bytes.
    pub fn new(byte_width: i32) -> Self {
        FixedSizeArrayConverter { byte_width }
    }
}

impl Converter<Vec<Option<FixedLenByteArray>>, FixedSizeBinaryArray>
    for FixedSizeArrayConverter
{
    /// Appends every value (or a null for `None`) to a fixed-size binary builder.
    fn convert(
        &self,
        source: Vec<Option<FixedLenByteArray>>,
    ) -> Result<FixedSizeBinaryArray> {
        let capacity = source.len();
        let mut builder = FixedSizeBinaryBuilder::new(capacity, self.byte_width);
        for value in source.into_iter() {
            if let Some(bytes) = value {
                builder.append_value(bytes.data())?;
            } else {
                builder.append_null()?;
            }
        }
        Ok(builder.finish())
    }
}
/// Converts Parquet fixed-length, big-endian two's-complement decimal bytes into an
/// Arrow `DecimalArray` with the configured `precision` and `scale`.
pub struct DecimalArrayConverter {
    precision: i32,
    scale: i32,
}

impl DecimalArrayConverter {
    /// Creates a converter for decimals with the given `precision` and `scale`.
    pub fn new(precision: i32, scale: i32) -> Self {
        Self { precision, scale }
    }

    /// Decodes a big-endian two's-complement integer of at most 16 bytes into an
    /// `i128`, sign-extending from the most significant bit of the first byte.
    ///
    /// An empty slice decodes to `0` (previously it panicked on `b[0]`).
    ///
    /// # Panics
    ///
    /// Panics if `b` is longer than 16 bytes.
    fn from_bytes_to_i128(b: &[u8]) -> i128 {
        assert!(b.len() <= 16, "DecimalArray supports only up to size 16");
        // Sign-extend: pre-fill with 0xFF when the value is negative.
        let negative = b.first().map_or(false, |&byte| byte & 0x80 != 0);
        let mut result = if negative { [0xffu8; 16] } else { [0u8; 16] };
        // Right-align the input bytes in the 16-byte big-endian buffer.
        result[16 - b.len()..].copy_from_slice(b);
        i128::from_be_bytes(result)
    }
}
impl Converter<Vec<Option<FixedLenByteArray>>, DecimalArray> for DecimalArrayConverter {
    /// Decodes each fixed-length value into an unscaled `i128` and appends it to a
    /// `DecimalArray` built with this converter's precision and scale; `None`
    /// entries become nulls.
    fn convert(&self, source: Vec<Option<FixedLenByteArray>>) -> Result<DecimalArray> {
        let capacity = source.len();
        let precision = self.precision as usize;
        let scale = self.scale as usize;
        let mut builder = DecimalBuilder::new(capacity, precision, scale);
        for item in source {
            match item {
                None => builder.append_null()?,
                Some(bytes) => {
                    let unscaled = Self::from_bytes_to_i128(bytes.data());
                    builder.append_value(unscaled)?
                }
            }
        }
        Ok(builder.finish())
    }
}
/// An Arrow Interval converter that takes the first 4 bytes of each Parquet interval
/// value and decodes them as a little-endian `i32`, which becomes the Arrow YearMonth
/// value.
///
/// # Panics
///
/// Panics if a non-null value is shorter than 4 bytes.
pub struct IntervalYearMonthArrayConverter {}

impl Converter<Vec<Option<FixedLenByteArray>>, IntervalYearMonthArray>
    for IntervalYearMonthArrayConverter
{
    fn convert(
        &self,
        source: Vec<Option<FixedLenByteArray>>,
    ) -> Result<IntervalYearMonthArray> {
        let mut builder = IntervalYearMonthBuilder::new(source.len());
        for interval in source {
            match interval {
                Some(bytes) => {
                    let months: [u8; 4] = bytes.data()[0..4].try_into().unwrap();
                    builder.append_value(i32::from_le_bytes(months))?;
                }
                None => builder.append_null()?,
            }
        }
        Ok(builder.finish())
    }
}
/// An Arrow Interval converter, which reads bytes `4..12` of a Parquet interval (the
/// last 8 bytes of a 12-byte value) and interprets them as a single little-endian
/// `i64` representing the Arrow DayTime value.
///
/// In the Parquet `INTERVAL` layout (three little-endian `u32`s: months, days,
/// milliseconds) these 8 bytes are the days and milliseconds fields, so the days
/// field ends up in the low 32 bits of the `i64` and the milliseconds field in the
/// high 32 bits.
///
/// NOTE(review): confirm this bit placement matches how the Arrow
/// `IntervalDayTimeType` native value is packed in the arrow version in use.
///
/// # Panics
///
/// Panics if a non-null value is shorter than 12 bytes.
pub struct IntervalDayTimeArrayConverter {}

impl Converter<Vec<Option<FixedLenByteArray>>, IntervalDayTimeArray>
    for IntervalDayTimeArrayConverter
{
    fn convert(
        &self,
        source: Vec<Option<FixedLenByteArray>>,
    ) -> Result<IntervalDayTimeArray> {
        let mut builder = IntervalDayTimeBuilder::new(source.len());
        for v in source {
            match v {
                Some(array) => {
                    // Slicing panics if the value is shorter than 12 bytes; the array
                    // conversion itself cannot fail because the slice is exactly 8 bytes.
                    let day_time: [u8; 8] = array.data()[4..12]
                        .try_into()
                        .expect("a slice of length 8 always converts to [u8; 8]");
                    builder.append_value(i64::from_le_bytes(day_time))
                }
                None => builder.append_null(),
            }?
        }
        Ok(builder.finish())
    }
}
/// Converts Parquet `INT96` timestamps into an Arrow [`TimestampNanosecondArray`],
/// attaching `timezone` to the resulting array unchanged.
pub struct Int96ArrayConverter {
    pub timezone: Option<String>,
}

impl Int96ArrayConverter {
    /// Decodes one `INT96` timestamp into nanoseconds since the Unix epoch.
    ///
    /// Words 0 and 1 of `Int96::data()` are the low and high halves of the
    /// nanoseconds within the day, and word 2 is the Julian day number. The
    /// previous implementation computed `to_i64() * 1_000_000`, i.e. it went through
    /// milliseconds and silently dropped any sub-millisecond precision; this decodes
    /// straight to nanoseconds instead.
    ///
    /// Arithmetic wraps (rather than panicking in debug builds) for dates outside the
    /// range representable as `i64` nanoseconds.
    fn int96_to_nanos(value: &Int96) -> i64 {
        const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
        const NANOS_PER_DAY: i64 = 86_400 * 1_000_000_000;
        let data = value.data();
        let day = data[2] as i64;
        let nanos_of_day = ((data[1] as i64) << 32) + data[0] as i64;
        (day - JULIAN_DAY_OF_EPOCH)
            .wrapping_mul(NANOS_PER_DAY)
            .wrapping_add(nanos_of_day)
    }
}

impl Converter<Vec<Option<Int96>>, TimestampNanosecondArray> for Int96ArrayConverter {
    fn convert(&self, source: Vec<Option<Int96>>) -> Result<TimestampNanosecondArray> {
        let nanos: Vec<Option<i64>> = source
            .iter()
            .map(|value| value.as_ref().map(Self::int96_to_nanos))
            .collect();
        Ok(TimestampNanosecondArray::from_opt_vec(
            nanos,
            self.timezone.clone(),
        ))
    }
}
/// Converts Parquet `BYTE_ARRAY` values holding UTF-8 text into an Arrow
/// [`StringArray`].
///
/// Returns an error if any non-null value is not valid UTF-8.
pub struct Utf8ArrayConverter {}

impl Converter<Vec<Option<ByteArray>>, StringArray> for Utf8ArrayConverter {
    fn convert(&self, source: Vec<Option<ByteArray>>) -> Result<StringArray> {
        // Pre-size the value buffer with the total byte length of all non-null entries.
        let total_bytes: usize = source.iter().flatten().map(|bytes| bytes.len()).sum();
        let mut builder = StringBuilder::with_capacity(source.len(), total_bytes);
        for entry in source {
            if let Some(bytes) = entry {
                builder.append_value(bytes.as_utf8()?)?;
            } else {
                builder.append_null()?;
            }
        }
        Ok(builder.finish())
    }
}
/// Converts Parquet `BYTE_ARRAY` values holding UTF-8 text into an Arrow
/// [`LargeStringArray`].
///
/// Returns an error if any non-null value is not valid UTF-8.
pub struct LargeUtf8ArrayConverter {}

impl Converter<Vec<Option<ByteArray>>, LargeStringArray> for LargeUtf8ArrayConverter {
    fn convert(&self, source: Vec<Option<ByteArray>>) -> Result<LargeStringArray> {
        // Sum of the byte lengths of all present values; nulls contribute nothing.
        let value_bytes = source.iter().fold(0usize, |acc, entry| {
            acc + entry.as_ref().map_or(0, |bytes| bytes.len())
        });
        let mut builder = LargeStringBuilder::with_capacity(source.len(), value_bytes);
        for entry in source {
            match entry {
                None => builder.append_null()?,
                Some(bytes) => {
                    let text = bytes.as_utf8()?;
                    builder.append_value(text)?
                }
            }
        }
        Ok(builder.finish())
    }
}
/// Converts Parquet `BYTE_ARRAY` values into an Arrow [`BinaryArray`], copying the
/// raw bytes unchanged; `None` entries become nulls.
pub struct BinaryArrayConverter {}

impl Converter<Vec<Option<ByteArray>>, BinaryArray> for BinaryArrayConverter {
    fn convert(&self, source: Vec<Option<ByteArray>>) -> Result<BinaryArray> {
        let mut builder = BinaryBuilder::new(source.len());
        for entry in source {
            match entry {
                None => builder.append_null()?,
                Some(bytes) => builder.append_value(bytes.data())?,
            }
        }
        Ok(builder.finish())
    }
}
/// Converts Parquet `BYTE_ARRAY` values into an Arrow [`LargeBinaryArray`], copying
/// the raw bytes unchanged; `None` entries become nulls.
pub struct LargeBinaryArrayConverter {}

impl Converter<Vec<Option<ByteArray>>, LargeBinaryArray> for LargeBinaryArrayConverter {
    fn convert(&self, source: Vec<Option<ByteArray>>) -> Result<LargeBinaryArray> {
        let capacity = source.len();
        let mut builder = LargeBinaryBuilder::new(capacity);
        for entry in source.into_iter() {
            if let Some(bytes) = entry {
                builder.append_value(bytes.data())?;
            } else {
                builder.append_null()?;
            }
        }
        Ok(builder.finish())
    }
}
/// Dictionary-encodes Parquet `BYTE_ARRAY` values holding UTF-8 text into an Arrow
/// [`DictionaryArray`] with key type `K`.
///
/// Returns an error if any non-null value is not valid UTF-8 or the dictionary
/// builder rejects a value.
pub struct StringDictionaryArrayConverter {}

impl<K: ArrowDictionaryKeyType> Converter<Vec<Option<ByteArray>>, DictionaryArray<K>>
    for StringDictionaryArrayConverter
{
    fn convert(&self, source: Vec<Option<ByteArray>>) -> Result<DictionaryArray<K>> {
        let value_bytes: usize = source.iter().flatten().map(|bytes| bytes.len()).sum();
        let mut builder = StringDictionaryBuilder::new(
            PrimitiveBuilder::<K>::new(source.len()),
            StringBuilder::with_capacity(source.len(), value_bytes),
        );
        for entry in source {
            match entry {
                None => builder.append_null()?,
                Some(bytes) => {
                    // The dictionary key returned by `append` is not needed here.
                    builder.append(bytes.as_utf8()?)?;
                }
            }
        }
        Ok(builder.finish())
    }
}
/// Dictionary-encodes Parquet primitive values.
///
/// The values are first materialized as an Arrow array of `DictValueSourceType`,
/// then cast to `DictValueTargetType`, and finally dictionary-encoded. `ParquetType`
/// is the Parquet physical type the values are read as. The struct is stateless: all
/// three parameters are type-level markers only.
pub struct DictionaryArrayConverter<DictValueSourceType, DictValueTargetType, ParquetType>
{
    _dict_value_source_marker: PhantomData<DictValueSourceType>,
    _dict_value_target_marker: PhantomData<DictValueTargetType>,
    _parquet_marker: PhantomData<ParquetType>,
}

impl<DictValueSourceType, DictValueTargetType, ParquetType>
    DictionaryArrayConverter<DictValueSourceType, DictValueTargetType, ParquetType>
{
    /// Creates a new (stateless) converter.
    pub fn new() -> Self {
        Self {
            _dict_value_source_marker: PhantomData,
            _dict_value_target_marker: PhantomData,
            _parquet_marker: PhantomData,
        }
    }
}

/// Equivalent to [`DictionaryArrayConverter::new`]. It exists because `new` takes no
/// arguments (clippy `new_without_default`) and so the type works with
/// `Default`-based construction.
impl<DictValueSourceType, DictValueTargetType, ParquetType> Default
    for DictionaryArrayConverter<DictValueSourceType, DictValueTargetType, ParquetType>
{
    fn default() -> Self {
        Self::new()
    }
}
impl<K, DictValueSourceType, DictValueTargetType, ParquetType>
Converter<Vec<Option<<ParquetType as DataType>::T>>, DictionaryArray<K>>
for DictionaryArrayConverter<DictValueSourceType, DictValueTargetType, ParquetType>
where
K: ArrowPrimitiveType,
DictValueSourceType: ArrowPrimitiveType,
DictValueTargetType: ArrowPrimitiveType,
ParquetType: DataType,
PrimitiveArray<DictValueSourceType>: From<Vec<Option<<ParquetType as DataType>::T>>>,
{
fn convert(
&self,
source: Vec<Option<<ParquetType as DataType>::T>>,
) -> Result<DictionaryArray<K>> {
let keys_builder = PrimitiveBuilder::<K>::new(source.len());
let values_builder = PrimitiveBuilder::<DictValueTargetType>::new(source.len());
let mut builder = PrimitiveDictionaryBuilder::new(keys_builder, values_builder);
let source_array: Arc<dyn Array> =
Arc::new(PrimitiveArray::<DictValueSourceType>::from(source));
let target_array = cast(&source_array, &DictValueTargetType::DATA_TYPE)?;
let target = target_array
.as_any()
.downcast_ref::<PrimitiveArray<DictValueTargetType>>()
.unwrap();
for i in 0..target.len() {
if target.is_null(i) {
builder.append_null()?;
} else {
let _ = builder.append(target.value(i))?;
}
}
Ok(builder.finish())
}
}
pub type Utf8Converter =
ArrayRefConverter<Vec<Option<ByteArray>>, StringArray, Utf8ArrayConverter>;
pub type LargeUtf8Converter =
ArrayRefConverter<Vec<Option<ByteArray>>, LargeStringArray, LargeUtf8ArrayConverter>;
pub type BinaryConverter =
ArrayRefConverter<Vec<Option<ByteArray>>, BinaryArray, BinaryArrayConverter>;
pub type LargeBinaryConverter = ArrayRefConverter<
Vec<Option<ByteArray>>,
LargeBinaryArray,
LargeBinaryArrayConverter,
>;
pub type StringDictionaryConverter<T> = ArrayRefConverter<
Vec<Option<ByteArray>>,
DictionaryArray<T>,
StringDictionaryArrayConverter,
>;
pub type DictionaryConverter<K, SV, TV, P> = ArrayRefConverter<
Vec<Option<<P as DataType>::T>>,
DictionaryArray<K>,
DictionaryArrayConverter<SV, TV, P>,
>;
pub type PrimitiveDictionaryConverter<K, V> = ArrayRefConverter<
Vec<Option<<ParquetInt32Type as DataType>::T>>,
DictionaryArray<K>,
DictionaryArrayConverter<Int32Type, V, ParquetInt32Type>,
>;
pub type Int96Converter =
ArrayRefConverter<Vec<Option<Int96>>, TimestampNanosecondArray, Int96ArrayConverter>;
pub type FixedLenBinaryConverter = ArrayRefConverter<
Vec<Option<FixedLenByteArray>>,
FixedSizeBinaryArray,
FixedSizeArrayConverter,
>;
pub type IntervalYearMonthConverter = ArrayRefConverter<
Vec<Option<FixedLenByteArray>>,
IntervalYearMonthArray,
IntervalYearMonthArrayConverter,
>;
pub type IntervalDayTimeConverter = ArrayRefConverter<
Vec<Option<FixedLenByteArray>>,
IntervalDayTimeArray,
IntervalDayTimeArrayConverter,
>;
pub type DecimalConverter = ArrayRefConverter<
Vec<Option<FixedLenByteArray>>,
DecimalArray,
DecimalArrayConverter,
>;
/// A stateless [`Converter`] that performs the conversion through the infallible
/// `T: From<S>` implementation.
pub struct FromConverter<S, T> {
    _source: PhantomData<S>,
    _dest: PhantomData<T>,
}

impl<S, T> FromConverter<S, T>
where
    T: From<S>,
{
    /// Creates a new (stateless) converter.
    pub fn new() -> Self {
        Self {
            _source: PhantomData,
            _dest: PhantomData,
        }
    }
}

/// Equivalent to [`FromConverter::new`]. It exists because `new` takes no arguments
/// (clippy `new_without_default`) and so the type works with `Default`-based
/// construction.
impl<S, T> Default for FromConverter<S, T>
where
    T: From<S>,
{
    fn default() -> Self {
        Self::new()
    }
}

impl<S, T> Converter<S, T> for FromConverter<S, T>
where
    T: From<S>,
{
    /// Always succeeds: delegates to `T::from(source)`.
    fn convert(&self, source: S) -> Result<T> {
        Ok(T::from(source))
    }
}
/// Adapts a [`Converter`] that produces a concrete Arrow array type `A` into one that
/// produces a type-erased [`ArrayRef`].
pub struct ArrayRefConverter<S, A, C> {
    _source: PhantomData<S>,
    _array: PhantomData<A>,
    converter: C,
}

impl<S, A, C> ArrayRefConverter<S, A, C>
where
    A: Array + 'static,
    C: Converter<S, A> + 'static,
{
    /// Wraps `converter` so that its output is returned as an [`ArrayRef`].
    pub fn new(converter: C) -> Self {
        ArrayRefConverter {
            converter,
            _source: PhantomData,
            _array: PhantomData,
        }
    }
}

impl<S, A, C> Converter<S, ArrayRef> for ArrayRefConverter<S, A, C>
where
    A: Array + 'static,
    C: Converter<S, A> + 'static,
{
    /// Runs the wrapped converter and boxes its array into an `Arc<dyn Array>`;
    /// errors from the wrapped converter are passed through unchanged.
    fn convert(&self, source: S) -> Result<ArrayRef> {
        let erased: ArrayRef = Arc::new(self.converter.convert(source)?);
        Ok(erased)
    }
}