Make `SUM` and `AVG` Aggregate Type Coercion Explicit (#7369)

* Make Aggregate Type Coercion Explicit

* Clippy
diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs
index d6c4050..8338da8 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -35,7 +35,7 @@
 use datafusion_expr::Accumulator;
 use datafusion_physical_expr::{
     equivalence::project_equivalence_properties,
-    expressions::{Avg, CastExpr, Column, Sum},
+    expressions::Column,
     normalize_out_expr_with_columns_map, reverse_order_bys,
     utils::{convert_to_expr, get_indices_of_matching_exprs},
     AggregateExpr, LexOrdering, LexOrderingReq, OrderingEquivalenceProperties,
@@ -1010,40 +1010,7 @@
         | AggregateMode::SinglePartitioned => Ok(aggr_expr
             .iter()
             .map(|agg| {
-                let pre_cast_type = if let Some(Sum {
-                    data_type,
-                    pre_cast_to_sum_type,
-                    ..
-                }) = agg.as_any().downcast_ref::<Sum>()
-                {
-                    if *pre_cast_to_sum_type {
-                        Some(data_type.clone())
-                    } else {
-                        None
-                    }
-                } else if let Some(Avg {
-                    sum_data_type,
-                    pre_cast_to_sum_type,
-                    ..
-                }) = agg.as_any().downcast_ref::<Avg>()
-                {
-                    if *pre_cast_to_sum_type {
-                        Some(sum_data_type.clone())
-                    } else {
-                        None
-                    }
-                } else {
-                    None
-                };
-                let mut result = agg
-                    .expressions()
-                    .into_iter()
-                    .map(|expr| {
-                        pre_cast_type.clone().map_or(expr.clone(), |cast_type| {
-                            Arc::new(CastExpr::new(expr, cast_type, None))
-                        })
-                    })
-                    .collect::<Vec<_>>();
+                let mut result = agg.expressions().clone();
                 // In partial mode, append ordering requirements to expressions' results.
                 // Ordering requirements are used by subsequent executors to satisfy the required
                 // ordering for `AggregateMode::FinalPartitioned`/`AggregateMode::Final` modes.
diff --git a/datafusion/core/tests/fuzz_cases/window_fuzz.rs b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
index c28b344..1d9c4a9 100644
--- a/datafusion/core/tests/fuzz_cases/window_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
@@ -39,7 +39,8 @@
 
 use datafusion::prelude::{SessionConfig, SessionContext};
 use datafusion_common::{Result, ScalarValue};
-use datafusion_physical_expr::expressions::{col, lit};
+use datafusion_expr::type_coercion::aggregates::coerce_types;
+use datafusion_physical_expr::expressions::{cast, col, lit};
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
 use test_utils::add_empty_batches;
 
@@ -261,6 +262,14 @@
     let rand_fn_idx = rng.gen_range(0..window_fn_map.len());
     let fn_name = window_fn_map.keys().collect::<Vec<_>>()[rand_fn_idx];
     let (window_fn, new_args) = window_fn_map.values().collect::<Vec<_>>()[rand_fn_idx];
+    if let WindowFunction::AggregateFunction(f) = window_fn {
+        let a = args[0].clone();
+        let dt = a.data_type(schema.as_ref()).unwrap();
+        let sig = f.signature();
+        let coerced = coerce_types(f, &[dt], &sig).unwrap();
+        args[0] = cast(a, schema, coerced[0].clone()).unwrap();
+    }
+
     for new_arg in new_args {
         args.push(new_arg.clone());
     }
diff --git a/datafusion/expr/src/aggregate_function.rs b/datafusion/expr/src/aggregate_function.rs
index 3debe21..9412c92 100644
--- a/datafusion/expr/src/aggregate_function.rs
+++ b/datafusion/expr/src/aggregate_function.rs
@@ -228,20 +228,16 @@
         // Note that this function *must* return the same type that the respective physical expression returns
         // or the execution panics.
 
-        let coerced_data_types = crate::type_coercion::aggregates::coerce_types(
-            self,
-            input_expr_types,
-            &self.signature(),
-        )
-        // original errors are all related to wrong function signature
-        // aggregate them for better error message
-        .map_err(|_| {
-            DataFusionError::Plan(utils::generate_signature_error_msg(
-                &format!("{self}"),
-                self.signature(),
-                input_expr_types,
-            ))
-        })?;
+        let coerced_data_types = coerce_types(self, input_expr_types, &self.signature())
+            // original errors are all related to wrong function signature
+            // aggregate them for better error message
+            .map_err(|_| {
+                DataFusionError::Plan(utils::generate_signature_error_msg(
+                    &format!("{self}"),
+                    self.signature(),
+                    input_expr_types,
+                ))
+            })?;
 
         match self {
             AggregateFunction::Count | AggregateFunction::ApproxDistinct => {
diff --git a/datafusion/expr/src/type_coercion/aggregates.rs b/datafusion/expr/src/type_coercion/aggregates.rs
index 094b6e9..c47c041 100644
--- a/datafusion/expr/src/type_coercion/aggregates.rs
+++ b/datafusion/expr/src/type_coercion/aggregates.rs
@@ -19,6 +19,7 @@
     DataType, TimeUnit, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE,
     DECIMAL256_MAX_PRECISION, DECIMAL256_MAX_SCALE,
 };
+
 use datafusion_common::{internal_err, plan_err, DataFusionError, Result};
 use std::ops::Deref;
 
@@ -89,6 +90,7 @@
     input_types: &[DataType],
     signature: &Signature,
 ) -> Result<Vec<DataType>> {
+    use DataType::*;
     // Validate input_types matches (at least one of) the func signature.
     check_arg_count(agg_fun, input_types, &signature.type_signature)?;
 
@@ -105,26 +107,44 @@
         AggregateFunction::Sum => {
             // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
             // smallint, int, bigint, real, double precision, decimal, or interval.
-            if !is_sum_support_arg_type(&input_types[0]) {
-                return plan_err!(
-                    "The function {:?} does not support inputs of type {:?}.",
-                    agg_fun,
-                    input_types[0]
-                );
-            }
-            Ok(input_types.to_vec())
+            let v = match &input_types[0] {
+                Decimal128(p, s) => Decimal128(*p, *s),
+                Decimal256(p, s) => Decimal256(*p, *s),
+                d if d.is_signed_integer() => Int64,
+                d if d.is_unsigned_integer() => UInt64,
+                d if d.is_floating() => Float64,
+                Dictionary(_, v) => {
+                    return coerce_types(agg_fun, &[v.as_ref().clone()], signature)
+                }
+                _ => {
+                    return plan_err!(
+                        "The function {:?} does not support inputs of type {:?}.",
+                        agg_fun,
+                        input_types[0]
+                    )
+                }
+            };
+            Ok(vec![v])
         }
         AggregateFunction::Avg => {
             // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
             // smallint, int, bigint, real, double precision, decimal, or interval
-            if !is_avg_support_arg_type(&input_types[0]) {
-                return plan_err!(
-                    "The function {:?} does not support inputs of type {:?}.",
-                    agg_fun,
-                    input_types[0]
-                );
-            }
-            Ok(input_types.to_vec())
+            let v = match &input_types[0] {
+                Decimal128(p, s) => Decimal128(*p, *s),
+                Decimal256(p, s) => Decimal256(*p, *s),
+                d if d.is_numeric() => Float64,
+                Dictionary(_, v) => {
+                    return coerce_types(agg_fun, &[v.as_ref().clone()], signature)
+                }
+                _ => {
+                    return plan_err!(
+                        "The function {:?} does not support inputs of type {:?}.",
+                        agg_fun,
+                        input_types[0]
+                    )
+                }
+            };
+            Ok(vec![v])
         }
         AggregateFunction::BitAnd
         | AggregateFunction::BitOr
@@ -160,7 +180,7 @@
                     input_types[0]
                 );
             }
-            Ok(input_types.to_vec())
+            Ok(vec![Float64, Float64])
         }
         AggregateFunction::Covariance | AggregateFunction::CovariancePop => {
             if !is_covariance_support_arg_type(&input_types[0]) {
@@ -170,7 +190,7 @@
                     input_types[0]
                 );
             }
-            Ok(input_types.to_vec())
+            Ok(vec![Float64, Float64])
         }
         AggregateFunction::Stddev | AggregateFunction::StddevPop => {
             if !is_stddev_support_arg_type(&input_types[0]) {
@@ -180,7 +200,7 @@
                     input_types[0]
                 );
             }
-            Ok(input_types.to_vec())
+            Ok(vec![Float64])
         }
         AggregateFunction::Correlation => {
             if !is_correlation_support_arg_type(&input_types[0]) {
@@ -190,7 +210,7 @@
                     input_types[0]
                 );
             }
-            Ok(input_types.to_vec())
+            Ok(vec![Float64, Float64])
         }
         AggregateFunction::RegrSlope
         | AggregateFunction::RegrIntercept
@@ -211,7 +231,7 @@
                     input_types[0]
                 );
             }
-            Ok(input_types.to_vec())
+            Ok(vec![Float64, Float64])
         }
         AggregateFunction::ApproxPercentileCont => {
             if !is_approx_percentile_cont_supported_arg_type(&input_types[0]) {
@@ -357,11 +377,9 @@
 /// function return type of a sum
 pub fn sum_return_type(arg_type: &DataType) -> Result<DataType> {
     match arg_type {
-        arg_type if SIGNED_INTEGERS.contains(arg_type) => Ok(DataType::Int64),
-        arg_type if UNSIGNED_INTEGERS.contains(arg_type) => Ok(DataType::UInt64),
-        // In the https://www.postgresql.org/docs/current/functions-aggregate.html doc,
-        // the result type of floating-point is FLOAT64 with the double precision.
-        DataType::Float64 | DataType::Float32 => Ok(DataType::Float64),
+        DataType::Int64 => Ok(DataType::Int64),
+        DataType::UInt64 => Ok(DataType::UInt64),
+        DataType::Float64 => Ok(DataType::Float64),
         DataType::Decimal128(precision, scale) => {
             // in the spark, the result type is DECIMAL(min(38,precision+10), s)
             // ref: https://github.com/apache/spark/blob/fcf636d9eb8d645c24be3db2d599aba2d7e2955a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala#L66
@@ -374,9 +392,6 @@
             let new_precision = DECIMAL256_MAX_PRECISION.min(*precision + 10);
             Ok(DataType::Decimal256(new_precision, *scale))
         }
-        DataType::Dictionary(_, dict_value_type) => {
-            sum_return_type(dict_value_type.as_ref())
-        }
         other => plan_err!("SUM does not support type \"{other:?}\""),
     }
 }
@@ -601,21 +616,29 @@
                 assert_eq!(*input_type, result.unwrap());
             }
         }
