support full u32 and u64 roundtrip through parquet (#258)
* re-export arity kernels in `arrow::compute`
Seems logical since all other kernels are re-exported as well under this
flat hierarchy.
* return file from `parquet::arrow::arrow_writer::tests::[one_column]_roundtrip`
* support full arrow u64 through parquet
- updates arrow to parquet type mapping to use reinterpret/overflow cast
for u64<->i64 similar to what the C++ stack does
- changes statistics calculation to account for the fact that u64 should
be compared unsigned (as per spec)
Fixes #254.
* avoid copying array when reading u64 from parquet
* support full arrow u32 through parquet
This is identical to the solution we now have for u64.
diff --git a/arrow/src/compute/mod.rs b/arrow/src/compute/mod.rs
index be1aa27..166f156 100644
--- a/arrow/src/compute/mod.rs
+++ b/arrow/src/compute/mod.rs
@@ -23,6 +23,7 @@
pub use self::kernels::aggregate::*;
pub use self::kernels::arithmetic::*;
+pub use self::kernels::arity::*;
pub use self::kernels::boolean::*;
pub use self::kernels::cast::*;
pub use self::kernels::comparison::*;
diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs
index d125cf6..f209b8b 100644
--- a/parquet/src/arrow/array_reader.rs
+++ b/parquet/src/arrow/array_reader.rs
@@ -268,10 +268,29 @@
}
}
+ let target_type = self.get_data_type().clone();
let arrow_data_type = match T::get_physical_type() {
PhysicalType::BOOLEAN => ArrowBooleanType::DATA_TYPE,
- PhysicalType::INT32 => ArrowInt32Type::DATA_TYPE,
- PhysicalType::INT64 => ArrowInt64Type::DATA_TYPE,
+ PhysicalType::INT32 => {
+ match target_type {
+ ArrowType::UInt32 => {
+ // follow C++ implementation and use overflow/reinterpret cast from i32 to u32 which will map
+ // `i32::MIN..0` to `(i32::MAX as u32) + 1..=u32::MAX`
+ ArrowUInt32Type::DATA_TYPE
+ }
+ _ => ArrowInt32Type::DATA_TYPE,
+ }
+ }
+ PhysicalType::INT64 => {
+ match target_type {
+ ArrowType::UInt64 => {
+ // follow C++ implementation and use overflow/reinterpret cast from i64 to u64 which will map
+ // `i64::MIN..0` to `(i64::MAX as u64) + 1..=u64::MAX`
+ ArrowUInt64Type::DATA_TYPE
+ }
+ _ => ArrowInt64Type::DATA_TYPE,
+ }
+ }
PhysicalType::FLOAT => ArrowFloat32Type::DATA_TYPE,
PhysicalType::DOUBLE => ArrowFloat64Type::DATA_TYPE,
PhysicalType::INT96
@@ -343,15 +362,14 @@
// are datatypes which we must convert explicitly.
// These are:
// - date64: we should cast int32 to date32, then date32 to date64.
- let target_type = self.get_data_type();
let array = match target_type {
ArrowType::Date64 => {
// this is cheap as it internally reinterprets the data
let a = arrow::compute::cast(&array, &ArrowType::Date32)?;
- arrow::compute::cast(&a, target_type)?
+ arrow::compute::cast(&a, &target_type)?
}
ArrowType::Decimal(p, s) => {
- let mut builder = DecimalBuilder::new(array.len(), *p, *s);
+ let mut builder = DecimalBuilder::new(array.len(), p, s);
match array.data_type() {
ArrowType::Int32 => {
let values = array.as_any().downcast_ref::<Int32Array>().unwrap();
@@ -380,7 +398,7 @@
}
Arc::new(builder.finish()) as ArrayRef
}
- _ => arrow::compute::cast(&array, target_type)?,
+ _ => arrow::compute::cast(&array, &target_type)?,
};
// save definition and repetition buffers
diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs
index b4915a3..df6ce98 100644
--- a/parquet/src/arrow/arrow_writer.rs
+++ b/parquet/src/arrow/arrow_writer.rs
@@ -211,19 +211,45 @@
let indices = levels.filter_array_indices();
let written = match writer {
ColumnWriter::Int32ColumnWriter(ref mut typed) => {
- // If the column is a Date64, we cast it to a Date32, and then interpret that as Int32
- let array = if let ArrowDataType::Date64 = column.data_type() {
- let array = arrow::compute::cast(column, &ArrowDataType::Date32)?;
- arrow::compute::cast(&array, &ArrowDataType::Int32)?
- } else {
- arrow::compute::cast(column, &ArrowDataType::Int32)?
+ let values = match column.data_type() {
+ ArrowDataType::Date64 => {
+ // If the column is a Date64, we cast it to a Date32, and then interpret that as Int32
+ let array = if let ArrowDataType::Date64 = column.data_type() {
+ let array = arrow::compute::cast(column, &ArrowDataType::Date32)?;
+ arrow::compute::cast(&array, &ArrowDataType::Int32)?
+ } else {
+ arrow::compute::cast(column, &ArrowDataType::Int32)?
+ };
+ let array = array
+ .as_any()
+ .downcast_ref::<arrow_array::Int32Array>()
+ .expect("Unable to get int32 array");
+ get_numeric_array_slice::<Int32Type, _>(&array, &indices)
+ }
+ ArrowDataType::UInt32 => {
+ // follow C++ implementation and use overflow/reinterpret cast from u32 to i32 which will map
+ // `(i32::MAX as u32) + 1..=u32::MAX` to `i32::MIN..0`
+ let array = column
+ .as_any()
+ .downcast_ref::<arrow_array::UInt32Array>()
+ .expect("Unable to get u32 array");
+ let array = arrow::compute::unary::<_, _, arrow::datatypes::Int32Type>(
+ array,
+ |x| x as i32,
+ );
+ get_numeric_array_slice::<Int32Type, _>(&array, &indices)
+ }
+ _ => {
+ let array = arrow::compute::cast(column, &ArrowDataType::Int32)?;
+ let array = array
+ .as_any()
+ .downcast_ref::<arrow_array::Int32Array>()
+ .expect("Unable to get i32 array");
+ get_numeric_array_slice::<Int32Type, _>(&array, &indices)
+ }
};
- let array = array
- .as_any()
- .downcast_ref::<arrow_array::Int32Array>()
- .expect("Unable to get int32 array");
typed.write_batch(
- get_numeric_array_slice::<Int32Type, _>(&array, &indices).as_slice(),
+ values.as_slice(),
Some(levels.definition.as_slice()),
levels.repetition.as_deref(),
)?
@@ -248,6 +274,19 @@
.expect("Unable to get i64 array");
get_numeric_array_slice::<Int64Type, _>(&array, &indices)
}
+ ArrowDataType::UInt64 => {
+ // follow C++ implementation and use overflow/reinterpret cast from u64 to i64 which will map
+ // `(i64::MAX as u64) + 1..=u64::MAX` to `i64::MIN..0`
+ let array = column
+ .as_any()
+ .downcast_ref::<arrow_array::UInt64Array>()
+ .expect("Unable to get u64 array");
+ let array = arrow::compute::unary::<_, _, arrow::datatypes::Int64Type>(
+ array,
+ |x| x as i64,
+ );
+ get_numeric_array_slice::<Int64Type, _>(&array, &indices)
+ }
_ => {
let array = arrow::compute::cast(column, &ArrowDataType::Int64)?;
let array = array
@@ -498,8 +537,8 @@
mod tests {
use super::*;
- use std::io::Seek;
use std::sync::Arc;
+ use std::{fs::File, io::Seek};
use arrow::datatypes::ToByteSlice;
use arrow::datatypes::{DataType, Field, Schema, UInt32Type, UInt8Type};
@@ -507,7 +546,11 @@
use arrow::{array::*, buffer::Buffer};
use crate::arrow::{ArrowReader, ParquetFileArrowReader};
- use crate::file::{reader::SerializedFileReader, writer::InMemoryWriteableCursor};
+ use crate::file::{
+ reader::{FileReader, SerializedFileReader},
+ statistics::Statistics,
+ writer::InMemoryWriteableCursor,
+ };
use crate::util::test_common::get_temp_file;
#[test]
@@ -956,7 +999,7 @@
const SMALL_SIZE: usize = 4;
- fn roundtrip(filename: &str, expected_batch: RecordBatch) {
+ fn roundtrip(filename: &str, expected_batch: RecordBatch) -> File {
let file = get_temp_file(filename, &[]);
let mut writer = ArrowWriter::try_new(
@@ -968,7 +1011,7 @@
writer.write(&expected_batch).unwrap();
writer.close().unwrap();
- let reader = SerializedFileReader::new(file).unwrap();
+ let reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap();
@@ -986,9 +1029,11 @@
assert_eq!(expected_data, actual_data);
}
+
+ file
}
- fn one_column_roundtrip(filename: &str, values: ArrayRef, nullable: bool) {
+ fn one_column_roundtrip(filename: &str, values: ArrayRef, nullable: bool) -> File {
let schema = Schema::new(vec![Field::new(
"col",
values.data_type().clone(),
@@ -997,7 +1042,7 @@
let expected_batch =
RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
- roundtrip(filename, expected_batch);
+ roundtrip(filename, expected_batch)
}
fn values_required<A, I>(iter: I, filename: &str)
@@ -1449,4 +1494,66 @@
expected_batch,
);
}
+
+ #[test]
+ fn u32_min_max() {
+ // check values roundtrip through parquet
+ let values = Arc::new(UInt32Array::from_iter_values(vec![
+ u32::MIN,
+ u32::MIN + 1,
+ (i32::MAX as u32) - 1,
+ i32::MAX as u32,
+ (i32::MAX as u32) + 1,
+ u32::MAX - 1,
+ u32::MAX,
+ ]));
+ let file = one_column_roundtrip("u32_min_max_single_column", values, false);
+
+ // check statistics are valid
+ let reader = SerializedFileReader::new(file).unwrap();
+ let metadata = reader.metadata();
+ assert_eq!(metadata.num_row_groups(), 1);
+ let row_group = metadata.row_group(0);
+ assert_eq!(row_group.num_columns(), 1);
+ let column = row_group.column(0);
+ let stats = column.statistics().unwrap();
+ assert!(stats.has_min_max_set());
+ if let Statistics::Int32(stats) = stats {
+ assert_eq!(*stats.min() as u32, u32::MIN);
+ assert_eq!(*stats.max() as u32, u32::MAX);
+ } else {
+ panic!("Statistics::Int32 missing")
+ }
+ }
+
+ #[test]
+ fn u64_min_max() {
+ // check values roundtrip through parquet
+ let values = Arc::new(UInt64Array::from_iter_values(vec![
+ u64::MIN,
+ u64::MIN + 1,
+ (i64::MAX as u64) - 1,
+ i64::MAX as u64,
+ (i64::MAX as u64) + 1,
+ u64::MAX - 1,
+ u64::MAX,
+ ]));
+ let file = one_column_roundtrip("u64_min_max_single_column", values, false);
+
+ // check statistics are valid
+ let reader = SerializedFileReader::new(file).unwrap();
+ let metadata = reader.metadata();
+ assert_eq!(metadata.num_row_groups(), 1);
+ let row_group = metadata.row_group(0);
+ assert_eq!(row_group.num_columns(), 1);
+ let column = row_group.column(0);
+ let stats = column.statistics().unwrap();
+ assert!(stats.has_min_max_set());
+ if let Statistics::Int64(stats) = stats {
+ assert_eq!(*stats.min() as u64, u64::MIN);
+ assert_eq!(*stats.max() as u64, u64::MAX);
+ } else {
+ panic!("Statistics::Int64 missing")
+ }
+ }
}
diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer.rs
index 64e4880..57ccda3 100644
--- a/parquet/src/column/writer.rs
+++ b/parquet/src/column/writer.rs
@@ -18,9 +18,10 @@
//! Contains column writer API.
use std::{cmp, collections::VecDeque, convert::TryFrom, marker::PhantomData, sync::Arc};
-use crate::basic::{Compression, Encoding, PageType, Type};
+use crate::basic::{Compression, Encoding, LogicalType, PageType, Type};
use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
use crate::compression::{create_codec, Codec};
+use crate::data_type::private::ParquetValueType;
use crate::data_type::AsBytes;
use crate::data_type::*;
use crate::encodings::{
@@ -300,10 +301,18 @@
// Process pre-calculated statistics
match (min, max) {
(Some(min), Some(max)) => {
- if self.min_column_value.as_ref().map_or(true, |v| v > min) {
+ if self
+ .min_column_value
+ .as_ref()
+ .map_or(true, |v| self.compare_greater(v, min))
+ {
self.min_column_value = Some(min.clone());
}
- if self.max_column_value.as_ref().map_or(true, |v| v < max) {
+ if self
+ .max_column_value
+ .as_ref()
+ .map_or(true, |v| self.compare_greater(max, v))
+ {
self.max_column_value = Some(max.clone());
}
}
@@ -925,31 +934,51 @@
fn update_page_min_max(&mut self, val: &T::T) {
// simple "isNaN" check that works for all types
if val == val {
- if self.min_page_value.as_ref().map_or(true, |min| min > val) {
+ if self
+ .min_page_value
+ .as_ref()
+ .map_or(true, |min| self.compare_greater(min, val))
+ {
self.min_page_value = Some(val.clone());
}
- if self.max_page_value.as_ref().map_or(true, |max| max < val) {
+ if self
+ .max_page_value
+ .as_ref()
+ .map_or(true, |max| self.compare_greater(val, max))
+ {
self.max_page_value = Some(val.clone());
}
}
}
fn update_column_min_max(&mut self) {
- if self
- .min_column_value
- .as_ref()
- .map_or(true, |min| min > self.min_page_value.as_ref().unwrap())
- {
+ let update_min = self.min_column_value.as_ref().map_or(true, |min| {
+ let page_value = self.min_page_value.as_ref().unwrap();
+ self.compare_greater(min, page_value)
+ });
+ if update_min {
self.min_column_value = self.min_page_value.clone();
}
- if self
- .max_column_value
- .as_ref()
- .map_or(true, |max| max < self.max_page_value.as_ref().unwrap())
- {
+
+ let update_max = self.max_column_value.as_ref().map_or(true, |max| {
+ let page_value = self.max_page_value.as_ref().unwrap();
+ self.compare_greater(page_value, max)
+ });
+ if update_max {
self.max_column_value = self.max_page_value.clone();
}
}
+
+ /// Evaluate `a > b` according to underlying logical type.
+ fn compare_greater(&self, a: &T::T, b: &T::T) -> bool {
+ if let Some(LogicalType::INTEGER(int_type)) = self.descr.logical_type() {
+ if !int_type.is_signed {
+ // need to compare unsigned
+ return a.as_u64().unwrap() > b.as_u64().unwrap();
+ }
+ }
+ a > b
+ }
}
// ----------------------------------------------------------------------