blob: 3d363ce97d21617ebfcf6b598d6ab091cbdef428 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! DateTime expressions
use std::sync::Arc;
use super::ColumnarValue;
use crate::{
error::{DataFusionError, Result},
scalar::{ScalarType, ScalarValue},
};
use arrow::{
array::{Array, ArrayRef, GenericStringArray, PrimitiveArray, StringOffsetSizeTrait},
datatypes::{ArrowPrimitiveType, DataType, TimestampNanosecondType},
};
use arrow::{
array::{
Date32Array, Date64Array, TimestampMicrosecondArray, TimestampMillisecondArray,
TimestampNanosecondArray, TimestampSecondArray,
},
compute::kernels::temporal,
datatypes::TimeUnit,
temporal_conversions::timestamp_ns_to_datetime,
};
use chrono::prelude::*;
use chrono::Duration;
use chrono::LocalResult;
/// Accepts a string in RFC3339 / ISO8601 standard format and some
/// variants and converts it to a nanosecond precision timestamp.
///
/// Implements the `to_timestamp` function to convert a string to a
/// timestamp, following the model of Spark SQL's `to_timestamp`.
///
/// In addition to RFC3339 / ISO8601 standard timestamps, it also
/// accepts strings that use a space ` ` to separate the date and time
/// as well as strings that have no explicit timezone offset.
///
/// Examples of accepted inputs:
/// * `1997-01-31T09:26:56.123Z` # RFC3339
/// * `1997-01-31T09:26:56.123-05:00` # RFC3339
/// * `1997-01-31 09:26:56.123-05:00` # close to RFC3339 but with a space rather than T
/// * `1997-01-31T09:26:56.123` # close to RFC3339 but no timezone offset specified
/// * `1997-01-31 09:26:56.123` # close to RFC3339 but uses a space and no timezone offset
/// * `1997-01-31 09:26:56` # close to RFC3339, no fractional seconds
///
/// In every accepted form, fractional seconds are read as a decimal
/// fraction of a second, so `.123` means 123 milliseconds.
///
/// Internally, this function uses the `chrono` library for the
/// datetime parsing.
///
/// We hope to extend this function in the future with a second
/// parameter to specify the format string.
///
/// ## Timestamp Precision
///
/// DataFusion uses the maximum precision timestamps supported by
/// Arrow (nanoseconds stored as a 64-bit integer). This means the
/// range of dates that timestamps can represent is ~1677 AD to 2262 AD.
///
/// ## Timezone / Offset Handling
///
/// By using the Arrow format, DataFusion inherits Arrow's handling of
/// timestamp values. Specifically, the stored numerical values of
/// timestamps are stored relative to UTC.
///
/// This function interprets strings without an explicit time zone as
/// timestamps in the local time zone of the machine that ran the
/// DataFusion query.
///
/// For example, `1997-01-31 09:26:56.123Z` is interpreted as UTC, as
/// it has an explicit timezone specifier ("Z" for Zulu/UTC).
///
/// `1997-01-31T09:26:56.123` is interpreted as a local timestamp in
/// the timezone of the machine that ran DataFusion. For example, if
/// the system timezone is set to America/New_York (UTC-5) the
/// timestamp will be interpreted as though it were
/// `1997-01-31T09:26:56.123-05:00`.
///
/// # Errors
/// Returns an execution error if `s` matches none of the supported
/// formats, or if a timezone-less value does not exist in the local
/// time zone.
#[inline]
fn string_to_timestamp_nanos(s: &str) -> Result<i64> {
    // Fast path: RFC3339 timestamp (with a T)
    // Example: 2020-09-08T13:42:29.190855Z
    if let Ok(ts) = DateTime::parse_from_rfc3339(s) {
        return Ok(ts.timestamp_nanos());
    }

    // Implement quasi-RFC3339 support by trying to parse the
    // timestamp with various other format specifiers to support
    // separating the date and time with a space ' ' rather than 'T' to be
    // (more) compatible with Apache Spark SQL.
    //
    // NOTE: every fractional-seconds specifier below is `%.f`, never `.%f`.
    // chrono parses `%f` as an integer count of nanoseconds, which would turn
    // `.190855` into 190855ns instead of 0.190855s. `%.f` consumes the dot and
    // scales the digits as a decimal fraction.

    // timezone offset, using ' ' as a separator
    // Example: 2020-09-08 13:42:29.190855-05:00
    if let Ok(ts) = DateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f%:z") {
        return Ok(ts.timestamp_nanos());
    }

    // with an explicit Z, using ' ' as a separator
    // Example: 2020-09-08 13:42:29Z
    if let Ok(ts) = Utc.datetime_from_str(s, "%Y-%m-%d %H:%M:%S%.fZ") {
        return Ok(ts.timestamp_nanos());
    }

    // Support timestamps without an explicit timezone offset, again
    // to be compatible with what Apache Spark SQL does.

    // without a timezone specifier as a local time, using T as a separator
    // Example: 2020-09-08T13:42:29.190855
    if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") {
        return naive_datetime_to_timestamp(s, ts);
    }

    // without a timezone specifier as a local time, using T as a
    // separator, no fractional seconds
    // Example: 2020-09-08T13:42:29
    if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
        return naive_datetime_to_timestamp(s, ts);
    }

    // without a timezone specifier as a local time, using ' ' as a separator
    // Example: 2020-09-08 13:42:29.190855
    if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f") {
        return naive_datetime_to_timestamp(s, ts);
    }

    // without a timezone specifier as a local time, using ' ' as a
    // separator, no fractional seconds
    // Example: 2020-09-08 13:42:29
    if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
        return naive_datetime_to_timestamp(s, ts);
    }

    // Note we don't pass along the error message from the underlying
    // chrono parsing because we tried several different format
    // strings and we don't know which the user was trying to
    // match. Thus any of the specific error messages is likely to
    // be more confusing than helpful.
    Err(DataFusionError::Execution(format!(
        "Error parsing '{}' as timestamp",
        s
    )))
}
/// Resolves `datetime`, which carries no time zone information, in the
/// local time zone of this machine and returns it as nanoseconds since
/// the UNIX epoch (UTC).
///
/// `s` is the original input text; it is only used to build the error
/// message.
///
/// # Errors
/// Returns an execution error when `datetime` does not exist in the local
/// time zone.
fn naive_datetime_to_timestamp(s: &str, datetime: NaiveDateTime) -> Result<i64> {
    let local_zone = Local {};
    match local_zone.from_local_datetime(&datetime) {
        // A local time can map to two instants, for example during a daylight
        // saving time transition. Both are valid readings and the result is
        // converted to UTC anyway, so the first candidate is used, exactly as
        // for an unambiguous time.
        LocalResult::Single(resolved) | LocalResult::Ambiguous(resolved, _) => {
            Ok(resolved.with_timezone(&Utc).timestamp_nanos())
        }
        LocalResult::None => Err(DataFusionError::Execution(format!(
            "Error parsing '{}' as timestamp: local time representation is invalid",
            s
        ))),
    }
}
/// Applies `op` to every value of the single string array in `args` (a
/// `GenericStringArray<T>`) and collects the results into a
/// `PrimitiveArray<O>`.
///
/// Null slots stay null and `op` is not called for them.
///
/// # Errors
/// This function errors iff:
/// * the number of arguments is not 1 or
/// * the first argument is not castable to a `GenericStringArray` or
/// * the function `op` errors
pub(crate) fn unary_string_to_primitive_function<'a, T, O, F>(
    args: &[&'a dyn Array],
    op: F,
    name: &str,
) -> Result<PrimitiveArray<O>>
where
    O: ArrowPrimitiveType,
    T: StringOffsetSizeTrait,
    F: Fn(&'a str) -> Result<O::Native>,
{
    let input = match args {
        [only] => *only,
        _ => {
            return Err(DataFusionError::Internal(format!(
                "{:?} args were supplied but {} takes exactly one argument",
                args.len(),
                name,
            )))
        }
    };

    let strings = input
        .as_any()
        .downcast_ref::<GenericStringArray<T>>()
        .ok_or_else(|| {
            DataFusionError::Internal("failed to downcast to string".to_string())
        })?;

    // Collecting `Result<Option<_>>` items stops at the first error from `op`.
    strings
        .iter()
        .map(|slot| match slot {
            Some(text) => op(text).map(Some),
            None => Ok(None),
        })
        .collect()
}
/// Applies `op`, which maps a `&str` to an Arrow native value, to the single
/// argument in `args`. That argument may be a Utf8 / LargeUtf8 array or
/// scalar.
///
/// * `O` - Arrow primitive type of the produced values
/// * `S` - builds the resulting `ScalarValue` when the input is a scalar
/// * `name` - SQL function name, used in error messages
///
/// Array inputs produce an array result and scalar inputs a scalar
/// result. Null scalars produce a null scalar without calling `op`.
///
/// # Errors
/// Returns an internal error if `args` does not hold exactly one value or if
/// that value is not a Utf8 / LargeUtf8 array or scalar. Errors from `op` are
/// propagated.
fn handle<'a, O, F, S>(
    args: &'a [ColumnarValue],
    op: F,
    name: &str,
) -> Result<ColumnarValue>
where
    O: ArrowPrimitiveType,
    S: ScalarType<O::Native>,
    F: Fn(&'a str) -> Result<O::Native>,
{
    // Check arity up front: indexing an empty slice would panic, and
    // surplus arguments would otherwise be silently ignored.
    let arg = match args {
        [arg] => arg,
        _ => {
            return Err(DataFusionError::Internal(format!(
                "{:?} args were supplied but {} takes exactly one argument",
                args.len(),
                name,
            )))
        }
    };

    match arg {
        ColumnarValue::Array(a) => match a.data_type() {
            DataType::Utf8 => Ok(ColumnarValue::Array(Arc::new(
                unary_string_to_primitive_function::<i32, O, _>(&[a.as_ref()], op, name)?,
            ))),
            DataType::LargeUtf8 => Ok(ColumnarValue::Array(Arc::new(
                unary_string_to_primitive_function::<i64, O, _>(&[a.as_ref()], op, name)?,
            ))),
            other => Err(DataFusionError::Internal(format!(
                "Unsupported data type {:?} for function {}",
                other, name,
            ))),
        },
        ColumnarValue::Scalar(scalar) => match scalar {
            ScalarValue::Utf8(a) => {
                let result = a.as_ref().map(|x| op(x)).transpose()?;
                Ok(ColumnarValue::Scalar(S::scalar(result)))
            }
            ScalarValue::LargeUtf8(a) => {
                let result = a.as_ref().map(|x| op(x)).transpose()?;
                Ok(ColumnarValue::Scalar(S::scalar(result)))
            }
            other => Err(DataFusionError::Internal(format!(
                "Unsupported data type {:?} for function {}",
                other, name
            ))),
        },
    }
}
/// to_timestamp SQL function
///
/// Parses each Utf8 / LargeUtf8 value of its single argument with
/// `string_to_timestamp_nanos`, producing nanosecond timestamps. Null
/// inputs produce null outputs.
pub fn to_timestamp(args: &[ColumnarValue]) -> Result<ColumnarValue> {
    const FUNCTION_NAME: &str = "to_timestamp";
    let parse = string_to_timestamp_nanos;
    handle::<TimestampNanosecondType, _, TimestampNanosecondType>(
        args,
        parse,
        FUNCTION_NAME,
    )
}
/// Truncates `value`, a timestamp in nanoseconds since the UNIX epoch, to
/// the start of its enclosing `granularity` unit. Supported granularities
/// are `second`, `minute`, `hour`, `day`, `week` (weeks start on Monday),
/// `month` and `year`.
///
/// # Errors
/// Returns an execution error if `granularity` is unsupported. It also
/// errors if the truncated value no longer fits in an `i64` nanosecond
/// count. Truncation always moves towards the past, so this can happen for
/// inputs close to the minimum representable timestamp (1677-09-21).
fn date_trunc_single(granularity: &str, value: i64) -> Result<i64> {
    const NANOS_PER_SECOND: i64 = 1_000_000_000;

    let out_of_range = || {
        DataFusionError::Execution(format!(
            "Truncating timestamp {} to '{}' is out of the range of nanosecond timestamps",
            value, granularity
        ))
    };

    // Drop the sub-second part with a Euclidean (flooring) remainder so that
    // negative, pre-1970 values are also rounded towards the past. Passing a
    // whole number of seconds also means the conversion below does not depend
    // on how `timestamp_ns_to_datetime` splits negative inputs.
    let whole_seconds = value
        .checked_sub(value.rem_euclid(NANOS_PER_SECOND))
        .ok_or_else(out_of_range)?;
    let datetime = timestamp_ns_to_datetime(whole_seconds);

    let truncated = match granularity {
        "second" => Some(datetime),
        "minute" => datetime.with_second(0),
        "hour" => datetime.with_second(0).and_then(|d| d.with_minute(0)),
        "day" => datetime
            .with_second(0)
            .and_then(|d| d.with_minute(0))
            .and_then(|d| d.with_hour(0)),
        "week" => datetime
            .with_second(0)
            .and_then(|d| d.with_minute(0))
            .and_then(|d| d.with_hour(0))
            // step back to the Monday of the same week
            .map(|d| d - Duration::days(i64::from(d.weekday().num_days_from_monday()))),
        "month" => datetime
            .with_second(0)
            .and_then(|d| d.with_minute(0))
            .and_then(|d| d.with_hour(0))
            .and_then(|d| d.with_day0(0)),
        "year" => datetime
            .with_second(0)
            .and_then(|d| d.with_minute(0))
            .and_then(|d| d.with_hour(0))
            // the day is reset first, so the January date below is always valid
            .and_then(|d| d.with_day0(0))
            .and_then(|d| d.with_month0(0)),
        unsupported => {
            return Err(DataFusionError::Execution(format!(
                "Unsupported date_trunc granularity: {}",
                unsupported
            )))
        }
    };
    // Every `with_*(0)` call above sets a field to a value that is always
    // valid, so `truncated` is always `Some`.
    let truncated = truncated.expect("setting a date/time field to zero is always valid");

    // The sub-second part is zero at this point, so whole seconds times 10^9
    // is the exact result. Use checked arithmetic because truncation can push
    // the value below `i64::MIN` nanoseconds.
    truncated
        .timestamp()
        .checked_mul(NANOS_PER_SECOND)
        .ok_or_else(out_of_range)
}
/// date_trunc SQL function
///
/// `args[0]` is the granularity (a non-null Utf8 scalar, see
/// `date_trunc_single` for accepted values). `args[1]` holds the
/// nanosecond timestamps to truncate: either a `ScalarValue::TimeNanosecond`
/// scalar or a `TimestampNanosecondArray`. Null timestamps stay null.
///
/// # Errors
/// Returns an execution error if there are not exactly two arguments or if
/// either argument has an unexpected type. Truncation errors are propagated.
pub fn date_trunc(args: &[ColumnarValue]) -> Result<ColumnarValue> {
    // Same arity check as `date_part`: indexing below would panic otherwise.
    if args.len() != 2 {
        return Err(DataFusionError::Execution(
            "Expected two arguments in DATE_TRUNC".to_string(),
        ));
    }
    let (granularity, array) = (&args[0], &args[1]);

    let granularity =
        if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) = granularity {
            v
        } else {
            return Err(DataFusionError::Execution(
                "Granularity of `date_trunc` must be non-null scalar Utf8".to_string(),
            ));
        };

    // Nulls pass through; non-null values are truncated.
    let f = |x: Option<i64>| x.map(|x| date_trunc_single(granularity, x)).transpose();

    Ok(match array {
        ColumnarValue::Scalar(ScalarValue::TimeNanosecond(v)) => {
            ColumnarValue::Scalar(ScalarValue::TimeNanosecond(f(*v)?))
        }
        ColumnarValue::Scalar(other) => {
            return Err(DataFusionError::Execution(format!(
                "Second argument of `date_trunc` must be a nanosecond timestamp, got {:?}",
                other
            )));
        }
        ColumnarValue::Array(array) => {
            let array = array
                .as_any()
                .downcast_ref::<TimestampNanosecondArray>()
                .ok_or_else(|| {
                    DataFusionError::Execution(format!(
                        "Second argument of `date_trunc` must be a nanosecond timestamp array, got {:?}",
                        array.data_type()
                    ))
                })?;
            let array = array
                .iter()
                .map(f)
                .collect::<Result<TimestampNanosecondArray>>()?;
            ColumnarValue::Array(Arc::new(array))
        }
    })
}
/// Downcasts `$ARRAY` to the concrete Arrow array type that matches its
/// `data_type()` and applies the temporal kernel `$FN` to it.
///
/// Supported inputs are `Date32`, `Date64`, and `Timestamp` in any time
/// unit, but only without a time zone (`Timestamp(_, None)`). Any other
/// type evaluates to `Err(DataFusionError::Internal(..))`.
///
/// Notes for callers:
/// * `$ARRAY` is evaluated more than once (once for `data_type()` and once
///   more for the downcast), so pass a plain variable.
/// * An error from `$FN` is propagated with `?`, which returns from the
///   function the macro is expanded in (converted via `From`), rather than
///   becoming the value of the macro expression.
/// * A macro is used (instead of a generic fn) so `$FN` is instantiated
///   separately for each concrete array type in each arm.
macro_rules! extract_date_part {
($ARRAY: expr, $FN:expr) => {
match $ARRAY.data_type() {
DataType::Date32 => {
// The `unwrap`s in this macro rely on each array reporting the data type
// of its concrete Arrow array type.
let array = $ARRAY.as_any().downcast_ref::<Date32Array>().unwrap();
Ok($FN(array)?)
}
DataType::Date64 => {
let array = $ARRAY.as_any().downcast_ref::<Date64Array>().unwrap();
Ok($FN(array)?)
}
// Only timezone-less timestamps are matched; timestamps with a time zone
// fall through to the error arm below.
DataType::Timestamp(time_unit, None) => match time_unit {
TimeUnit::Second => {
let array = $ARRAY
.as_any()
.downcast_ref::<TimestampSecondArray>()
.unwrap();
Ok($FN(array)?)
}
TimeUnit::Millisecond => {
let array = $ARRAY
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();
Ok($FN(array)?)
}
TimeUnit::Microsecond => {
let array = $ARRAY
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap();
Ok($FN(array)?)
}
TimeUnit::Nanosecond => {
let array = $ARRAY
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap();
Ok($FN(array)?)
}
},
datatype => Err(DataFusionError::Internal(format!(
"Extract does not support datatype {:?}",
datatype
))),
}
};
}
/// DATE_PART SQL function
///
/// `args[0]` names the part to extract (`hour` or `year`, matched
/// case-insensitively) and must be a non-null Utf8 scalar. `args[1]` is the
/// date / timestamp input, either an array or a scalar. A scalar input
/// yields a scalar result; an array input yields an array result.
///
/// # Errors
/// Returns an execution error for a wrong argument count, a non-literal or
/// unsupported part name, or (via `extract_date_part!`) an unsupported
/// input type.
pub fn date_part(args: &[ColumnarValue]) -> Result<ColumnarValue> {
    let (part, input) = match args {
        [part, input] => (part, input),
        _ => {
            return Err(DataFusionError::Execution(
                "Expected two arguments in DATE_PART".to_string(),
            ))
        }
    };

    let part = match part {
        ColumnarValue::Scalar(ScalarValue::Utf8(Some(name))) => name,
        _ => {
            return Err(DataFusionError::Execution(
                "First argument of `DATE_PART` must be non-null scalar Utf8".to_string(),
            ))
        }
    };

    // A scalar is evaluated as a one-row array and unwrapped again at the end.
    let (values, scalar_input) = match input {
        ColumnarValue::Array(values) => (values.clone(), false),
        ColumnarValue::Scalar(value) => (value.to_array(), true),
    };

    let extracted = match part.to_lowercase().as_str() {
        "hour" => extract_date_part!(values, temporal::hour),
        "year" => extract_date_part!(values, temporal::year),
        _ => Err(DataFusionError::Execution(format!(
            "Date part '{}' not supported",
            part
        ))),
    }?;

    if scalar_input {
        let single_row: ArrayRef = Arc::new(extracted);
        Ok(ColumnarValue::Scalar(ScalarValue::try_from_array(
            &single_row,
            0,
        )?))
    } else {
        Ok(ColumnarValue::Array(Arc::new(extracted)))
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int64Array, StringBuilder};

    use super::*;

    /// `to_timestamp` must be wired up for Arrow arrays and map null
    /// inputs to null outputs.
    #[test]
    fn to_timestamp_arrays_and_nulls() -> Result<()> {
        let mut strings = StringBuilder::new(2);
        let mut timestamps = TimestampNanosecondArray::builder(2);

        strings.append_value("2020-09-08T13:42:29.190855Z")?;
        timestamps.append_value(1599572549190855000)?;

        strings.append_null()?;
        timestamps.append_null()?;

        let expected = &timestamps.finish() as &dyn Array;
        let input = ColumnarValue::Array(Arc::new(strings.finish()) as ArrayRef);

        let parsed = to_timestamp(&[input])
            .expect("that to_timestamp parsed values without error");
        match parsed {
            ColumnarValue::Array(parsed_array) => {
                assert_eq!(parsed_array.len(), 2);
                assert_eq!(expected, parsed_array.as_ref());
            }
            _ => panic!("Expected a columnar array"),
        }
        Ok(())
    }

    /// Each case is (input, granularity, expected truncated value).
    #[test]
    fn date_trunc_test() {
        let cases = [
            (
                "2020-09-08T13:42:29.190855Z",
                "second",
                "2020-09-08T13:42:29.000000Z",
            ),
            (
                "2020-09-08T13:42:29.190855Z",
                "minute",
                "2020-09-08T13:42:00.000000Z",
            ),
            (
                "2020-09-08T13:42:29.190855Z",
                "hour",
                "2020-09-08T13:00:00.000000Z",
            ),
            (
                "2020-09-08T13:42:29.190855Z",
                "day",
                "2020-09-08T00:00:00.000000Z",
            ),
            (
                "2020-09-08T13:42:29.190855Z",
                "week",
                "2020-09-07T00:00:00.000000Z",
            ),
            (
                "2020-09-08T13:42:29.190855Z",
                "month",
                "2020-09-01T00:00:00.000000Z",
            ),
            (
                "2020-09-08T13:42:29.190855Z",
                "year",
                "2020-01-01T00:00:00.000000Z",
            ),
            // weeks that start in the previous year
            (
                "2021-01-01T13:42:29.190855Z",
                "week",
                "2020-12-28T00:00:00.000000Z",
            ),
            (
                "2020-01-01T13:42:29.190855Z",
                "week",
                "2019-12-30T00:00:00.000000Z",
            ),
        ];

        for (original, granularity, expected) in cases.iter() {
            let original = string_to_timestamp_nanos(original).unwrap();
            let expected = string_to_timestamp_nanos(expected).unwrap();
            let result = date_trunc_single(granularity, original).unwrap();
            assert_eq!(result, expected);
        }
    }

    /// Non-string input to `to_timestamp` must be rejected with an error.
    #[test]
    fn to_timestamp_invalid_input_type() -> Result<()> {
        let mut ints = Int64Array::builder(1);
        ints.append_value(1)?;
        let input = ColumnarValue::Array(Arc::new(ints.finish()));

        let expected_err =
            "Internal error: Unsupported data type Int64 for function to_timestamp";
        let err = match to_timestamp(&[input]) {
            Ok(_) => panic!("Expected error but got success"),
            Err(e) => e,
        };
        assert!(
            err.to_string().contains(expected_err),
            "Can not find expected error '{}'. Actual error '{}'",
            expected_err,
            err
        );
        Ok(())
    }
}