| // Licensed to the Apache Software Foundation (ASF) under one or more |
| // contributor license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright ownership. |
| // The ASF licenses this file to You under the Apache License, Version 2.0 |
| // (the "License"); you may not use this file except in compliance with |
| // the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| use std::sync::Arc; |
| |
| use arrow::{ |
| array::{ |
| ArrayRef, BooleanArray, Date32Array, Float64Array, Int32Array, TimestampMillisecondArray, |
| }, |
| compute::{DatePart, date_part}, |
| datatypes::{DataType, TimeUnit}, |
| }; |
| use chrono::{Duration, LocalResult, NaiveDate, TimeZone, Utc, prelude::*}; |
| use chrono_tz::Tz; |
| use datafusion::{ |
| common::{DataFusionError, Result, ScalarValue}, |
| physical_plan::ColumnarValue, |
| }; |
| use datafusion_ext_commons::arrow::cast::cast; |
| |
| // ---- date parts on Date32 via Arrow's date_part |
| // ----------------------------------------------- |
| |
| pub fn spark_year(args: &[ColumnarValue]) -> Result<ColumnarValue> { |
| let input = cast(&args[0].clone().into_array(1)?, &DataType::Date32)?; |
| Ok(ColumnarValue::Array(date_part(&input, DatePart::Year)?)) |
| } |
| |
| pub fn spark_month(args: &[ColumnarValue]) -> Result<ColumnarValue> { |
| let input = cast(&args[0].clone().into_array(1)?, &DataType::Date32)?; |
| Ok(ColumnarValue::Array(date_part(&input, DatePart::Month)?)) |
| } |
| |
| pub fn spark_day(args: &[ColumnarValue]) -> Result<ColumnarValue> { |
| let input = cast(&args[0].clone().into_array(1)?, &DataType::Date32)?; |
| Ok(ColumnarValue::Array(date_part(&input, DatePart::Day)?)) |
| } |
| |
| /// `spark_dayofweek(date/timestamp/compatible-string)` |
| /// |
| /// Matches Spark's `dayofweek()` semantics: |
| /// Sunday = 1, Monday = 2, ..., Saturday = 7. |
| pub fn spark_dayofweek(args: &[ColumnarValue]) -> Result<ColumnarValue> { |
| let input = cast(&args[0].clone().into_array(1)?, &DataType::Date32)?; |
| let input = input |
| .as_any() |
| .downcast_ref::<Date32Array>() |
| .expect("internal cast to Date32 must succeed"); |
| |
| // Date32 is days since 1970-01-01. 1970-01-01 is a Thursday. |
| // If we number weekdays so that Sunday = 0, ..., Saturday = 6, |
| // then 1970-01-01 corresponds to 4. For an offset `days`, |
| // weekday_index = (days + 4) mod 7 gives 0 = Sunday, ..., 6 = Saturday. |
| // Spark wants Sunday = 1, ..., Saturday = 7, so we add 1. |
| let dayofweek = Int32Array::from_iter(input.iter().map(|opt_days| { |
| opt_days.map(|days| { |
| let weekday_index = (days as i64 + 4).rem_euclid(7); |
| weekday_index as i32 + 1 |
| }) |
| })); |
| |
| Ok(ColumnarValue::Array(Arc::new(dayofweek))) |
| } |
| |
| /// `spark_weekofyear(date/timestamp/compatible-string[, timezone])` |
| /// |
| /// Matches Spark's `weekofyear()` semantics: |
| /// ISO week numbering, with Monday as the first day of the week, |
| /// and week 1 defined as the first week with more than 3 days. |
| /// |
| /// For `Timestamp` inputs, this function interprets epoch milliseconds in the |
| /// provided timezone (if any) before deriving the calendar date and ISO week. |
| /// If no timezone is provided, `UTC` is used by default. If an invalid |
| /// timezone string is provided, the function returns an execution error. |
| /// For `Date` and compatible string inputs, the behavior is unchanged: the |
| /// value is cast to `Date32` and the ISO week is computed from the resulting |
| /// date. |
| pub fn spark_weekofyear(args: &[ColumnarValue]) -> Result<ColumnarValue> { |
| // First argument as an Arrow array (date/timestamp/string, etc.) |
| let array = args[0].clone().into_array(1)?; |
| |
| // Determine timezone (for timestamp inputs). Default to UTC to match |
| // existing behavior when no timezone is provided. |
| let default_tz = chrono_tz::UTC; |
| let tz: Tz = if args.len() > 1 { |
| match &args[1] { |
| ColumnarValue::Scalar(ScalarValue::Utf8(Some(s))) |
| | ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some(s))) => { |
| s.parse::<Tz>().map_err(|_| { |
| DataFusionError::Execution(format!("spark_weekofyear invalid timezone: {s}")) |
| })? |
| } |
| _ => default_tz, |
| } |
| } else { |
| default_tz |
| }; |
| |
| match array.data_type() { |
| // Timestamp inputs: localize epoch milliseconds before computing ISO week |
| DataType::Timestamp(TimeUnit::Millisecond, _) => { |
| let ts_arr = array |
| .as_any() |
| .downcast_ref::<TimestampMillisecondArray>() |
| .expect("internal cast to TimestampMillisecondArray must succeed"); |
| |
| let weekofyear = Int32Array::from_iter(ts_arr.iter().map(|opt_ms| { |
| opt_ms.and_then(|ms| { |
| tz.timestamp_millis_opt(ms) |
| .single() |
| .map(|dt| dt.date_naive().iso_week().week() as i32) |
| }) |
| })); |
| |
| Ok(ColumnarValue::Array(Arc::new(weekofyear))) |
| } |
| // Non-timestamp inputs: preserve existing Date32-based behavior |
| _ => { |
| let input = cast(&array, &DataType::Date32)?; |
| let input = input |
| .as_any() |
| .downcast_ref::<Date32Array>() |
| .expect("internal cast to Date32 must succeed"); |
| |
| let epoch = |
| NaiveDate::from_ymd_opt(1970, 1, 1).expect("1970-01-01 must be a valid date"); |
| let weekofyear = Int32Array::from_iter(input.iter().map(|opt_days| { |
| opt_days.and_then(|days| { |
| epoch |
| .checked_add_signed(Duration::days(days as i64)) |
| .map(|date| date.iso_week().week() as i32) |
| }) |
| })); |
| |
| Ok(ColumnarValue::Array(Arc::new(weekofyear))) |
| } |
| } |
| } |
| |
| /// `spark_quarter(date/timestamp/compatible-string)` |
| /// |
| /// Simulates Spark's `quarter()` function. |
| /// Converts the input to `Date32`, extracts the month (1–12), |
| /// and computes the quarter as `((month - 1) / 3) + 1`. |
| /// Null values are propagated transparently. |
| pub fn spark_quarter(args: &[ColumnarValue]) -> Result<ColumnarValue> { |
| // Cast input to Date32 for compatibility with date_part() |
| let input = cast(&args[0].clone().into_array(1)?, &DataType::Date32)?; |
| |
| // Extract month (1–12) using Arrow's date_part |
| let month_arr: ArrayRef = date_part(&input, DatePart::Month)?; |
| let month_arr = month_arr |
| .as_any() |
| .downcast_ref::<Int32Array>() |
| .expect("date_part(Month) must return Int32Array"); |
| |
| // Compute quarter: ((month - 1) / 3) + 1, preserving NULLs |
| let quarter = Int32Array::from_iter( |
| month_arr |
| .iter() |
| .map(|opt_m| opt_m.map(|m| ((m - 1) / 3 + 1))), |
| ); |
| |
| Ok(ColumnarValue::Array(Arc::new(quarter))) |
| } |
| |
| // ---- timezone handling (custom, Spark-like) |
| // --------------------------------------------------- |
| |
/// Parse optional timezone (2nd argument) into `Option<Tz>`.
///
/// Returns `None` when the argument is absent or not a parseable timezone
/// string scalar; callers treat `None` as UTC.
fn parse_tz(args: &[ColumnarValue]) -> Option<Tz> {
    parse_tz_value(args.get(1))
}
| |
| fn parse_tz_value(arg: Option<&ColumnarValue>) -> Option<Tz> { |
| match arg { |
| Some(ColumnarValue::Scalar(ScalarValue::Utf8(Some(s)))) => s.parse::<Tz>().ok(), |
| _ => None, |
| } |
| } |
| |
| fn local_datetime(epoch_ms: i64, tz_opt: Option<Tz>) -> Option<NaiveDateTime> { |
| let dt_utc = Utc.timestamp_millis_opt(epoch_ms).single()?; |
| Some(match tz_opt { |
| Some(tz) => dt_utc.with_timezone(&tz).naive_local(), |
| None => dt_utc.naive_utc(), |
| }) |
| } |
| |
/// Epoch milliseconds of the start of `local_date` (local midnight) in the
/// given timezone; a `None` timezone interprets the naive midnight as UTC.
///
/// Returns `None` only when the conversion is impossible (e.g. the date has
/// no valid hms, or no valid local time exists on the entire date).
fn start_of_local_day_ms(local_date: NaiveDate, tz_opt: Option<Tz>) -> Option<i64> {
    let local_midnight = local_date.and_hms_opt(0, 0, 0)?;

    match tz_opt {
        Some(tz) => match tz.from_local_datetime(&local_midnight) {
            // Normal case: local midnight maps to exactly one instant.
            LocalResult::Single(dt) => Some(dt.with_timezone(&Utc).timestamp_millis()),
            // DST fall-back: midnight occurs twice; take the earlier instant.
            LocalResult::Ambiguous(dt1, dt2) => {
                Some(dt1.min(dt2).with_timezone(&Utc).timestamp_millis())
            }
            LocalResult::None => {
                // Align with Java's LocalDate.atStartOfDay(zone): choose the first valid
                // local time on that date if midnight itself falls in a gap.
                // Scan forward one minute at a time across the whole day.
                for minute in 1..=(24 * 60) {
                    let candidate = local_midnight + chrono::Duration::minutes(minute);
                    match tz.from_local_datetime(&candidate) {
                        LocalResult::Single(dt) => {
                            return Some(dt.with_timezone(&Utc).timestamp_millis());
                        }
                        // First valid time after the gap may itself be
                        // ambiguous; again prefer the earlier instant.
                        LocalResult::Ambiguous(dt1, dt2) => {
                            return Some(dt1.min(dt2).with_timezone(&Utc).timestamp_millis());
                        }
                        LocalResult::None => continue,
                    }
                }
                None
            }
        },
        // No timezone: the naive local midnight is already the UTC instant.
        None => Some(local_midnight.and_utc().timestamp_millis()),
    }
}
| |
/// Number of days in `month` (1..=12) of `year`, honoring Gregorian leap
/// years (divisible by 4, except centuries not divisible by 400).
fn days_in_month(year: i32, month: u32) -> u32 {
    let leap = (year % 4 == 0 && year % 100 != 0) || year % 400 == 0;
    match month {
        2 => {
            if leap {
                29
            } else {
                28
            }
        }
        4 | 6 | 9 | 11 => 30,
        1 | 3 | 5 | 7 | 8 | 10 | 12 => 31,
        _ => unreachable!("month must be in 1..=12"),
    }
}
| |
/// Round `value` to 8 decimal places, half away from zero.
///
/// Uses `f64::round`, which rounds halfway cases away from zero — matching
/// Spark's `BigDecimal` `RoundingMode.HALF_UP` used by `months_between`'s
/// `roundOff`. The previous `((v * SCALE) + 0.5).floor()` formulation rounded
/// negative halves toward positive infinity instead of away from zero.
fn round_to_8_digits(value: f64) -> f64 {
    const SCALE: f64 = 1.0e8;
    (value * SCALE).round() / SCALE
}
| |
/// Spark-compatible `months_between` for one pair of timestamps given as
/// physical UTC epoch milliseconds, localized via `tz_opt` for all calendar
/// calculations. Returns `None` when any date/time conversion fails.
///
/// Semantics (mirroring Spark):
/// * If both instants fall on the same day-of-month, or both fall on the
///   last day of their respective months, the result is the whole-month
///   difference and the time-of-day is ignored.
/// * Otherwise the fractional part is derived from the day difference plus
///   the seconds-within-day difference, over a fixed 31-day month.
/// * When `round_off` is true, the result is rounded to 8 decimal places.
fn months_between_value(
    timestamp1_ms: i64,
    timestamp2_ms: i64,
    round_off: bool,
    tz_opt: Option<Tz>,
) -> Option<f64> {
    const SECONDS_PER_DAY: i64 = 86_400;
    // Spark uses a fixed 31-day month for the fractional component.
    const SECONDS_PER_MONTH: i64 = 31 * SECONDS_PER_DAY;

    // Localize both instants so day/month boundaries follow the session zone.
    let local_dt1 = local_datetime(timestamp1_ms, tz_opt)?;
    let local_dt2 = local_datetime(timestamp2_ms, tz_opt)?;
    let date1 = local_dt1.date();
    let date2 = local_dt2.date();

    // Whole-month difference from (year, month) alone.
    let months1 = date1.year() * 12 + date1.month() as i32;
    let months2 = date2.year() * 12 + date2.month() as i32;
    let month_diff = (months1 - months2) as f64;

    let day1 = date1.day();
    let day2 = date2.day();
    // 0 means the date is the last day of its month.
    let days_to_month_end1 = days_in_month(date1.year(), date1.month()) - day1;
    let days_to_month_end2 = days_in_month(date2.year(), date2.month()) - day2;

    // Same day-of-month, or both on their month's last day: integral result,
    // time-of-day deliberately ignored (Spark behavior).
    if day1 == day2 || (days_to_month_end1 == 0 && days_to_month_end2 == 0) {
        return Some(month_diff);
    }

    // Fractional part: day delta plus seconds-since-local-midnight delta.
    let start_of_day1_ms = start_of_local_day_ms(date1, tz_opt)?;
    let start_of_day2_ms = start_of_local_day_ms(date2, tz_opt)?;
    let seconds_in_day1 = (timestamp1_ms - start_of_day1_ms) / 1000;
    let seconds_in_day2 = (timestamp2_ms - start_of_day2_ms) / 1000;
    let seconds_diff =
        (day1 as i64 - day2 as i64) * SECONDS_PER_DAY + seconds_in_day1 - seconds_in_day2;

    let result = month_diff + seconds_diff as f64 / SECONDS_PER_MONTH as f64;
    Some(if round_off {
        round_to_8_digits(result)
    } else {
        result
    })
}
| |
| /// Return the UTC offset in **seconds** for `epoch_ms` at the given `tz` |
| /// (DST-aware). |
| fn offset_seconds_at(tz: Tz, epoch_ms: i64) -> i32 { |
| // Convert epoch_ms to UTC DateTime, then ask the tz for local offset. |
| let dt_utc = Utc.timestamp_millis_opt(epoch_ms).single(); |
| match dt_utc { |
| Some(dt) => tz |
| .offset_from_utc_datetime(&dt.naive_utc()) |
| .fix() |
| .local_minus_utc(), |
| None => 0, // Gracefully return 0 on invalid inputs to avoid panic. |
| } |
| } |
| |
| /// Extract hour/minute/second from a `TimestampMillisecondArray` with optional |
| /// timezone. `which`: "hour" | "minute" | "second" |
| fn extract_hms_with_tz( |
| ts: &TimestampMillisecondArray, |
| tz_opt: Option<Tz>, |
| which: &str, |
| ) -> Int32Array { |
| const MS_PER_SEC: i64 = 1000; |
| const MS_PER_MIN: i64 = 60 * MS_PER_SEC; |
| const MS_PER_HOUR: i64 = 60 * MS_PER_MIN; |
| const MS_PER_DAY: i64 = 24 * MS_PER_HOUR; |
| |
| Int32Array::from_iter(ts.iter().map(|opt_ms| { |
| opt_ms.map(|epoch_ms| { |
| // Localize by applying tz offset in seconds (if provided). |
| let local_ms = if let Some(tz) = tz_opt { |
| let off_sec = offset_seconds_at(tz, epoch_ms) as i64; |
| epoch_ms + off_sec * MS_PER_SEC |
| } else { |
| epoch_ms // Treat as UTC when tz is None. |
| }; |
| |
| // Milliseconds within the day with positive modulo. |
| let mut day_ms = local_ms % MS_PER_DAY; |
| if day_ms < 0 { |
| day_ms += MS_PER_DAY; |
| } |
| |
| match which { |
| "hour" => (day_ms / MS_PER_HOUR) as i32, |
| "minute" => ((day_ms % MS_PER_HOUR) / MS_PER_MIN) as i32, |
| "second" => ((day_ms % MS_PER_MIN) / MS_PER_SEC) as i32, |
| _ => unreachable!("which must be one of: hour | minute | second"), |
| } |
| }) |
| })) |
| } |
| |
| // ---- Spark-like hour/minute/second built on custom TZ logic |
| // ----------------------------------- |
| |
| /// Extract the HOUR component. We first cast any input to |
| /// `Timestamp(Millisecond, None)` (to get the physical milliseconds) and then |
| /// apply our own timezone/DST logic. |
| pub fn spark_hour(args: &[ColumnarValue]) -> Result<ColumnarValue> { |
| let arr_ts_ms_none = cast( |
| &args[0].clone().into_array(1)?, |
| &DataType::Timestamp(TimeUnit::Millisecond, None), |
| )?; |
| |
| let ts = arr_ts_ms_none |
| .as_any() |
| .downcast_ref::<TimestampMillisecondArray>() |
| .expect("internal cast to Timestamp(Millisecond, None) must succeed"); |
| |
| let tz = parse_tz(args); |
| Ok(ColumnarValue::Array(Arc::new(extract_hms_with_tz( |
| ts, tz, "hour", |
| )))) |
| } |
| |
| /// Extract the MINUTE component (same approach as `spark_hour`). |
| pub fn spark_minute(args: &[ColumnarValue]) -> Result<ColumnarValue> { |
| let arr_ts_ms_none = cast( |
| &args[0].clone().into_array(1)?, |
| &DataType::Timestamp(TimeUnit::Millisecond, None), |
| )?; |
| |
| let ts = arr_ts_ms_none |
| .as_any() |
| .downcast_ref::<TimestampMillisecondArray>() |
| .expect("internal cast to Timestamp(Millisecond, None) must succeed"); |
| |
| let tz = parse_tz(args); |
| Ok(ColumnarValue::Array(Arc::new(extract_hms_with_tz( |
| ts, tz, "minute", |
| )))) |
| } |
| |
| /// Extract the SECOND component (same approach as `spark_hour`). |
| pub fn spark_second(args: &[ColumnarValue]) -> Result<ColumnarValue> { |
| let arr_ts_ms_none = cast( |
| &args[0].clone().into_array(1)?, |
| &DataType::Timestamp(TimeUnit::Millisecond, None), |
| )?; |
| |
| let ts = arr_ts_ms_none |
| .as_any() |
| .downcast_ref::<TimestampMillisecondArray>() |
| .expect("internal cast to Timestamp(Millisecond, None) must succeed"); |
| |
| let tz = parse_tz(args); |
| Ok(ColumnarValue::Array(Arc::new(extract_hms_with_tz( |
| ts, tz, "second", |
| )))) |
| } |
| |
| /// Compute Spark-compatible `months_between(timestamp1, timestamp2, roundOff)`. |
| /// |
| /// The first two arguments are timestamps in physical UTC milliseconds and the |
| /// fourth argument is an optional session timezone string used for local date |
| /// boundary calculations. |
| pub fn spark_months_between(args: &[ColumnarValue]) -> Result<ColumnarValue> { |
| if args.len() != 4 { |
| return Err(DataFusionError::Execution( |
| "spark_months_between() requires four arguments".to_string(), |
| )); |
| } |
| |
| let num_rows = args |
| .iter() |
| .find_map(|arg| match arg { |
| ColumnarValue::Array(arr) => Some(arr.len()), |
| ColumnarValue::Scalar(_) => None, |
| }) |
| .unwrap_or(1); |
| |
| let timestamp1 = cast( |
| &args[0].clone().into_array(num_rows)?, |
| &DataType::Timestamp(TimeUnit::Millisecond, None), |
| )?; |
| let timestamp2 = cast( |
| &args[1].clone().into_array(num_rows)?, |
| &DataType::Timestamp(TimeUnit::Millisecond, None), |
| )?; |
| let round_off = cast(&args[2].clone().into_array(num_rows)?, &DataType::Boolean)?; |
| |
| let timestamp1 = timestamp1 |
| .as_any() |
| .downcast_ref::<TimestampMillisecondArray>() |
| .expect("internal cast to Timestamp(Millisecond, None) must succeed"); |
| let timestamp2 = timestamp2 |
| .as_any() |
| .downcast_ref::<TimestampMillisecondArray>() |
| .expect("internal cast to Timestamp(Millisecond, None) must succeed"); |
| let round_off = round_off |
| .as_any() |
| .downcast_ref::<BooleanArray>() |
| .expect("internal cast to Boolean must succeed"); |
| let tz = parse_tz_value(args.get(3)); |
| |
| let result = Float64Array::from_iter( |
| timestamp1 |
| .iter() |
| .zip(timestamp2.iter()) |
| .zip(round_off.iter()) |
| .map(|((timestamp1_ms, timestamp2_ms), round_off)| { |
| match (timestamp1_ms, timestamp2_ms, round_off) { |
| (Some(timestamp1_ms), Some(timestamp2_ms), Some(round_off)) => { |
| months_between_value(timestamp1_ms, timestamp2_ms, round_off, tz) |
| } |
| _ => None, |
| } |
| }), |
| ); |
| |
| Ok(ColumnarValue::Array(Arc::new(result))) |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use std::sync::Arc; |
| |
| use arrow::array::{ |
| ArrayRef, Date32Array, Float64Array, Int32Array, TimestampMillisecondArray, |
| }; |
| |
| use super::*; |
| |
| #[test] |
| fn test_spark_year() -> Result<()> { |
| let input = Arc::new(Date32Array::from(vec![ |
| Some(0), |
| Some(1000), |
| Some(2000), |
| None, |
| ])); |
| let args = vec![ColumnarValue::Array(input)]; |
| let expected_ret: ArrayRef = Arc::new(Int32Array::from(vec![ |
| Some(1970), |
| Some(1972), |
| Some(1975), |
| None, |
| ])); |
| assert_eq!(&spark_year(&args)?.into_array(1)?, &expected_ret); |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_spark_month() -> Result<()> { |
| let input = Arc::new(Date32Array::from(vec![Some(0), Some(35), Some(65), None])); |
| let args = vec![ColumnarValue::Array(input)]; |
| let expected_ret: ArrayRef = |
| Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3), None])); |
| assert_eq!(&spark_month(&args)?.into_array(1)?, &expected_ret); |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_spark_day() -> Result<()> { |
| let input = Arc::new(Date32Array::from(vec![ |
| Some(0), |
| Some(10), |
| Some(20), |
| Some(30), |
| Some(40), |
| None, |
| ])); |
| let args = vec![ColumnarValue::Array(input)]; |
| let expected_ret: ArrayRef = Arc::new(Int32Array::from(vec![ |
| Some(1), |
| Some(11), |
| Some(21), |
| Some(31), |
| Some(10), |
| None, |
| ])); |
| assert_eq!(&spark_day(&args)?.into_array(1)?, &expected_ret); |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_spark_dayofweek() -> Result<()> { |
| let input = Arc::new(Date32Array::from(vec![ |
| Some(-1), |
| Some(0), |
| Some(2), |
| Some(3), |
| Some(4), |
| None, |
| ])); |
| let args = vec![ColumnarValue::Array(input)]; |
| let expected_ret: ArrayRef = Arc::new(Int32Array::from(vec![ |
| Some(4), |
| Some(5), |
| Some(7), |
| Some(1), |
| Some(2), |
| None, |
| ])); |
| assert_eq!(&spark_dayofweek(&args)?.into_array(1)?, &expected_ret); |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_spark_weekofyear() -> Result<()> { |
| let input = Arc::new(Date32Array::from(vec![ |
| Some(0), |
| Some(4017), |
| Some(16801), |
| Some(17167), |
| Some(14455), |
| None, |
| ])); |
| let args = vec![ColumnarValue::Array(input)]; |
| let expected_ret: ArrayRef = Arc::new(Int32Array::from(vec![ |
| Some(1), |
| Some(1), |
| Some(53), |
| Some(52), |
| Some(31), |
| None, |
| ])); |
| assert_eq!(&spark_weekofyear(&args)?.into_array(1)?, &expected_ret); |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_spark_weekofyear_with_timezone() -> Result<()> { |
| // In America/New_York: |
| // 2021-01-04 04:30:00 UTC -> 2021-01-03 23:30:00 local -> ISO week 53 |
| // 2021-01-04 05:30:00 UTC -> 2021-01-04 00:30:00 local -> ISO week 1 |
| let input = Arc::new(TimestampMillisecondArray::from(vec![ |
| Some(utc_ms(2021, 1, 4, 4, 30, 0)), |
| Some(utc_ms(2021, 1, 4, 5, 30, 0)), |
| None, |
| ])); |
| let args = vec![ |
| ColumnarValue::Array(input), |
| ColumnarValue::Scalar(ScalarValue::Utf8(Some("America/New_York".to_string()))), |
| ]; |
| let expected_ret: ArrayRef = Arc::new(Int32Array::from(vec![Some(53), Some(1), None])); |
| assert_eq!(&spark_weekofyear(&args)?.into_array(3)?, &expected_ret); |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_spark_weekofyear_invalid_timezone() { |
| let input = Arc::new(TimestampMillisecondArray::from(vec![Some(utc_ms( |
| 2021, 1, 4, 5, 30, 0, |
| ))])); |
| let args = vec![ |
| ColumnarValue::Array(input), |
| ColumnarValue::Scalar(ScalarValue::Utf8(Some("Mars/Olympus".to_string()))), |
| ]; |
| |
| let err = |
| spark_weekofyear(&args).expect_err("spark_weekofyear should fail for invalid timezone"); |
| assert!(err.to_string().contains("invalid timezone")); |
| } |
| |
| #[test] |
| fn test_spark_quarter_basic() -> Result<()> { |
| // Date32 days relative to 1970-01-01: |
| // 0 -> 1970-01-01 (Q1) |
| // 40 -> ~1970-02-10 (Q1) |
| // 100 -> ~1970-04-11 (Q2) |
| // 200 -> ~1970-07-20 (Q3) |
| // 300 -> ~1970-10-28 (Q4) |
| let input = Arc::new(Date32Array::from(vec![ |
| Some(0), |
| Some(40), |
| Some(100), |
| Some(200), |
| Some(300), |
| None, |
| ])); |
| let args = vec![ColumnarValue::Array(input)]; |
| let expected: ArrayRef = Arc::new(Int32Array::from(vec![ |
| Some(1), |
| Some(1), |
| Some(2), |
| Some(3), |
| Some(4), |
| None, |
| ])); |
| |
| let out = spark_quarter(&args)?.into_array(1)?; |
| assert_eq!(&out, &expected); |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_spark_quarter_null_only() -> Result<()> { |
| // Ensure NULL propagation |
| let input = Arc::new(Date32Array::from(vec![None, None])); |
| let args = vec![ColumnarValue::Array(input)]; |
| let expected: ArrayRef = Arc::new(Int32Array::from(vec![None, None])); |
| |
| let out = spark_quarter(&args)?.into_array(1)?; |
| assert_eq!(&out, &expected); |
| Ok(()) |
| } |
| |
| #[inline] |
| fn arc_tz(s: &str) -> Option<Arc<str>> { |
| Some(Arc::<str>::from(s)) |
| } |
| |
| #[inline] |
| fn ms(h: i64, m: i64, s: i64) -> i64 { |
| (h * 3600 + m * 60 + s) * 1000 |
| } |
| |
| /// Build ms since epoch helper |
| fn hms_to_ms(h: i64, m: i64, s: i64) -> i64 { |
| (h * 3600 + m * 60 + s) * 1000 |
| } |
| |
| #[test] |
| fn test_spark_hour_minute_second_basic_from_ts() -> Result<()> { |
| // 0ms -> 1970-01-01 00:00:00 UTC |
| // 5025000ms -> 1970-01-01 01:23:45 UTC |
| let ts = Arc::new(TimestampMillisecondArray::from(vec![ |
| Some(0), |
| Some(hms_to_ms(1, 23, 45)), |
| None, |
| ])); |
| |
| let args = vec![ColumnarValue::Array(ts.clone())]; |
| |
| // hour() |
| let expected_h: ArrayRef = Arc::new(Int32Array::from(vec![Some(0), Some(1), None])); |
| let out_h = spark_hour(&args)?.into_array(1)?; |
| assert_eq!(&out_h, &expected_h); |
| |
| // minute() |
| let expected_m: ArrayRef = Arc::new(Int32Array::from(vec![Some(0), Some(23), None])); |
| let out_m = spark_minute(&args)?.into_array(1)?; |
| assert_eq!(&out_m, &expected_m); |
| |
| // second() |
| let expected_s: ArrayRef = Arc::new(Int32Array::from(vec![Some(0), Some(45), None])); |
| let out_s = spark_second(&args)?.into_array(1)?; |
| assert_eq!(&out_s, &expected_s); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_spark_timeparts_from_date32_midnight() -> Result<()> { |
| let d = Arc::new(Date32Array::from(vec![Some(0), Some(1), None])); |
| let args = vec![ColumnarValue::Array(d)]; |
| |
| // hour(date) -> 0 |
| let expected_h: ArrayRef = Arc::new(Int32Array::from(vec![Some(0), Some(0), None])); |
| let out_h = spark_hour(&args)?.into_array(1)?; |
| assert_eq!(&out_h, &expected_h); |
| |
| // minute(date) -> 0 |
| let expected_m: ArrayRef = Arc::new(Int32Array::from(vec![Some(0), Some(0), None])); |
| let out_m = spark_minute(&args)?.into_array(1)?; |
| assert_eq!(&out_m, &expected_m); |
| |
| // second(date) -> 0 |
| let expected_s: ArrayRef = Arc::new(Int32Array::from(vec![Some(0), Some(0), None])); |
| let out_s = spark_second(&args)?.into_array(1)?; |
| assert_eq!(&out_s, &expected_s); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_spark_timeparts_scalar_vs_array_consistency() -> Result<()> { |
| // 1970-01-01 12:34:56 UTC |
| let ms = hms_to_ms(12, 34, 56); |
| |
| // Scalar |
| let out_h_scalar = spark_hour(&[ColumnarValue::Scalar( |
| datafusion::common::ScalarValue::TimestampMillisecond(Some(ms), None), |
| )])? |
| .into_array(1)?; |
| let out_m_scalar = spark_minute(&[ColumnarValue::Scalar( |
| datafusion::common::ScalarValue::TimestampMillisecond(Some(ms), None), |
| )])? |
| .into_array(1)?; |
| let out_s_scalar = spark_second(&[ColumnarValue::Scalar( |
| datafusion::common::ScalarValue::TimestampMillisecond(Some(ms), None), |
| )])? |
| .into_array(1)?; |
| |
| // Array |
| let arr = Arc::new(TimestampMillisecondArray::from(vec![Some(ms)])); |
| let out_h_array = spark_hour(&[ColumnarValue::Array(arr.clone())])?.into_array(1)?; |
| let out_m_array = spark_minute(&[ColumnarValue::Array(arr.clone())])?.into_array(1)?; |
| let out_s_array = spark_second(&[ColumnarValue::Array(arr)])?.into_array(1)?; |
| |
| assert_eq!(&out_h_scalar, &out_h_array); |
| assert_eq!(&out_m_scalar, &out_m_array); |
| assert_eq!(&out_s_scalar, &out_s_array); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_spark_timeparts_pre_epoch_negative_ms() -> Result<()> { |
| let ts = Arc::new(TimestampMillisecondArray::from(vec![Some(-1000)])); |
| let args = vec![ColumnarValue::Array(ts)]; |
| |
| let expected_h: ArrayRef = Arc::new(Int32Array::from(vec![Some(23)])); |
| let expected_m: ArrayRef = Arc::new(Int32Array::from(vec![Some(59)])); |
| let expected_s: ArrayRef = Arc::new(Int32Array::from(vec![Some(59)])); |
| |
| let out_h = spark_hour(&args)?.into_array(1)?; |
| let out_m = spark_minute(&args)?.into_array(1)?; |
| let out_s = spark_second(&args)?.into_array(1)?; |
| |
| assert_eq!(&out_h, &expected_h); |
| assert_eq!(&out_m, &expected_m); |
| assert_eq!(&out_s, &expected_s); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_spark_timeparts_null_only() -> Result<()> { |
| let ts = Arc::new(TimestampMillisecondArray::from(vec![None, None])); |
| let args = vec![ColumnarValue::Array(ts)]; |
| |
| let expected: ArrayRef = Arc::new(Int32Array::from(vec![None, None])); |
| |
| let out_h = spark_hour(&args)?.into_array(1)?; |
| let out_m = spark_minute(&args)?.into_array(1)?; |
| let out_s = spark_second(&args)?.into_array(1)?; |
| |
| assert_eq!(&out_h, &expected); |
| assert_eq!(&out_m, &expected); |
| assert_eq!(&out_s, &expected); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_hour_utc_vs_shanghai() -> Result<()> { |
| // 1970-01-01 00:00:00 UTC |
| let ts = Arc::new(TimestampMillisecondArray::from(vec![Some(0)])); |
| |
| // default (None) -> UTC |
| let out_utc = spark_hour(&[ColumnarValue::Array(ts.clone())])?.into_array(1)?; |
| let expected_utc: ArrayRef = Arc::new(Int32Array::from(vec![Some(0)])); |
| assert_eq!(&out_utc, &expected_utc); |
| |
| let out_cst = spark_hour(&[ |
| ColumnarValue::Array(ts.clone()), |
| ColumnarValue::Scalar(ScalarValue::Utf8(Some("Asia/Shanghai".to_string()))), |
| ])? |
| .into_array(1)?; |
| let expected_cst: ArrayRef = Arc::new(Int32Array::from(vec![Some(8)])); |
| assert_eq!(&out_cst, &expected_cst); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_hour_scalar_vs_array_and_explicit_utc() -> Result<()> { |
| // 1970-01-01 12:34:56 UTC |
| let ts_ms = ms(12, 34, 56); |
| |
| let out_scalar = spark_hour(&[ColumnarValue::Scalar(ScalarValue::TimestampMillisecond( |
| Some(ts_ms), |
| arc_tz("UTC"), |
| ))])? |
| .into_array(1)?; |
| |
| // Array + default (None = UTC) |
| let arr = Arc::new(TimestampMillisecondArray::from(vec![Some(ts_ms)])); |
| let out_array = spark_hour(&[ColumnarValue::Array(arr)])?.into_array(1)?; |
| |
| let expected: ArrayRef = Arc::new(Int32Array::from(vec![Some(12)])); |
| assert_eq!(&out_scalar, &expected); |
| assert_eq!(&out_array, &expected); |
| Ok(()) |
| } |
| |
| /// Helper: build epoch ms from a UTC calendar time. |
| fn utc_ms(y: i32, mo: u32, d: u32, h: u32, m: u32, s: u32) -> i64 { |
| // chrono 0.4: with_ymd_and_hms(...).single() for strictness |
| Utc.with_ymd_and_hms(y, mo, d, h, m, s) |
| .single() |
| .expect("valid UTC datetime") |
| .timestamp_millis() |
| } |
| |
| #[test] |
| fn test_minute_second_utc_vs_shanghai_and_kolkata() -> Result<()> { |
| // epoch 0 -> 1970-01-01 00:00:00 UTC |
| let ts = Arc::new(TimestampMillisecondArray::from(vec![Some(0)])); |
| |
| // Default (None) -> UTC |
| let out_min_utc = spark_minute(&[ColumnarValue::Array(ts.clone())])?.into_array(1)?; |
| let out_sec_utc = spark_second(&[ColumnarValue::Array(ts.clone())])?.into_array(1)?; |
| |
| let expected_min_utc: ArrayRef = Arc::new(Int32Array::from(vec![Some(0)])); |
| let expected_sec_utc: ArrayRef = Arc::new(Int32Array::from(vec![Some(0)])); |
| assert_eq!(&out_min_utc, &expected_min_utc); |
| assert_eq!(&out_sec_utc, &expected_sec_utc); |
| |
| // Asia/Shanghai (+08:00) -> 08:00:00 local => minute=0, second=0 |
| let out_min_cst = spark_minute(&[ |
| ColumnarValue::Array(ts.clone()), |
| ColumnarValue::Scalar(ScalarValue::Utf8(Some("Asia/Shanghai".to_string()))), |
| ])? |
| .into_array(1)?; |
| let out_sec_cst = spark_second(&[ |
| ColumnarValue::Array(ts.clone()), |
| ColumnarValue::Scalar(ScalarValue::Utf8(Some("Asia/Shanghai".to_string()))), |
| ])? |
| .into_array(1)?; |
| let expected_min_cst: ArrayRef = Arc::new(Int32Array::from(vec![Some(0)])); |
| let expected_sec_cst: ArrayRef = Arc::new(Int32Array::from(vec![Some(0)])); |
| assert_eq!(&out_min_cst, &expected_min_cst); |
| assert_eq!(&out_sec_cst, &expected_sec_cst); |
| |
| // Asia/Kolkata (+05:30) -> 05:30:00 local => minute=30, second=0 |
| let out_min_kol = spark_minute(&[ |
| ColumnarValue::Array(ts.clone()), |
| ColumnarValue::Scalar(ScalarValue::Utf8(Some("Asia/Kolkata".to_string()))), |
| ])? |
| .into_array(1)?; |
| let out_sec_kol = spark_second(&[ |
| ColumnarValue::Array(ts), |
| ColumnarValue::Scalar(ScalarValue::Utf8(Some("Asia/Kolkata".to_string()))), |
| ])? |
| .into_array(1)?; |
| let expected_min_kol: ArrayRef = Arc::new(Int32Array::from(vec![Some(30)])); |
| let expected_sec_kol: ArrayRef = Arc::new(Int32Array::from(vec![Some(0)])); |
| assert_eq!(&out_min_kol, &expected_min_kol); |
| assert_eq!(&out_sec_kol, &expected_sec_kol); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_minute_second_nonwhole_offset_kathmandu() -> Result<()> { |
| // 1970-01-01 00:00:00 UTC -> Asia/Kathmandu was +05:30 |
| let ts = Arc::new(TimestampMillisecondArray::from(vec![Some(0)])); |
| |
| let out_min = spark_minute(&[ |
| ColumnarValue::Array(ts.clone()), |
| ColumnarValue::Scalar(ScalarValue::Utf8(Some("Asia/Kathmandu".to_string()))), |
| ])? |
| .into_array(1)?; |
| let out_sec = spark_second(&[ |
| ColumnarValue::Array(ts), |
| ColumnarValue::Scalar(ScalarValue::Utf8(Some("Asia/Kathmandu".to_string()))), |
| ])? |
| .into_array(1)?; |
| |
| let expected_min: ArrayRef = Arc::new(Int32Array::from(vec![Some(30)])); |
| let expected_sec: ArrayRef = Arc::new(Int32Array::from(vec![Some(0)])); |
| assert_eq!(&out_min, &expected_min); |
| assert_eq!(&out_sec, &expected_sec); |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_minute_second_dst_spring_forward_newyork() -> Result<()> { |
| // America/New_York DST starts on 2019-03-10. |
| // Local time jumps from 01:59:59 to 03:00:00 (02:00:00 - 02:59:59 does not |
| // exist). |
| |
| // 2019-03-10 06:59:59 UTC -> 01:59:59 local (EST, UTC-5) |
| let t1 = utc_ms(2019, 3, 10, 6, 59, 59); |
| // 2019-03-10 07:00:00 UTC -> 03:00:00 local (EDT, UTC-4) |
| let t2 = utc_ms(2019, 3, 10, 7, 0, 0); |
| |
| let ts = Arc::new(TimestampMillisecondArray::from(vec![Some(t1), Some(t2)])); |
| |
| let out_min = spark_minute(&[ |
| ColumnarValue::Array(ts.clone()), |
| ColumnarValue::Scalar(ScalarValue::Utf8(Some("America/New_York".to_string()))), |
| ])? |
| .into_array(2)?; |
| let out_sec = spark_second(&[ |
| ColumnarValue::Array(ts.clone()), |
| ColumnarValue::Scalar(ScalarValue::Utf8(Some("America/New_York".to_string()))), |
| ])? |
| .into_array(2)?; |
| |
| let expected_min: ArrayRef = Arc::new(Int32Array::from(vec![Some(59), Some(0)])); |
| let expected_sec: ArrayRef = Arc::new(Int32Array::from(vec![Some(59), Some(0)])); |
| |
| assert_eq!(&out_min, &expected_min); |
| assert_eq!(&out_sec, &expected_sec); |
| |
| Ok(()) |
| } |
| |
| fn months_between_args( |
| timestamp1_ms: Option<i64>, |
| timestamp2_ms: Option<i64>, |
| round_off: Option<bool>, |
| timezone: Option<&str>, |
| ) -> [ColumnarValue; 4] { |
| [ |
| ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(timestamp1_ms, None)), |
| ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(timestamp2_ms, None)), |
| ColumnarValue::Scalar(ScalarValue::Boolean(round_off)), |
| ColumnarValue::Scalar(ScalarValue::Utf8(timezone.map(str::to_string))), |
| ] |
| } |
| |
#[test]
fn test_spark_months_between_ignores_time_for_same_day() -> Result<()> {
    // Both timestamps fall on the 15th of their month, so the time-of-day
    // component is ignored and the result is an exact whole number of months.
    let args = months_between_args(
        Some(utc_ms(2024, 3, 15, 23, 59, 59)),
        Some(utc_ms(2024, 1, 15, 0, 0, 0)),
        Some(true),
        Some("UTC"),
    );
    let out = spark_months_between(&args)?.into_array(1)?;

    let expected: ArrayRef = Arc::new(Float64Array::from(vec![Some(2.0)]));
    assert_eq!(&out, &expected);
    Ok(())
}
| |
#[test]
fn test_spark_months_between_ignores_time_for_last_day_of_month() -> Result<()> {
    // 2024-02-29 and 2024-01-31 are each the last day of their month, so the
    // day/time components are dropped and the difference is a whole month.
    let args = months_between_args(
        Some(utc_ms(2024, 2, 29, 12, 0, 0)),
        Some(utc_ms(2024, 1, 31, 0, 0, 0)),
        Some(true),
        Some("UTC"),
    );
    let out = spark_months_between(&args)?.into_array(1)?;

    let expected: ArrayRef = Arc::new(Float64Array::from(vec![Some(1.0)]));
    assert_eq!(&out, &expected);
    Ok(())
}
| |
#[test]
fn test_spark_months_between_fractional_rounding() -> Result<()> {
    // Same timestamp pair for both calls; only the round_off flag differs.
    let ts1 = Some(utc_ms(2024, 3, 2, 12, 0, 0));
    let ts2 = Some(utc_ms(2024, 1, 1, 0, 0, 0));

    // round_off = true: the fractional result is rounded to 8 decimal places.
    let rounded =
        spark_months_between(&months_between_args(ts1, ts2, Some(true), Some("UTC")))?
            .into_array(1)?;
    let expected_rounded: ArrayRef = Arc::new(Float64Array::from(vec![Some(2.0483871)]));
    assert_eq!(&rounded, &expected_rounded);

    // round_off = false: the full-precision fraction is preserved.
    let unrounded =
        spark_months_between(&months_between_args(ts1, ts2, Some(false), Some("UTC")))?
            .into_array(1)?;
    let unrounded = unrounded
        .as_any()
        .downcast_ref::<Float64Array>()
        .expect("months_between should return Float64Array");
    assert!((unrounded.value(0) - 2.0483870967741935).abs() < 1e-12);
    Ok(())
}
| |
#[test]
fn test_spark_months_between_respects_dst_gap_in_session_timezone() -> Result<()> {
    // 2024-03-10 07:30:00 UTC is 03:30:00 local in America/New_York — the
    // 02:00-02:59 local hour does not exist that day (spring-forward DST).
    let ts1 = Some(utc_ms(2024, 3, 10, 7, 30, 0));
    // 2024-02-09 06:30:00 UTC is 01:30:00 local.
    let ts2 = Some(utc_ms(2024, 2, 9, 6, 30, 0));

    let args = months_between_args(ts1, ts2, Some(false), Some("America/New_York"));
    let out = spark_months_between(&args)?.into_array(1)?;
    let out = out
        .as_any()
        .downcast_ref::<Float64Array>()
        .expect("months_between should return Float64Array");

    assert!((out.value(0) - 1.0336021505376345).abs() < 1e-12);
    Ok(())
}
| |
#[test]
fn test_spark_months_between_negative_when_timestamp1_is_earlier() -> Result<()> {
    // Mirror of the same-day case with the operands swapped: when the first
    // timestamp precedes the second, the result is negative.
    let args = months_between_args(
        Some(utc_ms(2024, 1, 15, 0, 0, 0)),
        Some(utc_ms(2024, 3, 15, 23, 59, 59)),
        Some(true),
        Some("UTC"),
    );
    let out = spark_months_between(&args)?.into_array(1)?;

    let expected: ArrayRef = Arc::new(Float64Array::from(vec![Some(-2.0)]));
    assert_eq!(&out, &expected);
    Ok(())
}
| |
#[test]
fn test_spark_months_between_fractional_rounding_keeps_negative_sign() -> Result<()> {
    // Rounding to 8 decimal places must not change the sign of a negative
    // fractional result.
    let args = months_between_args(
        Some(utc_ms(2024, 1, 1, 0, 0, 0)),
        Some(utc_ms(2024, 3, 2, 12, 0, 0)),
        Some(true),
        Some("UTC"),
    );
    let out = spark_months_between(&args)?.into_array(1)?;

    let expected: ArrayRef = Arc::new(Float64Array::from(vec![Some(-2.0483871)]));
    assert_eq!(&out, &expected);
    Ok(())
}
| |
#[test]
fn test_spark_months_between_null_propagation() -> Result<()> {
    // A null first timestamp must yield a null result rather than an error.
    let args = months_between_args(None, Some(utc_ms(2024, 1, 1, 0, 0, 0)), Some(true), Some("UTC"));
    let out = spark_months_between(&args)?.into_array(1)?;

    let expected: ArrayRef = Arc::new(Float64Array::from(vec![None]));
    assert_eq!(&out, &expected);
    Ok(())
}
| } |