-        // test sum, avg
-        let funs = vec![AggregateFunction::Sum, AggregateFunction::Avg];
-        let input_types = vec![
-            vec![DataType::Int32],
-            vec![DataType::Float32],
-            vec![DataType::Decimal128(20, 3)],
-            vec![DataType::Decimal256(20, 3)],
-        ];
-        for fun in funs {
-            for input_type in &input_types {
-                let signature = fun.signature();
-                let result = coerce_types(&fun, input_type, &signature);
-                assert_eq!(*input_type, result.unwrap());
-            }
-        }
+        // test sum
+        let fun = AggregateFunction::Sum;
+        let signature = fun.signature();
+        let r = coerce_types(&fun, &[DataType::Int32], &signature).unwrap();
+        assert_eq!(r[0], DataType::Int64);
+        let r = coerce_types(&fun, &[DataType::Float32], &signature).unwrap();
+        assert_eq!(r[0], DataType::Float64);
+        let r = coerce_types(&fun, &[DataType::Decimal128(20, 3)], &signature).unwrap();
+        assert_eq!(r[0], DataType::Decimal128(20, 3));
+        let r = coerce_types(&fun, &[DataType::Decimal256(20, 3)], &signature).unwrap();
+        assert_eq!(r[0], DataType::Decimal256(20, 3));
+
+        // test avg
+        let fun = AggregateFunction::Avg;
+        let signature = fun.signature();
+        let r = coerce_types(&fun, &[DataType::Int32], &signature).unwrap();
+        assert_eq!(r[0], DataType::Float64);
+        let r = coerce_types(&fun, &[DataType::Float32], &signature).unwrap();
+        assert_eq!(r[0], DataType::Float64);
+        let r = coerce_types(&fun, &[DataType::Decimal128(20, 3)], &signature).unwrap();
+        assert_eq!(r[0], DataType::Decimal128(20, 3));
+        let r = coerce_types(&fun, &[DataType::Decimal256(20, 3)], &signature).unwrap();
+        assert_eq!(r[0], DataType::Decimal256(20, 3));
 
         // ApproxPercentileCont input types
         let input_types = vec![
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs
index e9d155d..2a6e08c 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -44,8 +44,8 @@
 use datafusion_expr::utils::from_plan;
 use datafusion_expr::{
     is_false, is_not_false, is_not_true, is_not_unknown, is_true, is_unknown,
-    type_coercion, AggregateFunction, BuiltinScalarFunction, Expr, LogicalPlan, Operator,
-    Projection, WindowFrame, WindowFrameBound, WindowFrameUnits,
+    type_coercion, window_function, AggregateFunction, BuiltinScalarFunction, Expr,
+    LogicalPlan, Operator, Projection, WindowFrame, WindowFrameBound, WindowFrameUnits,
 };
 use datafusion_expr::{ExprSchemable, Signature};
 
@@ -381,6 +381,19 @@
             }) => {
                 let window_frame =
                     coerce_window_frame(window_frame, &self.schema, &order_by)?;
+
+                let args = match &fun {
+                    window_function::WindowFunction::AggregateFunction(fun) => {
+                        coerce_agg_exprs_for_signature(
+                            fun,
+                            &args,
+                            &self.schema,
+                            &fun.signature(),
+                        )?
+                    }
+                    _ => args,
+                };
+
                 let expr = Expr::WindowFunction(WindowFunction::new(
                     fun,
                     args,
@@ -961,7 +974,7 @@
             None,
         ));
         let plan = LogicalPlan::Projection(Projection::try_new(vec![agg_expr], empty)?);
-        let expected = "Projection: AVG(Int64(12))\n  EmptyRelation";
+        let expected = "Projection: AVG(CAST(Int64(12) AS Float64))\n  EmptyRelation";
         assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), &plan, expected)?;
 
         let empty = empty_with_type(DataType::Int32);
@@ -974,7 +987,7 @@
             None,
         ));
         let plan = LogicalPlan::Projection(Projection::try_new(vec![agg_expr], empty)?);
-        let expected = "Projection: AVG(a)\n  EmptyRelation";
+        let expected = "Projection: AVG(CAST(a AS Float64))\n  EmptyRelation";
         assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), &plan, expected)?;
         Ok(())
     }
diff --git a/datafusion/optimizer/tests/optimizer_integration.rs b/datafusion/optimizer/tests/optimizer_integration.rs
index 142ae87..5c71ea0 100644
--- a/datafusion/optimizer/tests/optimizer_integration.rs
+++ b/datafusion/optimizer/tests/optimizer_integration.rs
@@ -70,7 +70,7 @@
     \n  Inner Join:  Filter: CAST(test.col_int32 AS Float64) > __scalar_sq_1.AVG(test.col_int32)\
     \n    TableScan: test projection=[col_int32]\
     \n    SubqueryAlias: __scalar_sq_1\
-    \n      Aggregate: groupBy=[[]], aggr=[[AVG(test.col_int32)]]\
+    \n      Aggregate: groupBy=[[]], aggr=[[AVG(CAST(test.col_int32 AS Float64))]]\
     \n        Projection: test.col_int32\
     \n          Filter: test.col_utf8 >= Utf8(\"2002-05-08\") AND test.col_utf8 <= Utf8(\"2002-05-13\")\
     \n            TableScan: test projection=[col_int32, col_utf8]";
diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs
index 932081c..59f72b6 100644
--- a/datafusion/physical-expr/src/aggregate/average.rs
+++ b/datafusion/physical-expr/src/aggregate/average.rs
@@ -43,6 +43,7 @@
 use datafusion_common::{
     downcast_value, internal_err, not_impl_err, DataFusionError, Result, ScalarValue,
 };
+use datafusion_expr::type_coercion::aggregates::avg_return_type;
 use datafusion_expr::Accumulator;
 
 use super::groups_accumulator::EmitTo;
@@ -53,9 +54,8 @@
 pub struct Avg {
     name: String,
     expr: Arc<dyn PhysicalExpr>,
-    pub sum_data_type: DataType,
-    rt_data_type: DataType,
-    pub pre_cast_to_sum_type: bool,
+    input_data_type: DataType,
+    result_data_type: DataType,
 }
 
 impl Avg {
@@ -63,34 +63,21 @@
     pub fn new(
         expr: Arc<dyn PhysicalExpr>,
         name: impl Into<String>,
-        sum_data_type: DataType,
-    ) -> Self {
-        Self::new_with_pre_cast(expr, name, sum_data_type.clone(), sum_data_type, false)
-    }
-
-    pub fn new_with_pre_cast(
-        expr: Arc<dyn PhysicalExpr>,
-        name: impl Into<String>,
-        sum_data_type: DataType,
-        rt_data_type: DataType,
-        cast_to_sum_type: bool,
+        data_type: DataType,
     ) -> Self {
         // the internal sum data type of avg just support FLOAT64 and Decimal data type.
         assert!(matches!(
-            sum_data_type,
+            data_type,
             DataType::Float64 | DataType::Decimal128(_, _) | DataType::Decimal256(_, _)
         ));
-        // the result of avg just support FLOAT64 and Decimal data type.
-        assert!(matches!(
-            rt_data_type,
-            DataType::Float64 | DataType::Decimal128(_, _) | DataType::Decimal256(_, _)
-        ));
+
+        let result_data_type = avg_return_type(&data_type).unwrap();
+
         Self {
             name: name.into(),
             expr,
-            sum_data_type,
-            rt_data_type,
-            pre_cast_to_sum_type: cast_to_sum_type,
+            input_data_type: data_type,
+            result_data_type,
         }
     }
 }
@@ -102,14 +89,14 @@
     }
 
     fn field(&self) -> Result<Field> {
-        Ok(Field::new(&self.name, self.rt_data_type.clone(), true))
+        Ok(Field::new(&self.name, self.result_data_type.clone(), true))
     }
 
     fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
         Ok(Box::new(AvgAccumulator::try_new(
             // avg is f64 or decimal
-            &self.sum_data_type,
-            &self.rt_data_type,
+            &self.input_data_type,
+            &self.result_data_type,
         )?))
     }
 
@@ -122,7 +109,7 @@
             ),
             Field::new(
                 format_state_name(&self.name, "sum"),
-                self.sum_data_type.clone(),
+                self.input_data_type.clone(),
                 true,
             ),
         ])
@@ -142,25 +129,25 @@
 
     fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
         Ok(Box::new(AvgAccumulator::try_new(
-            &self.sum_data_type,
-            &self.rt_data_type,
+            &self.input_data_type,
+            &self.result_data_type,
         )?))
     }
 
     fn groups_accumulator_supported(&self) -> bool {
         use DataType::*;
 
-        matches!(&self.rt_data_type, Float64 | Decimal128(_, _))
+        matches!(&self.result_data_type, Float64 | Decimal128(_, _))
     }
 
     fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
         use DataType::*;
         // instantiate specialized accumulator based for the type
-        match (&self.sum_data_type, &self.rt_data_type) {
+        match (&self.input_data_type, &self.result_data_type) {
             (Float64, Float64) => {
                 Ok(Box::new(AvgGroupsAccumulator::<Float64Type, _>::new(
-                    &self.sum_data_type,
-                    &self.rt_data_type,
+                    &self.input_data_type,
+                    &self.result_data_type,
                     |sum: f64, count: u64| Ok(sum / count as f64),
                 )))
             }
@@ -178,16 +165,16 @@
                     move |sum: i128, count: u64| decimal_averager.avg(sum, count as i128);
 
                 Ok(Box::new(AvgGroupsAccumulator::<Decimal128Type, _>::new(
-                    &self.sum_data_type,
-                    &self.rt_data_type,
+                    &self.input_data_type,
+                    &self.result_data_type,
                     avg_fn,
                 )))
             }
 
             _ => not_impl_err!(
                 "AvgGroupsAccumulator for ({} --> {})",
-                self.sum_data_type,
-                self.rt_data_type
+                self.input_data_type,
+                self.result_data_type
             ),
         }
     }
@@ -199,8 +186,8 @@
             .downcast_ref::<Self>()
             .map(|x| {
                 self.name == x.name
-                    && self.sum_data_type == x.sum_data_type
-                    && self.rt_data_type == x.rt_data_type
+                    && self.input_data_type == x.input_data_type
+                    && self.result_data_type == x.result_data_type
                     && self.expr.eq(&x.expr)
             })
             .unwrap_or(false)
@@ -212,7 +199,6 @@
 pub struct AvgAccumulator {
     // sum is used for null
     sum: ScalarValue,
-    sum_data_type: DataType,
     return_data_type: DataType,
     count: u64,
 }
@@ -222,7 +208,6 @@
     pub fn try_new(datatype: &DataType, return_data_type: &DataType) -> Result<Self> {
         Ok(Self {
             sum: ScalarValue::try_from(datatype)?,
-            sum_data_type: datatype.clone(),
             return_data_type: return_data_type.clone(),
             count: 0,
         })
@@ -238,16 +223,14 @@
         let values = &values[0];
 
         self.count += (values.len() - values.null_count()) as u64;
-        self.sum = self
-            .sum
-            .add(&sum::sum_batch(values, &self.sum_data_type)?)?;
+        self.sum = self.sum.add(&sum::sum_batch(values)?)?;
         Ok(())
     }
 
     fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
         let values = &values[0];
         self.count -= (values.len() - values.null_count()) as u64;
-        let delta = sum_batch(values, &self.sum.get_datatype())?;
+        let delta = sum_batch(values)?;
         self.sum = self.sum.sub(&delta)?;
         Ok(())
     }
