support full u32 and u64 roundtrip through parquet (#258)

* re-export arity kernels in `arrow::compute`

Seems logical since all other kernels are re-exported as well under this
flat hierarchy.

* return file from `parquet::arrow::arrow_writer::tests::[one_column]_roundtrip`

* support full arrow u64 through parquet

- updates arrow to parquet type mapping to use reinterpret/overflow cast
  for u64<->i64 similar to what the C++ stack does
- changes statistics calculation to account for the fact that u64 should
  be compared unsigned (as per spec)

Fixes #254.

* avoid copying array when reading u64 from parquet

* support full arrow u32 through parquet

This is idential to the solution we now have for u64.
diff --git a/arrow/src/compute/mod.rs b/arrow/src/compute/mod.rs
index be1aa27..166f156 100644
--- a/arrow/src/compute/mod.rs
+++ b/arrow/src/compute/mod.rs
@@ -23,6 +23,7 @@
 
 pub use self::kernels::aggregate::*;
 pub use self::kernels::arithmetic::*;
+pub use self::kernels::arity::*;
 pub use self::kernels::boolean::*;
 pub use self::kernels::cast::*;
 pub use self::kernels::comparison::*;
diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs
index d125cf6..f209b8b 100644
--- a/parquet/src/arrow/array_reader.rs
+++ b/parquet/src/arrow/array_reader.rs
@@ -268,10 +268,29 @@
             }
         }
 
