Cast between `Decimal128` and `Decimal256` arrays (#2376)

* Cast between decimal128 and decimal256

* For review

* Fix clippy
diff --git a/arrow/src/array/array_decimal.rs b/arrow/src/array/array_decimal.rs
index 412b74c..1b35f83 100644
--- a/arrow/src/array/array_decimal.rs
+++ b/arrow/src/array/array_decimal.rs
@@ -335,7 +335,7 @@
     pub fn validate_decimal_precision(&self, precision: usize) -> Result<()> {
         if precision < self.precision {
             for v in self.iter().flatten() {
-                validate_decimal256_precision(&v.to_string(), precision)?;
+                validate_decimal256_precision(&v.to_big_int(), precision)?;
             }
         }
         Ok(())
diff --git a/arrow/src/array/builder/decimal_builder.rs b/arrow/src/array/builder/decimal_builder.rs
index bd43100..aaea2e4 100644
--- a/arrow/src/array/builder/decimal_builder.rs
+++ b/arrow/src/array/builder/decimal_builder.rs
@@ -202,8 +202,7 @@
         let value = if self.value_validation {
             let raw_bytes = value.raw_value();
             let integer = BigInt::from_signed_bytes_le(raw_bytes);
-            let value_str = integer.to_string();
-            validate_decimal256_precision(&value_str, self.precision)?;
+            validate_decimal256_precision(&integer, self.precision)?;
             value
         } else {
             value
diff --git a/arrow/src/array/data.rs b/arrow/src/array/data.rs
index 43c43b0..5ed3020 100644
--- a/arrow/src/array/data.rs
+++ b/arrow/src/array/data.rs
@@ -396,18 +396,24 @@
     /// panic's if the new DataType is not compatible with the
     /// existing type.
     ///
-    /// Note: currently only changing a [DataType::Decimal128]s precision
-    /// and scale are supported
+    /// Note: currently only changing a [DataType::Decimal128]s or
+    /// [DataType::Decimal256]s precision and scale are supported
     #[inline]
     pub(crate) fn with_data_type(mut self, new_data_type: DataType) -> Self {
-        assert!(
-            matches!(self.data_type, DataType::Decimal128(_, _)),
-            "only DecimalType is supported for existing type"
-        );
-        assert!(
-            matches!(new_data_type, DataType::Decimal128(_, _)),
-            "only DecimalType is supported for new datatype"
-        );
+        if matches!(self.data_type, DataType::Decimal128(_, _)) {
+            assert!(
+                matches!(new_data_type, DataType::Decimal128(_, _)),
+                "only 128-bit DecimalType is supported for new datatype"
+            );
+        } else if matches!(self.data_type, DataType::Decimal256(_, _)) {
+            assert!(
+                matches!(new_data_type, DataType::Decimal256(_, _)),
+                "only 256-bit DecimalType is supported for new datatype"
+            );
+        } else {
+            panic!("only DecimalType is supported.")
+        }
+
         self.data_type = new_data_type;
         self
     }
@@ -1044,8 +1050,7 @@
                     let offset = pos * 32;
                     let raw_bytes = &values[offset..offset + 32];
                     let integer = BigInt::from_signed_bytes_le(raw_bytes);
-                    let value_str = integer.to_string();
-                    validate_decimal256_precision(&value_str, *p)?;
+                    validate_decimal256_precision(&integer, *p)?;
                 }
                 Ok(())
             }
diff --git a/arrow/src/compute/kernels/cast.rs b/arrow/src/compute/kernels/cast.rs
index c9082af..ddca0c2 100644
--- a/arrow/src/compute/kernels/cast.rs
+++ b/arrow/src/compute/kernels/cast.rs
@@ -36,6 +36,7 @@
 //! ```
 
 use chrono::Timelike;
+use std::ops::{Div, Mul};
 use std::str;
 use std::sync::Arc;
 
@@ -53,7 +54,7 @@
 use crate::{array::*, compute::take};
 use crate::{buffer::Buffer, util::serialization::lexical_to_string};
 use num::cast::AsPrimitive;
-use num::{NumCast, ToPrimitive};
+use num::{BigInt, NumCast, ToPrimitive};
 
 /// CastOptions provides a way to override the default cast behaviors
 #[derive(Debug)]
@@ -78,6 +79,9 @@
         // TODO UTF8/unsigned numeric to decimal
         // cast one decimal type to another decimal type
         (Decimal128(_, _), Decimal128(_, _)) => true,
+        (Decimal256(_, _), Decimal256(_, _)) => true,
+        (Decimal128(_, _), Decimal256(_, _)) => true,
+        (Decimal256(_, _), Decimal128(_, _)) => true,
         // signed numeric to decimal
         (Null | Int8 | Int16 | Int32 | Int64 | Float32 | Float64, Decimal128(_, _)) |
         // decimal to signed numeric
@@ -432,7 +436,16 @@
     }
     match (from_type, to_type) {
         (Decimal128(_, s1), Decimal128(p2, s2)) => {
-            cast_decimal_to_decimal(array, s1, p2, s2)
+            cast_decimal_to_decimal::<16, 16>(array, s1, p2, s2)
+        }
+        (Decimal256(_, s1), Decimal256(p2, s2)) => {
+            cast_decimal_to_decimal::<32, 32>(array, s1, p2, s2)
+        }
+        (Decimal128(_, s1), Decimal256(p2, s2)) => {
+            cast_decimal_to_decimal::<16, 32>(array, s1, p2, s2)
+        }
+        (Decimal256(_, s1), Decimal128(p2, s2)) => {
+            cast_decimal_to_decimal::<32, 16>(array, s1, p2, s2)
         }
         (Decimal128(_, scale), _) => {
             // cast decimal to other type
@@ -1252,34 +1265,123 @@
 }
 
 /// Cast one type of decimal array to another type of decimal array
-fn cast_decimal_to_decimal(
+fn cast_decimal_to_decimal<const BYTE_WIDTH1: usize, const BYTE_WIDTH2: usize>(
     array: &ArrayRef,
     input_scale: &usize,
     output_precision: &usize,
     output_scale: &usize,
 ) -> Result<ArrayRef> {
-    let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
-
-    let output_array = if input_scale > output_scale {
+    if input_scale > output_scale {
         // For example, input_scale is 4 and output_scale is 3;
         // Original value is 11234_i128, and will be cast to 1123_i128.
         let div = 10_i128.pow((input_scale - output_scale) as u32);
-        array
-            .iter()
-            .map(|v| v.map(|v| v.as_i128() / div))
-            .collect::<Decimal128Array>()
+        if BYTE_WIDTH1 == 16 {
+            let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
+            let iter = array.iter().map(|v| v.map(|v| v.as_i128() / div));
+            if BYTE_WIDTH2 == 16 {
+                let output_array = iter
+                    .collect::<Decimal128Array>()
+                    .with_precision_and_scale(*output_precision, *output_scale)?;
+
+                Ok(Arc::new(output_array))
+            } else {
+                let output_array = iter
+                    .map(|v| v.map(BigInt::from))
+                    .collect::<Decimal256Array>()
+                    .with_precision_and_scale(*output_precision, *output_scale)?;
+
+                Ok(Arc::new(output_array))
+            }
+        } else {
+            let array = array.as_any().downcast_ref::<Decimal256Array>().unwrap();
+            let iter = array.iter().map(|v| v.map(|v| v.to_big_int().div(div)));
+            if BYTE_WIDTH2 == 16 {
+                let values = iter
+                    .map(|v| {
+                        if v.is_none() {
+                            Ok(None)
+                        } else {
+                            v.as_ref().and_then(|v| v.to_i128())
+                                .ok_or_else(|| {
+                                    ArrowError::InvalidArgumentError(
+                                    format!("{:?} cannot be casted to 128-bit integer for Decimal128", v),
+                                )
+                                })
+                                .map(Some)
+                        }
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+
+                let output_array = values
+                    .into_iter()
+                    .collect::<Decimal128Array>()
+                    .with_precision_and_scale(*output_precision, *output_scale)?;
+
+                Ok(Arc::new(output_array))
+            } else {
+                let output_array = iter
+                    .collect::<Decimal256Array>()
+                    .with_precision_and_scale(*output_precision, *output_scale)?;
+
+                Ok(Arc::new(output_array))
+            }
+        }
     } else {
         // For example, input_scale is 3 and output_scale is 4;
         // Original value is 1123_i128, and will be cast to 11230_i128.
         let mul = 10_i128.pow((output_scale - input_scale) as u32);
-        array
-            .iter()
-            .map(|v| v.map(|v| v.as_i128() * mul))
-            .collect::<Decimal128Array>()
-    }
-    .with_precision_and_scale(*output_precision, *output_scale)?;
+        if BYTE_WIDTH1 == 16 {
+            let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
+            let iter = array.iter().map(|v| v.map(|v| v.as_i128() * mul));
+            if BYTE_WIDTH2 == 16 {
+                let output_array = iter
+                    .collect::<Decimal128Array>()
+                    .with_precision_and_scale(*output_precision, *output_scale)?;
 
-    Ok(Arc::new(output_array))
+                Ok(Arc::new(output_array))
+            } else {
+                let output_array = iter
+                    .map(|v| v.map(BigInt::from))
+                    .collect::<Decimal256Array>()
+                    .with_precision_and_scale(*output_precision, *output_scale)?;
+
+                Ok(Arc::new(output_array))
+            }
+        } else {
+            let array = array.as_any().downcast_ref::<Decimal256Array>().unwrap();
+            let iter = array.iter().map(|v| v.map(|v| v.to_big_int().mul(mul)));
+            if BYTE_WIDTH2 == 16 {
+                let values = iter
+                    .map(|v| {
+                        if v.is_none() {
+                            Ok(None)
+                        } else {
+                            v.as_ref().and_then(|v| v.to_i128())
+                                .ok_or_else(|| {
+                                    ArrowError::InvalidArgumentError(
+                                    format!("{:?} cannot be casted to 128-bit integer for Decimal128", v),
+                                )
+                                })
+                                .map(Some)
+                        }
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+
+                let output_array = values
+                    .into_iter()
+                    .collect::<Decimal128Array>()
+                    .with_precision_and_scale(*output_precision, *output_scale)?;
+
+                Ok(Arc::new(output_array))
+            } else {
+                let output_array = iter
+                    .collect::<Decimal256Array>()
+                    .with_precision_and_scale(*output_precision, *output_scale)?;
+
+                Ok(Arc::new(output_array))
+            }
+        }
+    }
 }
 
 /// Cast an array by changing its array_data type to the desired type
@@ -2421,7 +2523,7 @@
 mod tests {
     use super::*;
     use crate::datatypes::TimeUnit;
-    use crate::util::decimal::Decimal128;
+    use crate::util::decimal::{Decimal128, Decimal256};
     use crate::{buffer::Buffer, util::display::array_value_to_string};
 
     macro_rules! generate_cast_test_case {
@@ -2460,8 +2562,19 @@
             .with_precision_and_scale(precision, scale)
     }
 
+    fn create_decimal256_array(
+        array: Vec<Option<BigInt>>,
+        precision: usize,
+        scale: usize,
+    ) -> Result<Decimal256Array> {
+        array
+            .into_iter()
+            .collect::<Decimal256Array>()
+            .with_precision_and_scale(precision, scale)
+    }
+
     #[test]
-    fn test_cast_decimal_to_decimal() {
+    fn test_cast_decimal128_to_decimal128() {
         let input_type = DataType::Decimal128(20, 3);
         let output_type = DataType::Decimal128(20, 4);
         assert!(can_cast_types(&input_type, &output_type));
@@ -2490,6 +2603,97 @@
     }
 
     #[test]
+    fn test_cast_decimal128_to_decimal256() {
+        let input_type = DataType::Decimal128(20, 3);
+        let output_type = DataType::Decimal256(20, 4);
+        assert!(can_cast_types(&input_type, &output_type));
+        let array = vec![Some(1123456), Some(2123456), Some(3123456), None];
+        let input_decimal_array = create_decimal_array(&array, 20, 3).unwrap();
+        let array = Arc::new(input_decimal_array) as ArrayRef;
+        generate_cast_test_case!(
+            &array,
+            Decimal256Array,
+            &output_type,
+            vec![
+                Some(
+                    Decimal256::from_big_int(&BigInt::from(11234560_i128), 20, 4)
+                        .unwrap()
+                ),
+                Some(
+                    Decimal256::from_big_int(&BigInt::from(21234560_i128), 20, 4)
+                        .unwrap()
+                ),
+                Some(
+                    Decimal256::from_big_int(&BigInt::from(31234560_i128), 20, 4)
+                        .unwrap()
+                ),
+                None
+            ]
+        );
+    }
+
+    #[test]
+    fn test_cast_decimal256_to_decimal128() {
+        let input_type = DataType::Decimal256(20, 3);
+        let output_type = DataType::Decimal128(20, 4);
+        assert!(can_cast_types(&input_type, &output_type));
+        let array = vec![
+            Some(BigInt::from(1123456)),
+            Some(BigInt::from(2123456)),
+            Some(BigInt::from(3123456)),
+            None,
+        ];
+        let input_decimal_array = create_decimal256_array(array, 20, 3).unwrap();
+        let array = Arc::new(input_decimal_array) as ArrayRef;
+        generate_cast_test_case!(
+            &array,
+            Decimal128Array,
+            &output_type,
+            vec![
+                Some(Decimal128::new_from_i128(20, 4, 11234560_i128)),
+                Some(Decimal128::new_from_i128(20, 4, 21234560_i128)),
+                Some(Decimal128::new_from_i128(20, 4, 31234560_i128)),
+                None
+            ]
+        );
+    }
+
+    #[test]
+    fn test_cast_decimal256_to_decimal256() {
+        let input_type = DataType::Decimal256(20, 3);
+        let output_type = DataType::Decimal256(20, 4);
+        assert!(can_cast_types(&input_type, &output_type));
+        let array = vec![
+            Some(BigInt::from(1123456)),
+            Some(BigInt::from(2123456)),
+            Some(BigInt::from(3123456)),
+            None,
+        ];
+        let input_decimal_array = create_decimal256_array(array, 20, 3).unwrap();
+        let array = Arc::new(input_decimal_array) as ArrayRef;
+        generate_cast_test_case!(
+            &array,
+            Decimal256Array,
+            &output_type,
+            vec![
+                Some(
+                    Decimal256::from_big_int(&BigInt::from(11234560_i128), 20, 4)
+                        .unwrap()
+                ),
+                Some(
+                    Decimal256::from_big_int(&BigInt::from(21234560_i128), 20, 4)
+                        .unwrap()
+                ),
+                Some(
+                    Decimal256::from_big_int(&BigInt::from(31234560_i128), 20, 4)
+                        .unwrap()
+                ),
+                None
+            ]
+        );
+    }
+
+    #[test]
     fn test_cast_decimal_to_numeric() {
         let decimal_type = DataType::Decimal128(38, 2);
         // negative test
diff --git a/arrow/src/datatypes/datatype.rs b/arrow/src/datatypes/datatype.rs
index 034920d..f3fa69d 100644
--- a/arrow/src/datatypes/datatype.rs
+++ b/arrow/src/datatypes/datatype.rs
@@ -483,9 +483,16 @@
 /// interpreted as a Decimal256 number with precision `precision`
 #[inline]
 pub(crate) fn validate_decimal256_precision(
-    value: &str,
+    value: &BigInt,
     precision: usize,
-) -> Result<BigInt> {
+) -> Result<&BigInt> {
+    if precision > DECIMAL256_MAX_PRECISION {
+        return Err(ArrowError::InvalidArgumentError(format!(
+            "Max precision of a Decimal256 is {}, but got {}",
+            DECIMAL256_MAX_PRECISION, precision,
+        )));
+    }
+
     if precision > 38 {
         let max_str = MAX_DECIMAL_FOR_LARGER_PRECISION[precision - 38 - 1];
         let min_str = MIN_DECIMAL_FOR_LARGER_PRECISION[precision - 38 - 1];
@@ -493,13 +500,12 @@
         let max = BigInt::from_str_radix(max_str, 10).unwrap();
         let min = BigInt::from_str_radix(min_str, 10).unwrap();
 
-        let value = BigInt::from_str_radix(value, 10).unwrap();
-        if value > max {
+        if value > &max {
             Err(ArrowError::InvalidArgumentError(format!(
                 "{} is too large to store in a Decimal256 of precision {}. Max is {}",
                 value, precision, max
             )))
-        } else if value < min {
+        } else if value < &min {
             Err(ArrowError::InvalidArgumentError(format!(
                 "{} is too small to store in a Decimal256 of precision {}. Min is {}",
                 value, precision, min
@@ -510,7 +516,6 @@
     } else {
         let max = MAX_DECIMAL_FOR_EACH_PRECISION[precision - 1];
         let min = MIN_DECIMAL_FOR_EACH_PRECISION[precision - 1];
-        let value = BigInt::from_str_radix(value, 10).unwrap();
 
         if value.to_i128().unwrap() > max {
             Err(ArrowError::InvalidArgumentError(format!(
diff --git a/arrow/src/util/decimal.rs b/arrow/src/util/decimal.rs
index 10bd13f..a16b592 100644
--- a/arrow/src/util/decimal.rs
+++ b/arrow/src/util/decimal.rs
@@ -246,6 +246,11 @@
         bytes[0..num_bytes.len()].clone_from_slice(num_bytes);
         Decimal256::try_new_from_bytes(precision, scale, &bytes)
     }
+
+    /// Constructs a `BigInt` from this `Decimal256` value.
+    pub(crate) fn to_big_int(&self) -> BigInt {
+        BigInt::from_signed_bytes_le(&self.value)
+    }
 }
 
 // compare two signed integer which are encoded with little endian.