@@ -258,9 +241,7 @@
         self.count += compute::sum(counts).unwrap_or(0);
 
         // sums are summed
-        self.sum = self
-            .sum
-            .add(&sum::sum_batch(&states[1], &self.sum_data_type)?)?;
+        self.sum = self.sum.add(&sum::sum_batch(&states[1])?)?;
         Ok(())
     }
 
@@ -491,72 +472,69 @@
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::expressions::col;
-    use crate::expressions::tests::aggregate;
-    use crate::generic_test_op;
-    use arrow::record_batch::RecordBatch;
-    use arrow::{array::*, datatypes::*};
-    use datafusion_common::Result;
+    use crate::expressions::tests::assert_aggregate;
+    use arrow::array::*;
+    use datafusion_expr::AggregateFunction;
 
     #[test]
-    fn avg_decimal() -> Result<()> {
+    fn avg_decimal() {
         // test agg
         let array: ArrayRef = Arc::new(
             (1..7)
                 .map(Some)
                 .collect::<Decimal128Array>()
-                .with_precision_and_scale(10, 0)?,
+                .with_precision_and_scale(10, 0)
+                .unwrap(),
         );
 
-        generic_test_op!(
+        assert_aggregate(
             array,
-            DataType::Decimal128(10, 0),
-            Avg,
-            ScalarValue::Decimal128(Some(35000), 14, 4)
-        )
+            AggregateFunction::Avg,
+            ScalarValue::Decimal128(Some(35000), 14, 4),
+        );
     }
 
     #[test]
-    fn avg_decimal_with_nulls() -> Result<()> {
+    fn avg_decimal_with_nulls() {
         let array: ArrayRef = Arc::new(
             (1..6)
                 .map(|i| if i == 2 { None } else { Some(i) })
                 .collect::<Decimal128Array>()
-                .with_precision_and_scale(10, 0)?,
+                .with_precision_and_scale(10, 0)
+                .unwrap(),
         );
-        generic_test_op!(
+        assert_aggregate(
             array,
-            DataType::Decimal128(10, 0),
-            Avg,
-            ScalarValue::Decimal128(Some(32500), 14, 4)
-        )
+            AggregateFunction::Avg,
+            ScalarValue::Decimal128(Some(32500), 14, 4),
+        );
     }
 
     #[test]
-    fn avg_decimal_all_nulls() -> Result<()> {
+    fn avg_decimal_all_nulls() {
         // test agg
         let array: ArrayRef = Arc::new(
             std::iter::repeat::<Option<i128>>(None)
                 .take(6)
                 .collect::<Decimal128Array>()
-                .with_precision_and_scale(10, 0)?,
+                .with_precision_and_scale(10, 0)
+                .unwrap(),
         );
-        generic_test_op!(
+        assert_aggregate(
             array,
-            DataType::Decimal128(10, 0),
-            Avg,
-            ScalarValue::Decimal128(None, 14, 4)
-        )
+            AggregateFunction::Avg,
+            ScalarValue::Decimal128(None, 14, 4),
+        );
     }
 
     #[test]
-    fn avg_i32() -> Result<()> {
+    fn avg_i32() {
         let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
-        generic_test_op!(a, DataType::Int32, Avg, ScalarValue::from(3_f64))
+        assert_aggregate(a, AggregateFunction::Avg, ScalarValue::from(3_f64));
     }
 
     #[test]
-    fn avg_i32_with_nulls() -> Result<()> {
+    fn avg_i32_with_nulls() {
         let a: ArrayRef = Arc::new(Int32Array::from(vec![
             Some(1),
             None,
@@ -564,33 +542,33 @@
             Some(4),
             Some(5),
         ]));
-        generic_test_op!(a, DataType::Int32, Avg, ScalarValue::from(3.25f64))
+        assert_aggregate(a, AggregateFunction::Avg, ScalarValue::from(3.25f64));
     }
 
     #[test]
-    fn avg_i32_all_nulls() -> Result<()> {
+    fn avg_i32_all_nulls() {
         let a: ArrayRef = Arc::new(Int32Array::from(vec![None, None]));
-        generic_test_op!(a, DataType::Int32, Avg, ScalarValue::Float64(None))
+        assert_aggregate(a, AggregateFunction::Avg, ScalarValue::Float64(None));
     }
 
     #[test]
-    fn avg_u32() -> Result<()> {
+    fn avg_u32() {
         let a: ArrayRef =
             Arc::new(UInt32Array::from(vec![1_u32, 2_u32, 3_u32, 4_u32, 5_u32]));
-        generic_test_op!(a, DataType::UInt32, Avg, ScalarValue::from(3.0f64))
+        assert_aggregate(a, AggregateFunction::Avg, ScalarValue::from(3.0f64));
     }
 
     #[test]
-    fn avg_f32() -> Result<()> {
+    fn avg_f32() {
         let a: ArrayRef =
             Arc::new(Float32Array::from(vec![1_f32, 2_f32, 3_f32, 4_f32, 5_f32]));
-        generic_test_op!(a, DataType::Float32, Avg, ScalarValue::from(3_f64))
+        assert_aggregate(a, AggregateFunction::Avg, ScalarValue::from(3_f64));
     }
 
     #[test]
-    fn avg_f64() -> Result<()> {
+    fn avg_f64() {
         let a: ArrayRef =
             Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 5_f64]));
-        generic_test_op!(a, DataType::Float64, Avg, ScalarValue::from(3_f64))
+        assert_aggregate(a, AggregateFunction::Avg, ScalarValue::from(3_f64));
     }
 }
diff --git a/datafusion/physical-expr/src/aggregate/build_in.rs b/datafusion/physical-expr/src/aggregate/build_in.rs
index 2ef9b31..a38dfdc 100644
--- a/datafusion/physical-expr/src/aggregate/build_in.rs
+++ b/datafusion/physical-expr/src/aggregate/build_in.rs
@@ -30,7 +30,6 @@
 use crate::{expressions, AggregateExpr, PhysicalExpr, PhysicalSortExpr};
 use arrow::datatypes::Schema;
 use datafusion_common::{not_impl_err, DataFusionError, Result};
-use datafusion_expr::aggregate_function::sum_type_of_avg;
 pub use datafusion_expr::AggregateFunction;
 use std::sync::Arc;
 
@@ -50,7 +49,7 @@
         .iter()
         .map(|e| e.data_type(input_schema))
         .collect::<Result<Vec<_>>>()?;
-    let rt_type = fun.return_type(&input_phy_types)?;
+    let data_type = input_phy_types[0].clone();
     let ordering_types = ordering_req
         .iter()
         .map(|e| e.expr.data_type(input_schema))
@@ -58,72 +57,63 @@
     let input_phy_exprs = input_phy_exprs.to_vec();
     Ok(match (fun, distinct) {
         (AggregateFunction::Count, false) => Arc::new(
-            expressions::Count::new_with_multiple_exprs(input_phy_exprs, name, rt_type),
+            expressions::Count::new_with_multiple_exprs(input_phy_exprs, name, data_type),
         ),
         (AggregateFunction::Count, true) => Arc::new(expressions::DistinctCount::new(
-            input_phy_types[0].clone(),
+            data_type,
             input_phy_exprs[0].clone(),
             name,
         )),
         (AggregateFunction::Grouping, _) => Arc::new(expressions::Grouping::new(
             input_phy_exprs[0].clone(),
             name,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::BitAnd, _) => Arc::new(expressions::BitAnd::new(
             input_phy_exprs[0].clone(),
             name,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::BitOr, _) => Arc::new(expressions::BitOr::new(
             input_phy_exprs[0].clone(),
             name,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::BitXor, false) => Arc::new(expressions::BitXor::new(
             input_phy_exprs[0].clone(),
             name,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::BitXor, true) => Arc::new(expressions::DistinctBitXor::new(
             input_phy_exprs[0].clone(),
             name,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::BoolAnd, _) => Arc::new(expressions::BoolAnd::new(
             input_phy_exprs[0].clone(),
             name,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::BoolOr, _) => Arc::new(expressions::BoolOr::new(
             input_phy_exprs[0].clone(),
             name,
-            rt_type,
+            data_type,
         )),
-        (AggregateFunction::Sum, false) => {
-            let cast_to_sum_type = rt_type != input_phy_types[0];
-            Arc::new(expressions::Sum::new_with_pre_cast(
-                input_phy_exprs[0].clone(),
-                name,
-                rt_type,
-                cast_to_sum_type,
-            ))
-        }
+        (AggregateFunction::Sum, false) => Arc::new(expressions::Sum::new(
+            input_phy_exprs[0].clone(),
+            name,
+            input_phy_types[0].clone(),
+        )),
         (AggregateFunction::Sum, true) => Arc::new(expressions::DistinctSum::new(
             vec![input_phy_exprs[0].clone()],
             name,
-            rt_type,
+            data_type,
         )),
-        (AggregateFunction::ApproxDistinct, _) => {
-            Arc::new(expressions::ApproxDistinct::new(
-                input_phy_exprs[0].clone(),
-                name,
-                input_phy_types[0].clone(),
-            ))
-        }
+        (AggregateFunction::ApproxDistinct, _) => Arc::new(
+            expressions::ApproxDistinct::new(input_phy_exprs[0].clone(), name, data_type),
+        ),
         (AggregateFunction::ArrayAgg, false) => {
             let expr = input_phy_exprs[0].clone();
-            let data_type = input_phy_types[0].clone();
             if ordering_req.is_empty() {
                 Arc::new(expressions::ArrayAgg::new(expr, name, data_type))
             } else {
@@ -145,43 +135,37 @@
             Arc::new(expressions::DistinctArrayAgg::new(
                 input_phy_exprs[0].clone(),
                 name,
-                input_phy_types[0].clone(),
+                data_type,
             ))
         }
         (AggregateFunction::Min, _) => Arc::new(expressions::Min::new(
             input_phy_exprs[0].clone(),
             name,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::Max, _) => Arc::new(expressions::Max::new(
             input_phy_exprs[0].clone(),
             name,
-            rt_type,
+            data_type,
         )),
-        (AggregateFunction::Avg, false) => {
-            let sum_type = sum_type_of_avg(&input_phy_types)?;
-            let cast_to_sum_type = sum_type != input_phy_types[0];
-            Arc::new(expressions::Avg::new_with_pre_cast(
-                input_phy_exprs[0].clone(),
-                name,
-                sum_type,
-                rt_type,
-                cast_to_sum_type,
-            ))
-        }
+        (AggregateFunction::Avg, false) => Arc::new(expressions::Avg::new(
+            input_phy_exprs[0].clone(),
+            name,
+            data_type,
+        )),
         (AggregateFunction::Avg, true) => {
             return not_impl_err!("AVG(DISTINCT) aggregations are not available");
         }
         (AggregateFunction::Variance, false) => Arc::new(expressions::Variance::new(
             input_phy_exprs[0].clone(),
             name,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::Variance, true) => {
             return not_impl_err!("VAR(DISTINCT) aggregations are not available");
         }
         (AggregateFunction::VariancePop, false) => Arc::new(
-            expressions::VariancePop::new(input_phy_exprs[0].clone(), name, rt_type),
+            expressions::VariancePop::new(input_phy_exprs[0].clone(), name, data_type),
         ),
         (AggregateFunction::VariancePop, true) => {
             return not_impl_err!("VAR_POP(DISTINCT) aggregations are not available");
@@ -190,7 +174,7 @@
             input_phy_exprs[0].clone(),
             input_phy_exprs[1].clone(),
             name,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::Covariance, true) => {
             return not_impl_err!("COVAR(DISTINCT) aggregations are not available");
@@ -200,7 +184,7 @@
                 input_phy_exprs[0].clone(),
                 input_phy_exprs[1].clone(),
                 name,
-                rt_type,
+                data_type,
             ))
         }
         (AggregateFunction::CovariancePop, true) => {
@@ -209,7 +193,7 @@
         (AggregateFunction::Stddev, false) => Arc::new(expressions::Stddev::new(
             input_phy_exprs[0].clone(),
             name,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::Stddev, true) => {
             return not_impl_err!("STDDEV(DISTINCT) aggregations are not available");
@@ -217,7 +201,7 @@
         (AggregateFunction::StddevPop, false) => Arc::new(expressions::StddevPop::new(
             input_phy_exprs[0].clone(),
             name,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::StddevPop, true) => {
             return not_impl_err!("STDDEV_POP(DISTINCT) aggregations are not available");
@@ -227,7 +211,7 @@
                 input_phy_exprs[0].clone(),
                 input_phy_exprs[1].clone(),
                 name,
-                rt_type,
+                data_type,
             ))
         }
         (AggregateFunction::Correlation, true) => {
@@ -238,63 +222,63 @@
             input_phy_exprs[1].clone(),
             name,
             RegrType::Slope,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::RegrIntercept, false) => Arc::new(expressions::Regr::new(
             input_phy_exprs[0].clone(),
             input_phy_exprs[1].clone(),
             name,
             RegrType::Intercept,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::RegrCount, false) => Arc::new(expressions::Regr::new(
             input_phy_exprs[0].clone(),
             input_phy_exprs[1].clone(),
             name,
             RegrType::Count,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::RegrR2, false) => Arc::new(expressions::Regr::new(
             input_phy_exprs[0].clone(),
             input_phy_exprs[1].clone(),
             name,
             RegrType::R2,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::RegrAvgx, false) => Arc::new(expressions::Regr::new(
             input_phy_exprs[0].clone(),
             input_phy_exprs[1].clone(),
             name,
             RegrType::AvgX,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::RegrAvgy, false) => Arc::new(expressions::Regr::new(
             input_phy_exprs[0].clone(),
             input_phy_exprs[1].clone(),
             name,
             RegrType::AvgY,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::RegrSXX, false) => Arc::new(expressions::Regr::new(
             input_phy_exprs[0].clone(),
             input_phy_exprs[1].clone(),
             name,
             RegrType::SXX,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::RegrSYY, false) => Arc::new(expressions::Regr::new(
             input_phy_exprs[0].clone(),
             input_phy_exprs[1].clone(),
             name,
             RegrType::SYY,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::RegrSXY, false) => Arc::new(expressions::Regr::new(
             input_phy_exprs[0].clone(),
             input_phy_exprs[1].clone(),
             name,
             RegrType::SXY,
-            rt_type,
+            data_type,
         )),
         (
             AggregateFunction::RegrSlope
@@ -316,14 +300,14 @@
                     // Pass in the desired percentile expr
                     input_phy_exprs,
                     name,
-                    rt_type,
+                    data_type,
                 )?)
             } else {
                 Arc::new(expressions::ApproxPercentileCont::new_with_max_size(
                     // Pass in the desired percentile expr
                     input_phy_exprs,
                     name,
-                    rt_type,
+                    data_type,
                 )?)
             }
         }
@@ -337,7 +321,7 @@
                 // Pass in the desired percentile expr
                 input_phy_exprs,
                 name,
-                rt_type,
+                data_type,
             )?)
         }
         (AggregateFunction::ApproxPercentileContWithWeight, true) => {
@@ -349,7 +333,7 @@
             Arc::new(expressions::ApproxMedian::try_new(
                 input_phy_exprs[0].clone(),
                 name,
-                rt_type,
+                data_type,
             )?)
         }
         (AggregateFunction::ApproxMedian, true) => {
@@ -360,7 +344,7 @@
         (AggregateFunction::Median, false) => Arc::new(expressions::Median::new(
             input_phy_exprs[0].clone(),
             name,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::Median, true) => {
             return not_impl_err!("MEDIAN(DISTINCT) aggregations are not available");
diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs
index 178f08b..738ca4e 100644
--- a/datafusion/physical-expr/src/aggregate/count.rs
+++ b/datafusion/physical-expr/src/aggregate/count.rs
@@ -218,17 +218,13 @@
     }
 
     fn field(&self) -> Result<Field> {
-        Ok(Field::new(
-            &self.name,
-            self.data_type.clone(),
-            self.nullable,
-        ))
+        Ok(Field::new(&self.name, DataType::Int64, self.nullable))
     }
 
     fn state_fields(&self) -> Result<Vec<Field>> {
         Ok(vec![Field::new(
             format_state_name(&self.name, "count"),
-            self.data_type.clone(),
+            DataType::Int64,
             true,
         )])
     }
diff --git a/datafusion/physical-expr/src/aggregate/grouping.rs b/datafusion/physical-expr/src/aggregate/grouping.rs
index 6a7e6f9..70afda2 100644
--- a/datafusion/physical-expr/src/aggregate/grouping.rs
+++ b/datafusion/physical-expr/src/aggregate/grouping.rs
@@ -62,17 +62,13 @@
     }
 
     fn field(&self) -> Result<Field> {
-        Ok(Field::new(
-            &self.name,
-            self.data_type.clone(),
-            self.nullable,
-        ))
+        Ok(Field::new(&self.name, DataType::Int32, self.nullable))
     }
 
     fn state_fields(&self) -> Result<Vec<Field>> {
         Ok(vec![Field::new(
             format_state_name(&self.name, "grouping"),
-            self.data_type.clone(),
+            DataType::Int32,
             true,
         )])
     }
diff --git a/datafusion/physical-expr/src/aggregate/sum.rs b/datafusion/physical-expr/src/aggregate/sum.rs
index cca1721..baaebad 100644
--- a/datafusion/physical-expr/src/aggregate/sum.rs
+++ b/datafusion/physical-expr/src/aggregate/sum.rs
@@ -30,13 +30,9 @@
 use arrow::array::Decimal128Array;
 use arrow::array::Decimal256Array;
 use arrow::compute;
-use arrow::compute::kernels::cast;
 use arrow::datatypes::DataType;
 use arrow::{
-    array::{
-        ArrayRef, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
-        Int8Array, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
-    },
+    array::{ArrayRef, Float64Array, Int64Array, UInt64Array},
     datatypes::Field,
 };
 use arrow_array::types::{
@@ -46,16 +42,16 @@
 use datafusion_common::{
     downcast_value, internal_err, not_impl_err, DataFusionError, Result, ScalarValue,
 };
+use datafusion_expr::type_coercion::aggregates::sum_return_type;
 use datafusion_expr::Accumulator;
 
 /// SUM aggregate expression
 #[derive(Debug, Clone)]
 pub struct Sum {
     name: String,
-    pub data_type: DataType,
+    data_type: DataType,
     expr: Arc<dyn PhysicalExpr>,
     nullable: bool,
-    pub pre_cast_to_sum_type: bool,
 }
 
 impl Sum {
@@ -65,27 +61,12 @@
         name: impl Into<String>,
         data_type: DataType,
     ) -> Self {
+        let data_type = sum_return_type(&data_type).unwrap();
         Self {
             name: name.into(),
             expr,
             data_type,
             nullable: true,
-            pre_cast_to_sum_type: false,
-        }
-    }
-
-    pub fn new_with_pre_cast(
-        expr: Arc<dyn PhysicalExpr>,
-        name: impl Into<String>,
-        data_type: DataType,
-        pre_cast_to_sum_type: bool,
-    ) -> Self {
-        Self {
-            name: name.into(),
-            expr,
-            data_type,
-            nullable: true,
-            pre_cast_to_sum_type,
         }
     }
 }
@@ -269,14 +250,7 @@
 }
 
 // sums the array and returns a ScalarValue of its corresponding type.
-pub(crate) fn sum_batch(values: &ArrayRef, sum_type: &DataType) -> Result<ScalarValue> {
-    // TODO refine the cast kernel in arrow-rs
-    let cast_values = if values.data_type() != sum_type {
-        Some(cast(values, sum_type)?)
-    } else {
-        None
-    };
-    let values = cast_values.as_ref().unwrap_or(values);
+pub(crate) fn sum_batch(values: &ArrayRef) -> Result<ScalarValue> {
     Ok(match values.data_type() {
         DataType::Decimal128(precision, scale) => {
             sum_decimal_batch(values, *precision, *scale)?
@@ -285,15 +259,8 @@
             sum_decimal256_batch(values, *precision, *scale)?
         }
         DataType::Float64 => typed_sum_delta_batch!(values, Float64Array, Float64),
-        DataType::Float32 => typed_sum_delta_batch!(values, Float32Array, Float32),
         DataType::Int64 => typed_sum_delta_batch!(values, Int64Array, Int64),
-        DataType::Int32 => typed_sum_delta_batch!(values, Int32Array, Int32),
-        DataType::Int16 => typed_sum_delta_batch!(values, Int16Array, Int16),
-        DataType::Int8 => typed_sum_delta_batch!(values, Int8Array, Int8),
         DataType::UInt64 => typed_sum_delta_batch!(values, UInt64Array, UInt64),
-        DataType::UInt32 => typed_sum_delta_batch!(values, UInt32Array, UInt32),
-        DataType::UInt16 => typed_sum_delta_batch!(values, UInt16Array, UInt16),
-        DataType::UInt8 => typed_sum_delta_batch!(values, UInt8Array, UInt8),
         e => {
             return internal_err!("Sum is not expected to receive the type {e:?}");
         }
@@ -307,7 +274,7 @@
 
     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
         let values = &values[0];
-        let delta = sum_batch(values, &self.sum.get_datatype())?;
+        let delta = sum_batch(values)?;
         self.sum = self.sum.add(&delta)?;
         Ok(())
     }
@@ -336,7 +303,7 @@
     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
         let values = &values[0];
         self.count += (values.len() - values.null_count()) as u64;
-        let delta = sum_batch(values, &self.sum.get_datatype())?;
+        let delta = sum_batch(values)?;
         self.sum = self.sum.add(&delta)?;
         Ok(())
     }
@@ -344,7 +311,7 @@
     fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
         let values = &values[0];
         self.count -= (values.len() - values.null_count()) as u64;
-        let delta = sum_batch(values, &self.sum.get_datatype())?;
+        let delta = sum_batch(values)?;
         self.sum = self.sum.sub(&delta)?;
         Ok(())
     }
@@ -376,23 +343,21 @@
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::expressions::col;
-    use crate::expressions::tests::aggregate;
-    use crate::generic_test_op;
-    use arrow::datatypes::*;
-    use arrow::record_batch::RecordBatch;
-    use datafusion_common::Result;
+    use crate::expressions::tests::assert_aggregate;
+    use arrow_array::{Float32Array, Int32Array, UInt32Array};
+    use datafusion_expr::AggregateFunction;
 
     #[test]
-    fn sum_decimal() -> Result<()> {
+    fn sum_decimal() {
         // test sum batch
         let array: ArrayRef = Arc::new(
             (1..6)
                 .map(Some)
                 .collect::<Decimal128Array>()
-                .with_precision_and_scale(10, 0)?,
+                .with_precision_and_scale(10, 0)
+                .unwrap(),
         );
-        let result = sum_batch(&array, &DataType::Decimal128(10, 0))?;
+        let result = sum_batch(&array).unwrap();
         assert_eq!(ScalarValue::Decimal128(Some(15), 10, 0), result);
 
         // test agg
@@ -400,27 +365,28 @@
             (1..6)
                 .map(Some)
                 .collect::<Decimal128Array>()
-                .with_precision_and_scale(10, 0)?,
+                .with_precision_and_scale(10, 0)
+                .unwrap(),
         );
 
-        generic_test_op!(
+        assert_aggregate(
             array,
-            DataType::Decimal128(10, 0),
-            Sum,
-            ScalarValue::Decimal128(Some(15), 20, 0)
-        )
+            AggregateFunction::Sum,
+            ScalarValue::Decimal128(Some(15), 20, 0),
+        );
     }
 
     #[test]
-    fn sum_decimal_with_nulls() -> Result<()> {
+    fn sum_decimal_with_nulls() {
         // test with batch
         let array: ArrayRef = Arc::new(
             (1..6)
                 .map(|i| if i == 2 { None } else { Some(i) })
                 .collect::<Decimal128Array>()
-                .with_precision_and_scale(10, 0)?,
+                .with_precision_and_scale(10, 0)
+                .unwrap(),
         );
-        let result = sum_batch(&array, &DataType::Decimal128(10, 0))?;
+        let result = sum_batch(&array).unwrap();
         assert_eq!(ScalarValue::Decimal128(Some(13), 10, 0), result);
 
         // test agg
@@ -428,45 +394,46 @@
             (1..6)
                 .map(|i| if i == 2 { None } else { Some(i) })
                 .collect::<Decimal128Array>()
-                .with_precision_and_scale(35, 0)?,
+                .with_precision_and_scale(35, 0)
+                .unwrap(),
         );
-        generic_test_op!(
+
+        assert_aggregate(
             array,
-            DataType::Decimal128(35, 0),
-            Sum,
-            ScalarValue::Decimal128(Some(13), 38, 0)
-        )
+            AggregateFunction::Sum,
+            ScalarValue::Decimal128(Some(13), 38, 0),
+        );
     }
 
     #[test]
-    fn sum_decimal_all_nulls() -> Result<()> {
+    fn sum_decimal_all_nulls() {
         // test with batch
         let array: ArrayRef = Arc::new(
             std::iter::repeat::<Option<i128>>(None)
                 .take(6)
                 .collect::<Decimal128Array>()
-                .with_precision_and_scale(10, 0)?,
+                .with_precision_and_scale(10, 0)
+                .unwrap(),
         );
-        let result = sum_batch(&array, &DataType::Decimal128(10, 0))?;
+        let result = sum_batch(&array).unwrap();
         assert_eq!(ScalarValue::Decimal128(None, 10, 0), result);
 
         // test agg
-        generic_test_op!(
+        assert_aggregate(
             array,
-            DataType::Decimal128(10, 0),
-            Sum,
-            ScalarValue::Decimal128(None, 20, 0)
-        )
+            AggregateFunction::Sum,
+            ScalarValue::Decimal128(None, 20, 0),
+        );
     }
 
     #[test]
-    fn sum_i32() -> Result<()> {
+    fn sum_i32() {
         let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
-        generic_test_op!(a, DataType::Int32, Sum, ScalarValue::from(15i32))
+        assert_aggregate(a, AggregateFunction::Sum, ScalarValue::from(15i64));
     }
 
     #[test]
-    fn sum_i32_with_nulls() -> Result<()> {
+    fn sum_i32_with_nulls() {
         let a: ArrayRef = Arc::new(Int32Array::from(vec![
             Some(1),
             None,
@@ -474,33 +441,33 @@
             Some(4),
             Some(5),
         ]));
-        generic_test_op!(a, DataType::Int32, Sum, ScalarValue::from(13i32))
+        assert_aggregate(a, AggregateFunction::Sum, ScalarValue::from(13i64));
     }
 
     #[test]
-    fn sum_i32_all_nulls() -> Result<()> {
+    fn sum_i32_all_nulls() {
         let a: ArrayRef = Arc::new(Int32Array::from(vec![None, None]));
-        generic_test_op!(a, DataType::Int32, Sum, ScalarValue::Int32(None))
+        assert_aggregate(a, AggregateFunction::Sum, ScalarValue::Int64(None));
     }
 
     #[test]
-    fn sum_u32() -> Result<()> {
+    fn sum_u32() {
         let a: ArrayRef =
             Arc::new(UInt32Array::from(vec![1_u32, 2_u32, 3_u32, 4_u32, 5_u32]));
-        generic_test_op!(a, DataType::UInt32, Sum, ScalarValue::from(15u32))
+        assert_aggregate(a, AggregateFunction::Sum, ScalarValue::from(15u64));
     }
 
     #[test]
-    fn sum_f32() -> Result<()> {
+    fn sum_f32() {
         let a: ArrayRef =
             Arc::new(Float32Array::from(vec![1_f32, 2_f32, 3_f32, 4_f32, 5_f32]));
-        generic_test_op!(a, DataType::Float32, Sum, ScalarValue::from(15_f32))
+        assert_aggregate(a, AggregateFunction::Sum, ScalarValue::from(15_f64));
     }
 
     #[test]
-    fn sum_f64() -> Result<()> {
+    fn sum_f64() {
         let a: ArrayRef =
             Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 5_f64]));
-        generic_test_op!(a, DataType::Float64, Sum, ScalarValue::from(15_f64))
+        assert_aggregate(a, AggregateFunction::Sum, ScalarValue::from(15_f64));
     }
 }
diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs
index 022e0ae..4a6d528 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -100,10 +100,15 @@
 
 #[cfg(test)]
 pub(crate) mod tests {
+    use crate::expressions::{col, create_aggregate_expr, try_cast};
     use crate::AggregateExpr;
     use arrow::record_batch::RecordBatch;
+    use arrow_array::ArrayRef;
+    use arrow_schema::{Field, Schema};
     use datafusion_common::Result;
     use datafusion_common::ScalarValue;
+    use datafusion_expr::type_coercion::aggregates::coerce_types;
+    use datafusion_expr::AggregateFunction;
     use std::sync::Arc;
 
     /// macro to perform an aggregation and verify the result.
@@ -131,6 +136,36 @@
         }};
     }
 
+    /// Assert `function(array) == expected` performing any necessary type coercion
+    pub fn assert_aggregate(
+        array: ArrayRef,
+        function: AggregateFunction,
+        expected: ScalarValue,
+    ) {
+        let data_type = array.data_type();
+        let sig = function.signature();
+        let coerced = coerce_types(&function, &[data_type.clone()], &sig).unwrap();
+
+        let input_schema = Schema::new(vec![Field::new("a", data_type.clone(), true)]);
+        let batch =
+            RecordBatch::try_new(Arc::new(input_schema.clone()), vec![array]).unwrap();
+
+        let input = try_cast(
+            col("a", &input_schema).unwrap(),
+            &input_schema,
+            coerced[0].clone(),
+        )
+        .unwrap();
+
+        let schema = Schema::new(vec![Field::new("a", coerced[0].clone(), true)]);
+        let agg =
+            create_aggregate_expr(&function, false, &[input], &[], &schema, "aggregate")
+                .unwrap();
+
+        let result = aggregate(&batch, agg).unwrap();
+        assert_eq!(expected, result);
+    }
+
     /// macro to perform an aggregation with two inputs and verify the result.
     #[macro_export]
     macro_rules! generic_test_op2 {
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 8f02e3f..ff7d74d 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -1388,8 +1388,8 @@
     use datafusion::execution::context::ExecutionProps;
     use datafusion::logical_expr::create_udf;
     use datafusion::logical_expr::{BuiltinScalarFunction, Volatility};
-    use datafusion::physical_expr::expressions::in_list;
     use datafusion::physical_expr::expressions::GetFieldAccessExpr;
+    use datafusion::physical_expr::expressions::{cast, in_list};
     use datafusion::physical_expr::ScalarFunctionExpr;
     use datafusion::physical_plan::aggregates::PhysicalGroupBy;
     use datafusion::physical_plan::expressions::{like, BinaryExpr, GetIndexedFieldExpr};
@@ -1585,14 +1585,11 @@
         let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
             vec![(col("a", &schema)?, "unused".to_string())];
 
-        let aggregates: Vec<Arc<dyn AggregateExpr>> =
-            vec![Arc::new(Avg::new_with_pre_cast(
-                col("b", &schema)?,
-                "AVG(b)".to_string(),
-                DataType::Float64,
-                DataType::Float64,
-                true,
-            ))];
+        let aggregates: Vec<Arc<dyn AggregateExpr>> = vec![Arc::new(Avg::new(
+            cast(col("b", &schema)?, &schema, DataType::Float64)?,
+            "AVG(b)".to_string(),
+            DataType::Float64,
+        ))];
 
         roundtrip_test(Arc::new(AggregateExec::try_new(
             AggregateMode::Final,
diff --git a/datafusion/sqllogictest/test_files/decimal.slt b/datafusion/sqllogictest/test_files/decimal.slt
index da448f0..a2a1df5 100644
--- a/datafusion/sqllogictest/test_files/decimal.slt
+++ b/datafusion/sqllogictest/test_files/decimal.slt
@@ -618,7 +618,7 @@
 statement ok
 create table t as values (arrow_cast(123, 'Decimal256(5,2)'));
 
-query error DataFusion error: Internal error: Operator \+ is not implemented for types Decimal256\(None,15,2\) and Decimal256\(Some\(12300\),15,2\)\. This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker
+query error DataFusion error: Internal error: Operator \+ is not implemented for types Decimal256\(None,5,2\) and Decimal256\(Some\(12300\),5,2\)\. This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker
 select AVG(column1) from t;
 
 statement ok
diff --git a/datafusion/sqllogictest/test_files/groupby.slt b/datafusion/sqllogictest/test_files/groupby.slt
index 14638a8..9b594b0 100644
--- a/datafusion/sqllogictest/test_files/groupby.slt
+++ b/datafusion/sqllogictest/test_files/groupby.slt
@@ -2059,7 +2059,7 @@
 ----
 logical_plan
 Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, SUM(annotated_data_infinite2.c) AS summation1
---Aggregate: groupBy=[[annotated_data_infinite2.b, annotated_data_infinite2.a]], aggr=[[SUM(annotated_data_infinite2.c)]]
+--Aggregate: groupBy=[[annotated_data_infinite2.b, annotated_data_infinite2.a]], aggr=[[SUM(CAST(annotated_data_infinite2.c AS Int64))]]
 ----TableScan: annotated_data_infinite2 projection=[a, b, c]
 physical_plan
 ProjectionExec: expr=[a@1 as a, b@0 as b, SUM(annotated_data_infinite2.c)@2 as summation1]
@@ -2090,7 +2090,7 @@
 ----
 logical_plan
 Projection: annotated_data_infinite2.a, annotated_data_infinite2.d, SUM(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST] AS summation1
---Aggregate: groupBy=[[annotated_data_infinite2.d, annotated_data_infinite2.a]], aggr=[[SUM(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]]]
+--Aggregate: groupBy=[[annotated_data_infinite2.d, annotated_data_infinite2.a]], aggr=[[SUM(CAST(annotated_data_infinite2.c AS Int64)) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]]]
 ----TableScan: annotated_data_infinite2 projection=[a, c, d]
 physical_plan
 ProjectionExec: expr=[a@1 as a, d@0 as d, SUM(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]@2 as summation1]
@@ -2269,7 +2269,7 @@
 ----
 logical_plan
 Projection: s.country, ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST] AS amounts, SUM(s.amount) AS sum1
---Aggregate: groupBy=[[s.country]], aggr=[[ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST], SUM(s.amount)]]
+--Aggregate: groupBy=[[s.country]], aggr=[[ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST], SUM(CAST(s.amount AS Float64))]]
 ----SubqueryAlias: s
 ------TableScan: sales_global projection=[country, amount]
 physical_plan
@@ -2312,7 +2312,7 @@
 ----
 logical_plan
 Projection: s.country, ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST] AS amounts, SUM(s.amount) AS sum1
---Aggregate: groupBy=[[s.country]], aggr=[[ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST], SUM(s.amount)]]
+--Aggregate: groupBy=[[s.country]], aggr=[[ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST], SUM(CAST(s.amount AS Float64))]]
 ----SubqueryAlias: s
 ------Sort: sales_global.country ASC NULLS LAST
 --------TableScan: sales_global projection=[country, amount]
@@ -2348,7 +2348,7 @@
 ----
 logical_plan
 Projection: s.country, s.zip_code, ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST] AS amounts, SUM(s.amount) AS sum1
---Aggregate: groupBy=[[s.country, s.zip_code]], aggr=[[ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST], SUM(s.amount)]]
+--Aggregate: groupBy=[[s.country, s.zip_code]], aggr=[[ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST], SUM(CAST(s.amount AS Float64))]]
 ----SubqueryAlias: s
 ------Sort: sales_global.country ASC NULLS LAST
 --------TableScan: sales_global projection=[zip_code, country, amount]
@@ -2384,7 +2384,7 @@
 ----
 logical_plan
 Projection: s.country, ARRAY_AGG(s.amount) ORDER BY [s.country DESC NULLS FIRST] AS amounts, SUM(s.amount) AS sum1
---Aggregate: groupBy=[[s.country]], aggr=[[ARRAY_AGG(s.amount) ORDER BY [s.country DESC NULLS FIRST], SUM(s.amount)]]
+--Aggregate: groupBy=[[s.country]], aggr=[[ARRAY_AGG(s.amount) ORDER BY [s.country DESC NULLS FIRST], SUM(CAST(s.amount AS Float64))]]
 ----SubqueryAlias: s
 ------Sort: sales_global.country ASC NULLS LAST
 --------TableScan: sales_global projection=[country, amount]
@@ -2419,7 +2419,7 @@
 ----
 logical_plan
 Projection: s.country, ARRAY_AGG(s.amount) ORDER BY [s.country DESC NULLS FIRST, s.amount DESC NULLS FIRST] AS amounts, SUM(s.amount) AS sum1
---Aggregate: groupBy=[[s.country]], aggr=[[ARRAY_AGG(s.amount) ORDER BY [s.country DESC NULLS FIRST, s.amount DESC NULLS FIRST], SUM(s.amount)]]
+--Aggregate: groupBy=[[s.country]], aggr=[[ARRAY_AGG(s.amount) ORDER BY [s.country DESC NULLS FIRST, s.amount DESC NULLS FIRST], SUM(CAST(s.amount AS Float64))]]
 ----SubqueryAlias: s
 ------Sort: sales_global.country ASC NULLS LAST
 --------TableScan: sales_global projection=[country, amount]
@@ -2546,7 +2546,7 @@
 ----
 logical_plan
 Projection: sales_global.country, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS sum1, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS amounts
---Aggregate: groupBy=[[sales_global.country]], aggr=[[SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]]]
+--Aggregate: groupBy=[[sales_global.country]], aggr=[[SUM(CAST(sales_global.amount AS Float64)) ORDER BY [sales_global.ts DESC NULLS FIRST], ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]]]
 ----TableScan: sales_global projection=[country, ts, amount]
 physical_plan
 ProjectionExec: expr=[country@0 as country, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as sum1, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as amounts]
@@ -2580,7 +2580,7 @@
 ----
 logical_plan
 Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS sum1
---Aggregate: groupBy=[[sales_global.country]], aggr=[[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]]]
+--Aggregate: groupBy=[[sales_global.country]], aggr=[[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], SUM(CAST(sales_global.amount AS Float64)) ORDER BY [sales_global.ts DESC NULLS FIRST]]]
 ----Sort: sales_global.ts ASC NULLS LAST
 ------TableScan: sales_global projection=[country, ts, amount]
 physical_plan
@@ -2615,7 +2615,7 @@
 ----
 logical_plan
 Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS sum1
---Aggregate: groupBy=[[sales_global.country]], aggr=[[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]]]
+--Aggregate: groupBy=[[sales_global.country]], aggr=[[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], SUM(CAST(sales_global.amount AS Float64)) ORDER BY [sales_global.ts DESC NULLS FIRST]]]
 ----TableScan: sales_global projection=[country, ts, amount]
 physical_plan
 ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@3 as sum1]
@@ -3103,7 +3103,7 @@
 logical_plan
 Sort: r.sn ASC NULLS LAST
 --Projection: r.sn, SUM(l.amount), r.amount
-----Aggregate: groupBy=[[r.sn, r.amount]], aggr=[[SUM(l.amount)]]
+----Aggregate: groupBy=[[r.sn, r.amount]], aggr=[[SUM(CAST(l.amount AS Float64))]]
 ------Projection: l.amount, r.sn, r.amount
 --------Inner Join:  Filter: l.sn >= r.sn
 ----------SubqueryAlias: l
@@ -3248,7 +3248,7 @@
 ----Aggregate: groupBy=[[l.sn, l.zip_code, l.country, l.ts, l.currency, l.amount, l.sum_amount]], aggr=[[]]
 ------SubqueryAlias: l
 --------Projection: l.zip_code, l.country, l.sn, l.ts, l.currency, l.amount, SUM(l.amount) ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS sum_amount
-----------WindowAggr: windowExpr=[[SUM(l.amount) ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+----------WindowAggr: windowExpr=[[SUM(CAST(l.amount AS Float64)) ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
 ------------SubqueryAlias: l
 --------------TableScan: sales_global_with_pk projection=[zip_code, country, sn, ts, currency, amount]
 physical_plan
diff --git a/datafusion/sqllogictest/test_files/insert.slt b/datafusion/sqllogictest/test_files/insert.slt
index e42d2ef..74968bb 100644
--- a/datafusion/sqllogictest/test_files/insert.slt
+++ b/datafusion/sqllogictest/test_files/insert.slt
@@ -61,7 +61,7 @@
 --Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field2
 ----Sort: aggregate_test_100.c1 ASC NULLS LAST
 ------Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, aggregate_test_100.c1
---------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+--------WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
 ----------TableScan: aggregate_test_100 projection=[c1, c4, c9]
 physical_plan
 InsertExec: sink=MemoryTable (partitions=1)
@@ -122,7 +122,7 @@
 logical_plan
 Dml: op=[Insert Into] table=[table_without_values]
 --Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field2
-----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+----WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
 ------TableScan: aggregate_test_100 projection=[c1, c4, c9]
 physical_plan
 InsertExec: sink=MemoryTable (partitions=1)
@@ -171,7 +171,7 @@
 --Projection: a1 AS a1, a2 AS a2
 ----Sort: aggregate_test_100.c1 ASC NULLS LAST
 ------Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS a1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS a2, aggregate_test_100.c1
---------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+--------WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
 ----------TableScan: aggregate_test_100 projection=[c1, c4, c9]
 physical_plan
 InsertExec: sink=MemoryTable (partitions=8)
diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt
index 6c9c6fd..b2ee546 100644
--- a/datafusion/sqllogictest/test_files/insert_to_external.slt
+++ b/datafusion/sqllogictest/test_files/insert_to_external.slt
@@ -106,7 +106,7 @@
 --Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field2
 ----Sort: aggregate_test_100.c1 ASC NULLS LAST
 ------Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, aggregate_test_100.c1
---------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+--------WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
 ----------TableScan: aggregate_test_100 projection=[c1, c4, c9]
 physical_plan
 InsertExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[])
@@ -169,7 +169,7 @@
 logical_plan
 Dml: op=[Insert Into] table=[table_without_values]
 --Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field2
-----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+----WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
 ------TableScan: aggregate_test_100 projection=[c1, c4, c9]
 physical_plan
 InsertExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[])
diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt
index bdc8991..d88291b 100644
--- a/datafusion/sqllogictest/test_files/subquery.slt
+++ b/datafusion/sqllogictest/test_files/subquery.slt
@@ -173,7 +173,7 @@
 ----TableScan: t1 projection=[t1_id]
 ----SubqueryAlias: __scalar_sq_1
 ------Projection: SUM(t2.t2_int), t2.t2_id
---------Aggregate: groupBy=[[t2.t2_id]], aggr=[[SUM(t2.t2_int)]]
+--------Aggregate: groupBy=[[t2.t2_id]], aggr=[[SUM(CAST(t2.t2_int AS Int64))]]
 ----------TableScan: t2 projection=[t2_id, t2_int]
 physical_plan
 ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int)@1 as t2_sum]
@@ -241,7 +241,7 @@
 ----TableScan: t1 projection=[t1_id]
 ----SubqueryAlias: __scalar_sq_1
 ------Projection: SUM(t2.t2_int), t2.t2_id
---------Aggregate: groupBy=[[t2.t2_id, Utf8("a")]], aggr=[[SUM(t2.t2_int)]]
+--------Aggregate: groupBy=[[t2.t2_id, Utf8("a")]], aggr=[[SUM(CAST(t2.t2_int AS Int64))]]
 ----------TableScan: t2 projection=[t2_id, t2_int]
 physical_plan
 ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int)@1 as t2_sum]
@@ -278,7 +278,7 @@
 ----SubqueryAlias: __scalar_sq_1
 ------Projection: SUM(t2.t2_int), t2.t2_id
 --------Filter: SUM(t2.t2_int) < Int64(3)
-----------Aggregate: groupBy=[[t2.t2_id]], aggr=[[SUM(t2.t2_int)]]
+----------Aggregate: groupBy=[[t2.t2_id]], aggr=[[SUM(CAST(t2.t2_int AS Int64))]]
 ------------TableScan: t2 projection=[t2_id, t2_int]
 physical_plan
 ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int)@1 as t2_sum]
@@ -483,7 +483,7 @@
 Filter: EXISTS (<subquery>)
 --Subquery:
 ----Projection: SUM(outer_ref(t1.t1_int) + t2.t2_id)
-------Aggregate: groupBy=[[]], aggr=[[SUM(outer_ref(t1.t1_int) + t2.t2_id)]]
+------Aggregate: groupBy=[[]], aggr=[[SUM(CAST(outer_ref(t1.t1_int) + t2.t2_id AS Int64))]]
 --------Filter: outer_ref(t1.t1_name) = t2.t2_name
 ----------TableScan: t2
 --TableScan: t1 projection=[t1_id, t1_name]
@@ -497,7 +497,7 @@
 --Subquery:
 ----Projection: COUNT(*)
 ------Filter: SUM(outer_ref(t1.t1_int) + t2.t2_id) > Int64(0)
---------Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*), SUM(outer_ref(t1.t1_int) + t2.t2_id)]]
+--------Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*), SUM(CAST(outer_ref(t1.t1_int) + t2.t2_id AS Int64))]]
 ----------Filter: outer_ref(t1.t1_name) = t2.t2_name
 ------------TableScan: t2
 --TableScan: t1 projection=[t1_id, t1_name]
diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt
index 36bb0d4..207bf21 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -1274,7 +1274,7 @@
 Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
 --WindowAggr: windowExpr=[[COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
 ----Projection: aggregate_test_100.c1, aggregate_test_100.c2, SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
-------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+------WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
 --------TableScan: aggregate_test_100 projection=[c1, c2, c4]
 physical_plan
 ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@2 as SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]
@@ -2540,21 +2540,25 @@
 --Limit: skip=0, fetch=5
 ----Sort: annotated_data_finite.inc_col DESC NULLS FIRST, fetch=5
 ------Projection: SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS sum1, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING AS sum2, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING AS sum3, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS min1, MIN(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING AS min2, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING AS min3, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS max1, MAX(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING AS max2, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING AS max3, COUNT(*) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 4 PRECEDING AND 8 FOLLOWING AS cnt1, COUNT(*) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING AS cnt2, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING AS sumr1, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING AS sumr2, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sumr3, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS minr1, MIN(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING AS minr2, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING AS minr3, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS maxr1, MAX(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING AS maxr2, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING AS maxr3, COUNT(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING AS cntr1, COUNT(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING AS cntr2, SUM(annotated_data_finite.desc_col) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING AS sum4, COUNT(*) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING AS cnt3, annotated_data_finite.inc_col
---------WindowAggr: windowExpr=[[SUM(annotated_data_finite.desc_col) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING AS COUNT(*) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING]]
+--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite.desc_col AS Int64)) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING AS COUNT(*) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING]]
 ----------Projection: annotated_data_finite.inc_col, annotated_data_finite.desc_col, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, MIN(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, MAX(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, COUNT(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING, COUNT(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, MIN(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, MAX(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, COUNT(*) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 4 PRECEDING AND 8 FOLLOWING, COUNT(*) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING
-------------WindowAggr: windowExpr=[[SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, MIN(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, MAX(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, COUNT(UInt8(1)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 4 PRECEDING AND 8 FOLLOWING AS COUNT(*) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 4 PRECEDING AND 8 FOLLOWING, COUNT(UInt8(1)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING AS COUNT(*) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING]]
---------------WindowAggr: windowExpr=[[SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, MIN(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, MAX(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, COUNT(UInt8(1)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING AS COUNT(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING, COUNT(UInt8(1)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING AS COUNT(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING]]
-----------------TableScan: annotated_data_finite projection=[ts, inc_col, desc_col]
+------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite.inc_col AS Int64)annotated_data_finite.inc_col AS annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, SUM(CAST(annotated_data_finite.desc_col AS Int64)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, SUM(CAST(annotated_data_finite.inc_col AS Int64)annotated_data_finite.inc_col AS annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, MIN(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, MAX(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, COUNT(UInt8(1)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 4 PRECEDING AND 8 FOLLOWING AS COUNT(*) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 4 PRECEDING AND 8 FOLLOWING, COUNT(UInt8(1)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING AS COUNT(*) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING]]
+--------------Projection: CAST(annotated_data_finite.inc_col AS Int64) AS CAST(annotated_data_finite.inc_col AS Int64)annotated_data_finite.inc_col, annotated_data_finite.ts, annotated_data_finite.inc_col, annotated_data_finite.desc_col, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, MIN(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, MAX(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, COUNT(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING, COUNT(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING
+----------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite.inc_col AS Int64)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, SUM(CAST(annotated_data_finite.desc_col AS Int64)annotated_data_finite.desc_col AS annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING, SUM(CAST(annotated_data_finite.desc_col AS Int64)annotated_data_finite.desc_col AS annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, MIN(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, MAX(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, COUNT(UInt8(1)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING AS COUNT(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING, COUNT(UInt8(1)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING AS COUNT(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING]]
+------------------Projection: CAST(annotated_data_finite.desc_col AS Int64) AS CAST(annotated_data_finite.desc_col AS Int64)annotated_data_finite.desc_col, annotated_data_finite.ts, annotated_data_finite.inc_col, annotated_data_finite.desc_col
+--------------------TableScan: annotated_data_finite projection=[ts, inc_col, desc_col]
 physical_plan
 ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, sum3@2 as sum3, min1@3 as min1, min2@4 as min2, min3@5 as min3, max1@6 as max1, max2@7 as max2, max3@8 as max3, cnt1@9 as cnt1, cnt2@10 as cnt2, sumr1@11 as sumr1, sumr2@12 as sumr2, sumr3@13 as sumr3, minr1@14 as minr1, minr2@15 as minr2, minr3@16 as minr3, maxr1@17 as maxr1, maxr2@18 as maxr2, maxr3@19 as maxr3, cntr1@20 as cntr1, cntr2@21 as cntr2, sum4@22 as sum4, cnt3@23 as cnt3]
 --GlobalLimitExec: skip=0, fetch=5
 ----SortExec: fetch=5, expr=[inc_col@24 DESC]
 ------ProjectionExec: expr=[SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@13 as sum1, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING@14 as sum2, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING@15 as sum3, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@16 as min1, MIN(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING@17 as min2, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING@18 as min3, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@19 as max1, MAX(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING@20 as max2, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING@21 as max3, COUNT(*) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 4 PRECEDING AND 8 FOLLOWING@22 as cnt1, COUNT(*) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING@23 as cnt2, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING@2 as sumr1, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING@3 as sumr2, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@4 as sumr3, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@5 as minr1, MIN(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING@6 as minr2, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING@7 as minr3, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@8 as maxr1, MAX(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING@9 as maxr2, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING@10 as maxr3, COUNT(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING@11 as cntr1, COUNT(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING@12 as cntr2, SUM(annotated_data_finite.desc_col) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING@24 as sum4, COUNT(*) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING@25 as cnt3, inc_col@0 as inc_col]
 --------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.desc_col) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.desc_col) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(8)), end_bound: Following(UInt64(1)) }, COUNT(*) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "COUNT(*) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(8)), end_bound: Following(UInt64(1)) }], mode=[Sorted]
-----------ProjectionExec: expr=[inc_col@1 as inc_col, desc_col@2 as desc_col, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING@3 as SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING@4 as SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@5 as SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@6 as MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, MIN(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING@7 as MIN(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING@8 as MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@9 as MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, MAX(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING@10 as MAX(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING@11 as MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, COUNT(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING@12 as COUNT(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING, COUNT(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING@13 as COUNT(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@14 as SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING@15 as SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING@16 as SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@17 as MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, MIN(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING@18 as MIN(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING@19 as MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@20 as MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, MAX(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING@21 as MAX(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING@22 as MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, COUNT(*) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 4 PRECEDING AND 8 FOLLOWING@23 as COUNT(*) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 4 PRECEDING AND 8 FOLLOWING, COUNT(*) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING@24 as COUNT(*) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING]
+----------ProjectionExec: expr=[inc_col@2 as inc_col, desc_col@3 as desc_col, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING@4 as SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING@5 as SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@6 as SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@7 as MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, MIN(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING@8 as MIN(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING@9 as MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@10 as MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, MAX(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING@11 as MAX(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING@12 as MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, COUNT(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING@13 as COUNT(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING, COUNT(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING@14 as COUNT(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@15 as SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING@16 as SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING@17 as SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@18 as MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, MIN(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING@19 as MIN(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING@20 as MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@21 as MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, MAX(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING@22 as MAX(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING@23 as MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, COUNT(*) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 4 PRECEDING AND 8 FOLLOWING@24 as COUNT(*) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 4 PRECEDING AND 8 FOLLOWING, COUNT(*) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING@25 as COUNT(*) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING]
 ------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) }, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(5)), end_bound: Following(Int32(1)) }, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(10)) }, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) }, MIN(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "MIN(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(5)), end_bound: Following(Int32(1)) }, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING: Ok(Field { name: "MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(10)) }, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) }, MAX(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "MAX(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(5)), end_bound: Following(Int32(1)) }, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING: Ok(Field { name: "MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(10)) }, COUNT(*) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 4 PRECEDING AND 8 FOLLOWING: Ok(Field { name: "COUNT(*) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 4 PRECEDING AND 8 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(4)), end_bound: Following(Int32(8)) }, COUNT(*) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "COUNT(*) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(8)), end_bound: Following(UInt64(1)) }], mode=[Sorted]
---------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(4)), end_bound: Following(Int32(1)) }, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(8)), end_bound: Following(Int32(1)) }, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)) }, MIN(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "MIN(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(5)) }, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING: Ok(Field { name: "MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)) }, MAX(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "MAX(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(5)) }, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING: Ok(Field { name: "MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }, COUNT(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING: Ok(Field { name: "COUNT(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(2)), end_bound: Following(Int32(6)) }, COUNT(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "COUNT(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(8)) }], mode=[Sorted]
-----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col, desc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
+--------------ProjectionExec: expr=[CAST(inc_col@2 AS Int64) as CAST(annotated_data_finite.inc_col AS Int64)annotated_data_finite.inc_col, ts@1 as ts, inc_col@2 as inc_col, desc_col@3 as desc_col, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING@4 as SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING@5 as SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@6 as SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@7 as MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, MIN(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING@8 as MIN(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING@9 as MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@10 as MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, MAX(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING@11 as MAX(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING@12 as MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, COUNT(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING@13 as COUNT(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING, COUNT(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING@14 as COUNT(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING]
+----------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(4)), end_bound: Following(Int32(1)) }, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(8)), end_bound: Following(Int32(1)) }, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)) }, MIN(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "MIN(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(5)) }, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING: Ok(Field { name: "MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)) }, MAX(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "MAX(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(5)) }, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING: Ok(Field { name: "MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }, COUNT(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING: Ok(Field { name: "COUNT(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(2)), end_bound: Following(Int32(6)) }, COUNT(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "COUNT(*) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(8)) }], mode=[Sorted]
+------------------ProjectionExec: expr=[CAST(desc_col@2 AS Int64) as CAST(annotated_data_finite.desc_col AS Int64)annotated_data_finite.desc_col, ts@0 as ts, inc_col@1 as inc_col, desc_col@2 as desc_col]
+--------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col, desc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
 
 query IIIIIIIIIIIIIIIIIIIIIIII
 SELECT
@@ -2702,8 +2706,8 @@
 --Limit: skip=0, fetch=5
 ----Sort: annotated_data_finite.inc_col ASC NULLS LAST, fetch=5
 ------Projection: SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING AS sum1, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS sum2, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING AS min1, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS min2, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING AS max1, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS max2, COUNT(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING AS count1, COUNT(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS count2, AVG(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING AS avg1, AVG(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS avg2, annotated_data_finite.inc_col
---------WindowAggr: windowExpr=[[SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING, COUNT(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING, AVG(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING]]
-----------WindowAggr: windowExpr=[[SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, COUNT(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, AVG(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING]]
+--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite.inc_col AS Int64)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING, COUNT(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING, AVG(CAST(annotated_data_finite.inc_col AS Float64)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING]]
+----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite.inc_col AS Int64)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, COUNT(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, AVG(CAST(annotated_data_finite.inc_col AS Float64)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING]]
 ------------TableScan: annotated_data_finite projection=[ts, inc_col]
 physical_plan
 ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, min1@2 as min1, min2@3 as min2, max1@4 as max1, max2@5 as max2, count1@6 as count1, count2@7 as count2, avg1@8 as avg1, avg2@9 as avg2]
@@ -2801,8 +2805,8 @@
 --Limit: skip=0, fetch=5
 ----Sort: annotated_data_infinite.ts ASC NULLS LAST, fetch=5
 ------Projection: SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING AS sum1, SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING AS count1, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS count2, annotated_data_infinite.ts
---------WindowAggr: windowExpr=[[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING]]
-----------WindowAggr: windowExpr=[[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING]]
+--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite.inc_col AS Int64)) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING]]
+----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite.inc_col AS Int64)) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING]]
 ------------TableScan: annotated_data_infinite projection=[ts, inc_col]
 physical_plan
 ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1, count2@3 as count2]
@@ -2847,8 +2851,8 @@
 --Limit: skip=0, fetch=5
 ----Sort: annotated_data_infinite.ts ASC NULLS LAST, fetch=5
 ------Projection: SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING AS sum1, SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING AS count1, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS count2, annotated_data_infinite.ts
---------WindowAggr: windowExpr=[[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING]]
-----------WindowAggr: windowExpr=[[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING]]
+--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite.inc_col AS Int64)) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING]]
+----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite.inc_col AS Int64)) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING]]
 ------------TableScan: annotated_data_infinite projection=[ts, inc_col]
 physical_plan
 ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1, count2@3 as count2]
@@ -2942,15 +2946,16 @@
 logical_plan
 Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING AS sum1, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 1 FOLLOWING AND 5 FOLLOWING AS sum2, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING AS sum3, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING AS sum4, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING AS sum5, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING AS sum6, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING AS sum7, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING AS sum8, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING AS sum9, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW AS sum10, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING AS sum11, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING AS sum12
 --Limit: skip=0, fetch=5
-----WindowAggr: windowExpr=[[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING]]
-------WindowAggr: windowExpr=[[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING]]
---------WindowAggr: windowExpr=[[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING]]
-----------WindowAggr: windowExpr=[[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 1 FOLLOWING AND 5 FOLLOWING]]
-------------WindowAggr: windowExpr=[[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW]]
---------------WindowAggr: windowExpr=[[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING]]
-----------------TableScan: annotated_data_infinite2 projection=[a, b, c, d]
+----WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING]]
+------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING]]
+--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING]]
+----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 1 FOLLOWING AND 5 FOLLOWING]]
+------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW]]
+--------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING]]
+----------------Projection: CAST(annotated_data_infinite2.c AS Int64) AS CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c, annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, annotated_data_infinite2.d
+------------------TableScan: annotated_data_infinite2 projection=[a, b, c, d]
 physical_plan
-ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@8 as sum1, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 1 FOLLOWING AND 5 FOLLOWING@9 as sum2, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@14 as sum3, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING@15 as sum4, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@4 as sum5, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING@5 as sum6, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@10 as sum7, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING@11 as sum8, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@6 as sum9, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW@7 as sum10, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@12 as sum11, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING@13 as sum12]
+ProjectionExec: expr=[a@1 as a, b@2 as b, c@3 as c, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@9 as sum1, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 1 FOLLOWING AND 5 FOLLOWING@10 as sum2, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@15 as sum3, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING@16 as sum4, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@5 as sum5, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING@6 as sum6, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@11 as sum7, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING@12 as sum8, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@7 as sum9, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW@8 as sum10, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@13 as sum11, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING@14 as sum12]
 --GlobalLimitExec: skip=0, fetch=5
 ----BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING: Ok(Field { name: "SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Preceding(UInt64(1)) }], mode=[Linear]
 ------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: CurrentRow, end_bound: Following(UInt64(1)) }], mode=[PartiallySorted([1, 0])]
@@ -2958,7 +2963,8 @@
 ----------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 1 FOLLOWING AND 5 FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 1 FOLLOWING AND 5 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Following(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[PartiallySorted([0])]
 ------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: CurrentRow }], mode=[PartiallySorted([0, 1])]
 --------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
-----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
+----------------ProjectionExec: expr=[CAST(c@2 AS Int64) as CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c, a@0 as a, b@1 as b, c@2 as c, d@3 as d]
+------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
 
 
 query IIIIIIIIIIIIIII
@@ -3010,29 +3016,31 @@
 Limit: skip=0, fetch=5
 --Sort: annotated_data_finite2.c ASC NULLS LAST, fetch=5
 ----Projection: annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.c, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING AS sum1, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 1 FOLLOWING AND 5 FOLLOWING AS sum2, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST, annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING AS sum3, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST, annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING AS sum4, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING AS sum5, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING AS sum6, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING AS sum7, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING AS sum8, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING AS sum9, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW AS sum10, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING AS sum11, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING AS sum12
-------WindowAggr: windowExpr=[[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST, annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST, annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING]]
---------WindowAggr: windowExpr=[[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING]]
-----------WindowAggr: windowExpr=[[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING]]
-------------WindowAggr: windowExpr=[[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 1 FOLLOWING AND 5 FOLLOWING]]
---------------WindowAggr: windowExpr=[[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW]]
-----------------WindowAggr: windowExpr=[[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING]]
-------------------TableScan: annotated_data_finite2 projection=[a, b, c, d]
+------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST, annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST, annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING]]
+--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING]]
+----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING]]
+------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 1 FOLLOWING AND 5 FOLLOWING]]
+--------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW]]
+----------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING]]
+------------------Projection: CAST(annotated_data_finite2.c AS Int64) AS CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c, annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.c, annotated_data_finite2.d
+--------------------TableScan: annotated_data_finite2 projection=[a, b, c, d]
 physical_plan
 GlobalLimitExec: skip=0, fetch=5
 --SortExec: fetch=5, expr=[c@2 ASC NULLS LAST]