+        let target_type = self.get_data_type().clone();
         let arrow_data_type = match T::get_physical_type() {
             PhysicalType::BOOLEAN => ArrowBooleanType::DATA_TYPE,
-            PhysicalType::INT32 => ArrowInt32Type::DATA_TYPE,
-            PhysicalType::INT64 => ArrowInt64Type::DATA_TYPE,
+            PhysicalType::INT32 => {
+                match target_type {
+                    ArrowType::UInt32 => {
+                        // follow C++ implementation and use overflow/reinterpret cast from  i32 to u32 which will map
+                        // `i32::MIN..0` to `(i32::MAX as u32)..u32::MAX`
+                        ArrowUInt32Type::DATA_TYPE
+                    }
+                    _ => ArrowInt32Type::DATA_TYPE,
+                }
+            }
+            PhysicalType::INT64 => {
+                match target_type {
+                    ArrowType::UInt64 => {
+                        // follow C++ implementation and use overflow/reinterpret cast from  i64 to u64 which will map
+                        // `i64::MIN..0` to `(i64::MAX as u64)..u64::MAX`
+                        ArrowUInt64Type::DATA_TYPE
+                    }
+                    _ => ArrowInt64Type::DATA_TYPE,
+                }
+            }
             PhysicalType::FLOAT => ArrowFloat32Type::DATA_TYPE,
             PhysicalType::DOUBLE => ArrowFloat64Type::DATA_TYPE,
             PhysicalType::INT96
@@ -343,15 +362,14 @@
         // are datatypes which we must convert explicitly.
         // These are:
         // - date64: we should cast int32 to date32, then date32 to date64.
-        let target_type = self.get_data_type();
         let array = match target_type {
             ArrowType::Date64 => {
                 // this is cheap as it internally reinterprets the data
                 let a = arrow::compute::cast(&array, &ArrowType::Date32)?;
-                arrow::compute::cast(&a, target_type)?
+                arrow::compute::cast(&a, &target_type)?
             }
             ArrowType::Decimal(p, s) => {
-                let mut builder = DecimalBuilder::new(array.len(), *p, *s);
+                let mut builder = DecimalBuilder::new(array.len(), p, s);
                 match array.data_type() {
                     ArrowType::Int32 => {
                         let values = array.as_any().downcast_ref::<Int32Array>().unwrap();
@@ -380,7 +398,7 @@
                 }
                 Arc::new(builder.finish()) as ArrayRef
             }
-            _ => arrow::compute::cast(&array, target_type)?,
+            _ => arrow::compute::cast(&array, &target_type)?,
         };
 
         // save definition and repetition buffers
diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs
index b4915a3..df6ce98 100644
--- a/parquet/src/arrow/arrow_writer.rs
+++ b/parquet/src/arrow/arrow_writer.rs
@@ -211,19 +211,45 @@
     let indices = levels.filter_array_indices();
     let written = match writer {
         ColumnWriter::Int32ColumnWriter(ref mut typed) => {
-            // If the column is a Date64, we cast it to a Date32, and then interpret that as Int32
-            let array = if let ArrowDataType::Date64 = column.data_type() {
-                let array = arrow::compute::cast(column, &ArrowDataType::Date32)?;
-                arrow::compute::cast(&array, &ArrowDataType::Int32)?
-            } else {
-                arrow::compute::cast(column, &ArrowDataType::Int32)?
+            let values = match column.data_type() {
+                ArrowDataType::Date64 => {
+                    // If the column is a Date64, we cast it to a Date32, and then interpret that as Int32
+                    let array = if let ArrowDataType::Date64 = column.data_type() {
+                        let array = arrow::compute::cast(column, &ArrowDataType::Date32)?;
+                        arrow::compute::cast(&array, &ArrowDataType::Int32)?
+                    } else {
+                        arrow::compute::cast(column, &ArrowDataType::Int32)?
+                    };
+                    let array = array
+                        .as_any()
+                        .downcast_ref::<arrow_array::Int32Array>()
+                        .expect("Unable to get int32 array");
+                    get_numeric_array_slice::<Int32Type, _>(&array, &indices)
+                }
+                ArrowDataType::UInt32 => {
+                    // follow C++ implementation and use overflow/reinterpret cast from  u32 to i32 which will map
+                    // `(i32::MAX as u32)..u32::MAX` to `i32::MIN..0`
+                    let array = column
+                        .as_any()
+                        .downcast_ref::<arrow_array::UInt32Array>()
+                        .expect("Unable to get u32 array");
+                    let array = arrow::compute::unary::<_, _, arrow::datatypes::Int32Type>(
+                        array,
+                        |x| x as i32,
+                    );
+                    get_numeric_array_slice::<Int32Type, _>(&array, &indices)
+                }
+                _ => {
+                    let array = arrow::compute::cast(column, &ArrowDataType::Int32)?;
+                    let array = array
+                        .as_any()
+                        .downcast_ref::<arrow_array::Int32Array>()
+                        .expect("Unable to get i32 array");
+                    get_numeric_array_slice::<Int32Type, _>(&array, &indices)
+                }
             };
-            let array = array
-                .as_any()
-                .downcast_ref::<arrow_array::Int32Array>()
-                .expect("Unable to get int32 array");
             typed.write_batch(
-                get_numeric_array_slice::<Int32Type, _>(&array, &indices).as_slice(),
+                values.as_slice(),
                 Some(levels.definition.as_slice()),
                 levels.repetition.as_deref(),
             )?
@@ -248,6 +274,19 @@
                         .expect("Unable to get i64 array");
                     get_numeric_array_slice::<Int64Type, _>(&array, &indices)
                 }
+                ArrowDataType::UInt64 => {
+                    // follow C++ implementation and use overflow/reinterpret cast from  u64 to i64 which will map
+                    // `(i64::MAX as u64)..u64::MAX` to `i64::MIN..0`
+                    let array = column
+                        .as_any()
+                        .downcast_ref::<arrow_array::UInt64Array>()
+                        .expect("Unable to get u64 array");
+                    let array = arrow::compute::unary::<_, _, arrow::datatypes::Int64Type>(
+                        array,
+                        |x| x as i64,
+                    );
+                    get_numeric_array_slice::<Int64Type, _>(&array, &indices)
+                }
                 _ => {
                     let array = arrow::compute::cast(column, &ArrowDataType::Int64)?;
                     let array = array
@@ -498,8 +537,8 @@
 mod tests {
     use super::*;
 
-    use std::io::Seek;
     use std::sync::Arc;
+    use std::{fs::File, io::Seek};
 
     use arrow::datatypes::ToByteSlice;
     use arrow::datatypes::{DataType, Field, Schema, UInt32Type, UInt8Type};
@@ -507,7 +546,11 @@
     use arrow::{array::*, buffer::Buffer};
 
     use crate::arrow::{ArrowReader, ParquetFileArrowReader};
-    use crate::file::{reader::SerializedFileReader, writer::InMemoryWriteableCursor};
+    use crate::file::{
+        reader::{FileReader, SerializedFileReader},
+        statistics::Statistics,
+        writer::InMemoryWriteableCursor,
+    };
     use crate::util::test_common::get_temp_file;
 
     #[test]
@@ -956,7 +999,7 @@
 
     const SMALL_SIZE: usize = 4;
 
-    fn roundtrip(filename: &str, expected_batch: RecordBatch) {
+    fn roundtrip(filename: &str, expected_batch: RecordBatch) -> File {
         let file = get_temp_file(filename, &[]);
 
         let mut writer = ArrowWriter::try_new(
@@ -968,7 +1011,7 @@
         writer.write(&expected_batch).unwrap();
         writer.close().unwrap();
 
-        let reader = SerializedFileReader::new(file).unwrap();
+        let reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
         let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
         let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap();
 
@@ -986,9 +1029,11 @@
 
             assert_eq!(expected_data, actual_data);
         }
+
+        file
     }
 
-    fn one_column_roundtrip(filename: &str, values: ArrayRef, nullable: bool) {
+    fn one_column_roundtrip(filename: &str, values: ArrayRef, nullable: bool) -> File {
         let schema = Schema::new(vec![Field::new(
             "col",
             values.data_type().clone(),
@@ -997,7 +1042,7 @@
         let expected_batch =
             RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
 
-        roundtrip(filename, expected_batch);
+        roundtrip(filename, expected_batch)
     }
 
     fn values_required<A, I>(iter: I, filename: &str)
@@ -1449,4 +1494,66 @@
             expected_batch,
         );
     }
+
+    #[test]
+    fn u32_min_max() {
+        // check values roundtrip through parquet
+        let values = Arc::new(UInt32Array::from_iter_values(vec![
+            u32::MIN,
+            u32::MIN + 1,
+            (i32::MAX as u32) - 1,
+            i32::MAX as u32,
+            (i32::MAX as u32) + 1,
+            u32::MAX - 1,
+            u32::MAX,
+        ]));
+        let file = one_column_roundtrip("u32_min_max_single_column", values, false);
+
+        // check statistics are valid
+        let reader = SerializedFileReader::new(file).unwrap();
+        let metadata = reader.metadata();
+        assert_eq!(metadata.num_row_groups(), 1);
+        let row_group = metadata.row_group(0);
+        assert_eq!(row_group.num_columns(), 1);
+        let column = row_group.column(0);
+        let stats = column.statistics().unwrap();
+        assert!(stats.has_min_max_set());
+        if let Statistics::Int32(stats) = stats {
+            assert_eq!(*stats.min() as u32, u32::MIN);
+            assert_eq!(*stats.max() as u32, u32::MAX);
+        } else {
+            panic!("Statistics::Int32 missing")
+        }
+    }
+
+    #[test]
+    fn u64_min_max() {
+        // check values roundtrip through parquet
+        let values = Arc::new(UInt64Array::from_iter_values(vec![
+            u64::MIN,
+            u64::MIN + 1,
+            (i64::MAX as u64) - 1,
+            i64::MAX as u64,
+            (i64::MAX as u64) + 1,
+            u64::MAX - 1,
+            u64::MAX,
+        ]));
+        let file = one_column_roundtrip("u64_min_max_single_column", values, false);
+
+        // check statistics are valid
+        let reader = SerializedFileReader::new(file).unwrap();
+        let metadata = reader.metadata();
+        assert_eq!(metadata.num_row_groups(), 1);
+        let row_group = metadata.row_group(0);
+        assert_eq!(row_group.num_columns(), 1);
+        let column = row_group.column(0);
+        let stats = column.statistics().unwrap();
+        assert!(stats.has_min_max_set());
+        if let Statistics::Int64(stats) = stats {
+            assert_eq!(*stats.min() as u64, u64::MIN);
+            assert_eq!(*stats.max() as u64, u64::MAX);
+        } else {
+            panic!("Statistics::Int64 missing")
+        }
+    }
 }
diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer.rs
index 64e4880..57ccda3 100644
--- a/parquet/src/column/writer.rs
+++ b/parquet/src/column/writer.rs
@@ -18,9 +18,10 @@
 //! Contains column writer API.
 use std::{cmp, collections::VecDeque, convert::TryFrom, marker::PhantomData, sync::Arc};
 
-use crate::basic::{Compression, Encoding, PageType, Type};
+use crate::basic::{Compression, Encoding, LogicalType, PageType, Type};
 use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
 use crate::compression::{create_codec, Codec};
+use crate::data_type::private::ParquetValueType;
 use crate::data_type::AsBytes;
 use crate::data_type::*;
 use crate::encodings::{
@@ -300,10 +301,18 @@
         // Process pre-calculated statistics
         match (min, max) {
             (Some(min), Some(max)) => {
-                if self.min_column_value.as_ref().map_or(true, |v| v > min) {
+                if self
+                    .min_column_value
+                    .as_ref()
+                    .map_or(true, |v| self.compare_greater(v, min))
+                {
                     self.min_column_value = Some(min.clone());
                 }
-                if self.max_column_value.as_ref().map_or(true, |v| v < max) {
+                if self
+                    .max_column_value
+                    .as_ref()
+                    .map_or(true, |v| self.compare_greater(max, v))
+                {
                     self.max_column_value = Some(max.clone());
                 }
             }
@@ -925,31 +934,51 @@
     fn update_page_min_max(&mut self, val: &T::T) {
         // simple "isNaN" check that works for all types
         if val == val {
-            if self.min_page_value.as_ref().map_or(true, |min| min > val) {
+            if self
+                .min_page_value
+                .as_ref()
+                .map_or(true, |min| self.compare_greater(min, val))
+            {
                 self.min_page_value = Some(val.clone());
             }
-            if self.max_page_value.as_ref().map_or(true, |max| max < val) {
+            if self
+                .max_page_value
+                .as_ref()
+                .map_or(true, |max| self.compare_greater(val, max))
+            {
                 self.max_page_value = Some(val.clone());
             }
         }
     }
 
     fn update_column_min_max(&mut self) {
-        if self
-            .min_column_value
-            .as_ref()
-            .map_or(true, |min| min > self.min_page_value.as_ref().unwrap())
-        {
+        let update_min = self.min_column_value.as_ref().map_or(true, |min| {
+            let page_value = self.min_page_value.as_ref().unwrap();
+            self.compare_greater(min, page_value)
+        });
+        if update_min {
             self.min_column_value = self.min_page_value.clone();
         }
-        if self
-            .max_column_value
-            .as_ref()
-            .map_or(true, |max| max < self.max_page_value.as_ref().unwrap())
-        {
+
+        let update_max = self.max_column_value.as_ref().map_or(true, |max| {
+            let page_value = self.max_page_value.as_ref().unwrap();
+            self.compare_greater(page_value, max)
+        });
+        if update_max {
             self.max_column_value = self.max_page_value.clone();
         }
     }
+
+    /// Evaluate `a > b` according to underlying logical type.
+    fn compare_greater(&self, a: &T::T, b: &T::T) -> bool {
+        if let Some(LogicalType::INTEGER(int_type)) = self.descr.logical_type() {
+            if !int_type.is_signed {
+                // need to compare unsigned
+                return a.as_u64().unwrap() > b.as_u64().unwrap();
+            }
+        }
+        a > b
+    }
 }
 
 // ----------------------------------------------------------------------