-----ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@8 as sum1, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 1 FOLLOWING AND 5 FOLLOWING@9 as sum2, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST, annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@14 as sum3, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST, annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING@15 as sum4, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@4 as sum5, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING@5 as sum6, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@10 as sum7, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING@11 as sum8, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@6 as sum9, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW@7 as sum10, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@12 as sum11, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING@13 as sum12]
+----ProjectionExec: expr=[a@1 as a, b@2 as b, c@3 as c, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@9 as sum1, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 1 FOLLOWING AND 5 FOLLOWING@10 as sum2, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST, annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@15 as sum3, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST, annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING@16 as sum4, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@5 as sum5, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING@6 as sum6, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@11 as sum7, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING@12 as sum8, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@7 as sum9, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW@8 as sum10, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@13 as sum11, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING@14 as sum12]
 ------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST, annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST, annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST, annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING: Ok(Field { name: "SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST, annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Preceding(UInt64(1)) }], mode=[Sorted]
---------SortExec: expr=[d@3 ASC NULLS LAST,a@0 ASC NULLS LAST,b@1 ASC NULLS LAST,c@2 ASC NULLS LAST]
+--------SortExec: expr=[d@4 ASC NULLS LAST,a@1 ASC NULLS LAST,b@2 ASC NULLS LAST,c@3 ASC NULLS LAST]
 ----------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: CurrentRow, end_bound: Following(UInt64(1)) }], mode=[Sorted]
-------------SortExec: expr=[b@1 ASC NULLS LAST,a@0 ASC NULLS LAST,d@3 ASC NULLS LAST,c@2 ASC NULLS LAST]
+------------SortExec: expr=[b@2 ASC NULLS LAST,a@1 ASC NULLS LAST,d@4 ASC NULLS LAST,c@3 ASC NULLS LAST]
 --------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
-----------------SortExec: expr=[b@1 ASC NULLS LAST,a@0 ASC NULLS LAST,c@2 ASC NULLS LAST]
+----------------SortExec: expr=[b@2 ASC NULLS LAST,a@1 ASC NULLS LAST,c@3 ASC NULLS LAST]
 ------------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 1 FOLLOWING AND 5 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 1 FOLLOWING AND 5 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Following(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
---------------------SortExec: expr=[a@0 ASC NULLS LAST,d@3 ASC NULLS LAST,b@1 ASC NULLS LAST,c@2 ASC NULLS LAST]
+--------------------SortExec: expr=[a@1 ASC NULLS LAST,d@4 ASC NULLS LAST,b@2 ASC NULLS LAST,c@3 ASC NULLS LAST]
 ----------------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: CurrentRow }], mode=[Sorted]
-------------------------SortExec: expr=[a@0 ASC NULLS LAST,b@1 ASC NULLS LAST,d@3 ASC NULLS LAST,c@2 ASC NULLS LAST]
+------------------------SortExec: expr=[a@1 ASC NULLS LAST,b@2 ASC NULLS LAST,d@4 ASC NULLS LAST,c@3 ASC NULLS LAST]
 --------------------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
-----------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
+----------------------------ProjectionExec: expr=[CAST(c@2 AS Int64) as CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c, a@0 as a, b@1 as b, c@2 as c, d@3 as d]
+------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
 
 
 query IIIIIIIIIIIIIII