| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| //! [`ScalarUDFImpl`] definitions for array_element, array_slice, array_pop_front, array_pop_back, and array_any_value functions. |
| |
| use arrow::array::{ |
| Array, ArrayRef, Capacities, GenericListArray, GenericListViewArray, Int64Array, |
| MutableArrayData, NullArray, NullBufferBuilder, OffsetSizeTrait, |
| }; |
| use arrow::buffer::{OffsetBuffer, ScalarBuffer}; |
| use arrow::datatypes::DataType; |
| use arrow::datatypes::{ |
| DataType::{FixedSizeList, LargeList, LargeListView, List, ListView, Null}, |
| Field, |
| }; |
| use datafusion_common::cast::as_large_list_array; |
| use datafusion_common::cast::as_list_array; |
| use datafusion_common::cast::{ |
| as_int64_array, as_large_list_view_array, as_list_view_array, |
| }; |
| use datafusion_common::internal_err; |
| use datafusion_common::utils::ListCoercion; |
| use datafusion_common::{ |
| Result, exec_datafusion_err, exec_err, internal_datafusion_err, plan_err, |
| utils::take_function_args, |
| }; |
| use datafusion_expr::{ |
| ArrayFunctionArgument, ArrayFunctionSignature, Expr, ScalarFunctionArgs, |
| TypeSignature, |
| }; |
| use datafusion_expr::{ |
| ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, |
| }; |
| use datafusion_macros::user_doc; |
| use std::sync::Arc; |
| |
| use crate::utils::make_scalar_function; |
| |
| // Create static instances of ScalarUDFs for each function |
| make_udf_expr_and_func!( |
| ArrayElement, |
| array_element, |
| array element, |
| "extracts the element with the index n from the array.", |
| array_element_udf |
| ); |
| |
| create_func!(ArraySlice, array_slice_udf); |
| |
| make_udf_expr_and_func!( |
| ArrayPopFront, |
| array_pop_front, |
| array, |
| "returns the array without the first element.", |
| array_pop_front_udf |
| ); |
| |
| make_udf_expr_and_func!( |
| ArrayPopBack, |
| array_pop_back, |
| array, |
| "returns the array without the last element.", |
| array_pop_back_udf |
| ); |
| |
| make_udf_expr_and_func!( |
| ArrayAnyValue, |
| array_any_value, |
| array, |
| "returns the first non-null element in the array.", |
| array_any_value_udf |
| ); |
| |
| #[user_doc( |
| doc_section(label = "Array Functions"), |
| description = "Extracts the element with the index n from the array.", |
| syntax_example = "array_element(array, index)", |
| sql_example = r#"```sql |
| > select array_element([1, 2, 3, 4], 3); |
| +-----------------------------------------+ |
| | array_element(List([1,2,3,4]),Int64(3)) | |
| +-----------------------------------------+ |
| | 3 | |
| +-----------------------------------------+ |
| ```"#, |
| argument( |
| name = "array", |
| description = "Array expression. Can be a constant, column, or function, and any combination of array operators." |
| ), |
| argument( |
| name = "index", |
| description = "Index to extract the element from the array." |
| ) |
| )] |
| #[derive(Debug, PartialEq, Eq, Hash)] |
| pub struct ArrayElement { |
| signature: Signature, |
| aliases: Vec<String>, |
| } |
| |
| impl Default for ArrayElement { |
| fn default() -> Self { |
| Self::new() |
| } |
| } |
| |
| impl ArrayElement { |
| pub fn new() -> Self { |
| Self { |
| signature: Signature::array_and_index(Volatility::Immutable), |
| aliases: vec![ |
| String::from("array_extract"), |
| String::from("list_element"), |
| String::from("list_extract"), |
| ], |
| } |
| } |
| } |
| |
| impl ScalarUDFImpl for ArrayElement { |
| fn name(&self) -> &str { |
| "array_element" |
| } |
| |
| fn display_name(&self, args: &[Expr]) -> Result<String> { |
| let args_name = args.iter().map(ToString::to_string).collect::<Vec<_>>(); |
| if args_name.len() != 2 { |
| return exec_err!("expect 2 args, got {}", args_name.len()); |
| } |
| |
| Ok(format!("{}[{}]", args_name[0], args_name[1])) |
| } |
| |
| fn schema_name(&self, args: &[Expr]) -> Result<String> { |
| let args_name = args |
| .iter() |
| .map(|e| e.schema_name().to_string()) |
| .collect::<Vec<_>>(); |
| if args_name.len() != 2 { |
| return exec_err!("expect 2 args, got {}", args_name.len()); |
| } |
| |
| Ok(format!("{}[{}]", args_name[0], args_name[1])) |
| } |
| |
| fn signature(&self) -> &Signature { |
| &self.signature |
| } |
| |
| fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> { |
| match &arg_types[0] { |
| Null => Ok(Null), |
| List(field) | LargeList(field) => Ok(field.data_type().clone()), |
| arg_type => plan_err!("{} does not support type {arg_type}", self.name()), |
| } |
| } |
| |
| fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> { |
| make_scalar_function(array_element_inner)(&args.args) |
| } |
| |
| fn aliases(&self) -> &[String] { |
| &self.aliases |
| } |
| |
| fn documentation(&self) -> Option<&Documentation> { |
| self.doc() |
| } |
| } |
| |
| /// array_element SQL function |
| /// |
| /// There are two arguments for array_element, the first one is the array, the second one is the 1-indexed index. |
| /// `array_element(array, index)` |
| /// |
| /// For example: |
| /// > array_element(\[1, 2, 3], 2) -> 2 |
| fn array_element_inner(args: &[ArrayRef]) -> Result<ArrayRef> { |
| let [array, indexes] = take_function_args("array_element", args)?; |
| |
| match &array.data_type() { |
| Null => Ok(Arc::new(NullArray::new(array.len()))), |
| List(_) => { |
| let array = as_list_array(&array)?; |
| let indexes = as_int64_array(&indexes)?; |
| general_array_element::<i32>(array, indexes) |
| } |
| LargeList(_) => { |
| let array = as_large_list_array(&array)?; |
| let indexes = as_int64_array(&indexes)?; |
| general_array_element::<i64>(array, indexes) |
| } |
| arg_type => { |
| exec_err!("array_element does not support type {arg_type}") |
| } |
| } |
| } |
| |
| fn general_array_element<O: OffsetSizeTrait>( |
| array: &GenericListArray<O>, |
| indexes: &Int64Array, |
| ) -> Result<ArrayRef> |
| where |
| i64: TryInto<O>, |
| { |
| let values = array.values(); |
| if values.data_type().is_null() { |
| return Ok(Arc::new(NullArray::new(array.len()))); |
| } |
| |
| let original_data = values.to_data(); |
| let capacity = Capacities::Array(original_data.len()); |
| |
| // use_nulls: true, we don't construct List for array_element, so we need explicit nulls. |
| let mut mutable = |
| MutableArrayData::with_capacities(vec![&original_data], true, capacity); |
| |
| fn adjusted_array_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>> |
| where |
| i64: TryInto<O>, |
| { |
| let index: O = index.try_into().map_err(|_| { |
| exec_datafusion_err!("array_element got invalid index: {index}") |
| })?; |
| // 0 ~ len - 1 |
| let adjusted_zero_index = if index < O::usize_as(0) { |
| index + len |
| } else { |
| index - O::usize_as(1) |
| }; |
| |
| if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len { |
| Ok(Some(adjusted_zero_index)) |
| } else { |
| // Out of bounds |
| Ok(None) |
| } |
| } |
| |
| for (row_index, offset_window) in array.offsets().windows(2).enumerate() { |
| let start = offset_window[0]; |
| let end = offset_window[1]; |
| let len = end - start; |
| |
| // array is null |
| if len == O::usize_as(0) { |
| mutable.extend_nulls(1); |
| continue; |
| } |
| |
| let index = adjusted_array_index::<O>(indexes.value(row_index), len)?; |
| |
| if let Some(index) = index { |
| let start = start.as_usize() + index.as_usize(); |
| mutable.extend(0, start, start + 1_usize); |
| } else { |
| // Index out of bounds |
| mutable.extend_nulls(1); |
| } |
| } |
| |
| let data = mutable.freeze(); |
| Ok(arrow::array::make_array(data)) |
| } |
| |
| #[doc = "returns a slice of the array."] |
| pub fn array_slice(array: Expr, begin: Expr, end: Expr, stride: Option<Expr>) -> Expr { |
| let args = match stride { |
| Some(stride) => vec![array, begin, end, stride], |
| None => vec![array, begin, end], |
| }; |
| array_slice_udf().call(args) |
| } |
| |
| #[user_doc( |
| doc_section(label = "Array Functions"), |
| description = "Returns a slice of the array based on 1-indexed start and end positions.", |
| syntax_example = "array_slice(array, begin, end)", |
| sql_example = r#"```sql |
| > select array_slice([1, 2, 3, 4, 5, 6, 7, 8], 3, 6); |
| +--------------------------------------------------------+ |
| | array_slice(List([1,2,3,4,5,6,7,8]),Int64(3),Int64(6)) | |
| +--------------------------------------------------------+ |
| | [3, 4, 5, 6] | |
| +--------------------------------------------------------+ |
| ```"#, |
| argument( |
| name = "array", |
| description = "Array expression. Can be a constant, column, or function, and any combination of array operators." |
| ), |
| argument( |
| name = "begin", |
| description = "Index of the first element. If negative, it counts backward from the end of the array." |
| ), |
| argument( |
| name = "end", |
| description = "Index of the last element. If negative, it counts backward from the end of the array." |
| ), |
| argument( |
| name = "stride", |
| description = "Stride of the array slice. The default is 1." |
| ) |
| )] |
| #[derive(Debug, PartialEq, Eq, Hash)] |
| pub(super) struct ArraySlice { |
| signature: Signature, |
| aliases: Vec<String>, |
| } |
| |
| impl ArraySlice { |
| pub fn new() -> Self { |
| Self { |
| signature: Signature::one_of( |
| vec![ |
| TypeSignature::ArraySignature(ArrayFunctionSignature::Array { |
| arguments: vec![ |
| ArrayFunctionArgument::Array, |
| ArrayFunctionArgument::Index, |
| ArrayFunctionArgument::Index, |
| ], |
| array_coercion: Some(ListCoercion::FixedSizedListToList), |
| }), |
| TypeSignature::ArraySignature(ArrayFunctionSignature::Array { |
| arguments: vec![ |
| ArrayFunctionArgument::Array, |
| ArrayFunctionArgument::Index, |
| ArrayFunctionArgument::Index, |
| ArrayFunctionArgument::Index, |
| ], |
| array_coercion: Some(ListCoercion::FixedSizedListToList), |
| }), |
| ], |
| Volatility::Immutable, |
| ), |
| aliases: vec![String::from("list_slice")], |
| } |
| } |
| } |
| |
| impl ScalarUDFImpl for ArraySlice { |
| fn display_name(&self, args: &[Expr]) -> Result<String> { |
| let args_name = args.iter().map(ToString::to_string).collect::<Vec<_>>(); |
| if let Some((arr, indexes)) = args_name.split_first() { |
| Ok(format!("{arr}[{}]", indexes.join(":"))) |
| } else { |
| exec_err!("no argument") |
| } |
| } |
| |
| fn schema_name(&self, args: &[Expr]) -> Result<String> { |
| let args_name = args |
| .iter() |
| .map(|e| e.schema_name().to_string()) |
| .collect::<Vec<_>>(); |
| if let Some((arr, indexes)) = args_name.split_first() { |
| Ok(format!("{arr}[{}]", indexes.join(":"))) |
| } else { |
| exec_err!("no argument") |
| } |
| } |
| |
| fn name(&self) -> &str { |
| "array_slice" |
| } |
| |
| fn signature(&self) -> &Signature { |
| &self.signature |
| } |
| |
| fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> { |
| Ok(arg_types[0].clone()) |
| } |
| |
| fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> { |
| make_scalar_function(array_slice_inner)(&args.args) |
| } |
| |
| fn aliases(&self) -> &[String] { |
| &self.aliases |
| } |
| |
| fn documentation(&self) -> Option<&Documentation> { |
| self.doc() |
| } |
| } |
| |
| /// array_slice SQL function |
| /// |
| /// We follow the behavior of array_slice in DuckDB |
| /// Note that array_slice is 1-indexed. And there are two additional arguments `from` and `to` in array_slice. |
| /// |
| /// > array_slice(array, from, to) |
| /// |
| /// Positive index is treated as the index from the start of the array. If the |
| /// `from` index is smaller than 1, it is treated as 1. If the `to` index is larger than the |
| /// length of the array, it is treated as the length of the array. |
| /// |
| /// Negative index is treated as the index from the end of the array. If the index |
| /// is larger than the length of the array, it is NOT VALID, either in `from` or `to`. |
| /// The `to` index is exclusive like python slice syntax. |
| /// |
| /// See test cases in `array.slt` for more details. |
| fn array_slice_inner(args: &[ArrayRef]) -> Result<ArrayRef> { |
| let args_len = args.len(); |
| if args_len != 3 && args_len != 4 { |
| return exec_err!("array_slice needs three or four arguments"); |
| } |
| |
| let stride = if args_len == 4 { |
| Some(as_int64_array(&args[3])?) |
| } else { |
| None |
| }; |
| |
| let from_array = as_int64_array(&args[1])?; |
| let to_array = as_int64_array(&args[2])?; |
| |
| let array_data_type = args[0].data_type(); |
| match array_data_type { |
| List(_) => { |
| let array = as_list_array(&args[0])?; |
| general_array_slice::<i32>(array, from_array, to_array, stride) |
| } |
| LargeList(_) => { |
| let array = as_large_list_array(&args[0])?; |
| general_array_slice::<i64>(array, from_array, to_array, stride) |
| } |
| ListView(_) => { |
| let array = as_list_view_array(&args[0])?; |
| general_list_view_array_slice::<i32>(array, from_array, to_array, stride) |
| } |
| LargeListView(_) => { |
| let array = as_large_list_view_array(&args[0])?; |
| general_list_view_array_slice::<i64>(array, from_array, to_array, stride) |
| } |
| _ => exec_err!("array_slice does not support type: {}", array_data_type), |
| } |
| } |
| |
| fn adjusted_from_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>> |
| where |
| i64: TryInto<O>, |
| { |
| // 0 ~ len - 1 |
| let adjusted_zero_index = if index < 0 { |
| if let Ok(index) = index.try_into() { |
| // When index < 0 and -index > length, index is clamped to the beginning of the list. |
| // Otherwise, when index < 0, the index is counted from the end of the list. |
| // |
| // Note, we actually test the contrapositive, index < -length, because negating a |
| // negative will panic if the negative is equal to the smallest representable value |
| // while negating a positive is always safe. |
| if index < (O::zero() - O::one()) * len { |
| O::zero() |
| } else { |
| index + len |
| } |
| } else { |
| return exec_err!("array_slice got invalid index: {}", index); |
| } |
| } else { |
| // array_slice(arr, 1, to) is the same as array_slice(arr, 0, to) |
| if let Ok(index) = index.try_into() { |
| std::cmp::max(index - O::usize_as(1), O::usize_as(0)) |
| } else { |
| return exec_err!("array_slice got invalid index: {}", index); |
| } |
| }; |
| |
| if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len { |
| Ok(Some(adjusted_zero_index)) |
| } else { |
| // Out of bounds |
| Ok(None) |
| } |
| } |
| |
| fn adjusted_to_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>> |
| where |
| i64: TryInto<O>, |
| { |
| // 0 ~ len - 1 |
| let adjusted_zero_index = if index < 0 { |
| // array_slice in duckdb with negative to_index is python-like, so index itself is exclusive |
| if let Ok(index) = index.try_into() { |
| index + len |
| } else { |
| return exec_err!("array_slice got invalid index: {}", index); |
| } |
| } else { |
| // array_slice(arr, from, len + 1) is the same as array_slice(arr, from, len) |
| if let Ok(index) = index.try_into() { |
| std::cmp::min(index - O::usize_as(1), len - O::usize_as(1)) |
| } else { |
| return exec_err!("array_slice got invalid index: {}", index); |
| } |
| }; |
| |
| if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len { |
| Ok(Some(adjusted_zero_index)) |
| } else { |
| // Out of bounds |
| Ok(None) |
| } |
| } |
| |
| /// Internal plan describing how to materialize a single row's slice after |
| /// the slice bounds/stride have been normalized. Both list layouts consume |
| /// this to drive their copy logic. |
| enum SlicePlan<O: OffsetSizeTrait> { |
| /// No values should be produced. |
| Empty, |
| /// A contiguous run starting at `start` (relative to the row) with `len` |
| /// elements can be copied in one go. |
| Contiguous { start: O, len: O }, |
| /// Arbitrary positions (already relative to the row) must be copied in |
| /// sequence. |
| Indices(Vec<O>), |
| } |
| |
| /// Produces a [`SlicePlan`] for the given logical slice parameters. |
| fn compute_slice_plan<O: OffsetSizeTrait>( |
| len: O, |
| from_raw: i64, |
| to_raw: i64, |
| stride_raw: Option<i64>, |
| ) -> Result<SlicePlan<O>> |
| where |
| i64: TryInto<O>, |
| { |
| if len == O::usize_as(0) { |
| return Ok(SlicePlan::Empty); |
| } |
| |
| let from_index = adjusted_from_index::<O>(from_raw, len)?; |
| let to_index = adjusted_to_index::<O>(to_raw, len)?; |
| |
| let (Some(from), Some(to)) = (from_index, to_index) else { |
| return Ok(SlicePlan::Empty); |
| }; |
| |
| let stride_value = stride_raw.unwrap_or(1); |
| if stride_value == 0 { |
| return exec_err!( |
| "array_slice got invalid stride: {:?}, it cannot be 0", |
| stride_value |
| ); |
| } |
| |
| if (from < to && stride_value.is_negative()) |
| || (from > to && stride_value.is_positive()) |
| { |
| return Ok(SlicePlan::Empty); |
| } |
| |
| let stride: O = stride_value.try_into().map_err(|_| { |
| internal_datafusion_err!("array_slice got invalid stride: {}", stride_value) |
| })?; |
| |
| if from <= to && stride_value.is_positive() { |
| if stride_value == 1 { |
| let len = to - from + O::usize_as(1); |
| Ok(SlicePlan::Contiguous { start: from, len }) |
| } else { |
| let mut indices = Vec::new(); |
| let mut index = from; |
| while index <= to { |
| indices.push(index); |
| index += stride; |
| } |
| Ok(SlicePlan::Indices(indices)) |
| } |
| } else { |
| let mut indices = Vec::new(); |
| let mut index = from; |
| while index >= to { |
| indices.push(index); |
| index += stride; |
| } |
| Ok(SlicePlan::Indices(indices)) |
| } |
| } |
| |
| fn general_array_slice<O: OffsetSizeTrait>( |
| array: &GenericListArray<O>, |
| from_array: &Int64Array, |
| to_array: &Int64Array, |
| stride: Option<&Int64Array>, |
| ) -> Result<ArrayRef> |
| where |
| i64: TryInto<O>, |
| { |
| let values = array.values(); |
| let original_data = values.to_data(); |
| let capacity = Capacities::Array(original_data.len()); |
| |
| let mut mutable = |
| MutableArrayData::with_capacities(vec![&original_data], true, capacity); |
| |
| // We have the slice syntax compatible with DuckDB v0.8.1. |
| // The rule `adjusted_from_index` and `adjusted_to_index` follows the rule of array_slice in duckdb. |
| |
| let mut offsets = vec![O::usize_as(0)]; |
| let mut null_builder = NullBufferBuilder::new(array.len()); |
| |
| for (row_index, offset_window) in array.offsets().windows(2).enumerate() { |
| let start = offset_window[0]; |
| let end = offset_window[1]; |
| let len = end - start; |
| |
| // If any input is null, return null. |
| if array.is_null(row_index) |
| || from_array.is_null(row_index) |
| || to_array.is_null(row_index) |
| || stride.is_some_and(|s| s.is_null(row_index)) |
| { |
| mutable.extend_nulls(1); |
| offsets.push(offsets[row_index] + O::usize_as(1)); |
| null_builder.append_null(); |
| continue; |
| } |
| null_builder.append_non_null(); |
| |
| // Empty arrays always return an empty array. |
| if len == O::usize_as(0) { |
| offsets.push(offsets[row_index]); |
| continue; |
| } |
| |
| let slice_plan = compute_slice_plan::<O>( |
| len, |
| from_array.value(row_index), |
| to_array.value(row_index), |
| stride.map(|s| s.value(row_index)), |
| )?; |
| |
| match slice_plan { |
| SlicePlan::Empty => offsets.push(offsets[row_index]), |
| SlicePlan::Contiguous { |
| start: rel_start, |
| len: slice_len, |
| } => { |
| let start_index = (start + rel_start).to_usize().unwrap(); |
| let end_index = (start + rel_start + slice_len).to_usize().unwrap(); |
| mutable.extend(0, start_index, end_index); |
| offsets.push(offsets[row_index] + slice_len); |
| } |
| SlicePlan::Indices(indices) => { |
| let count = indices.len(); |
| for rel_index in indices { |
| let absolute_index = (start + rel_index).to_usize().unwrap(); |
| mutable.extend(0, absolute_index, absolute_index + 1); |
| } |
| offsets.push(offsets[row_index] + O::usize_as(count)); |
| } |
| } |
| } |
| |
| let data = mutable.freeze(); |
| |
| Ok(Arc::new(GenericListArray::<O>::try_new( |
| Arc::new(Field::new_list_field(array.value_type(), true)), |
| OffsetBuffer::<O>::new(offsets.into()), |
| arrow::array::make_array(data), |
| null_builder.finish(), |
| )?)) |
| } |
| |
| fn general_list_view_array_slice<O: OffsetSizeTrait>( |
| array: &GenericListViewArray<O>, |
| from_array: &Int64Array, |
| to_array: &Int64Array, |
| stride: Option<&Int64Array>, |
| ) -> Result<ArrayRef> |
| where |
| i64: TryInto<O>, |
| { |
| let values = array.values(); |
| let original_data = values.to_data(); |
| let capacity = Capacities::Array(original_data.len()); |
| let field = match array.data_type() { |
| ListView(field) | LargeListView(field) => Arc::clone(field), |
| other => { |
| return internal_err!("array_slice got unexpected data type: {}", other); |
| } |
| }; |
| |
| let mut mutable = |
| MutableArrayData::with_capacities(vec![&original_data], true, capacity); |
| |
| // We must build `offsets` and `sizes` buffers manually as ListView does not enforce |
| // monotonically increasing offsets. |
| let mut offsets = Vec::with_capacity(array.len()); |
| let mut sizes = Vec::with_capacity(array.len()); |
| let mut current_offset = O::usize_as(0); |
| let mut null_builder = NullBufferBuilder::new(array.len()); |
| |
| for row_index in 0..array.len() { |
| // Propagate NULL semantics: any NULL input yields a NULL output slot. |
| if array.is_null(row_index) |
| || from_array.is_null(row_index) |
| || to_array.is_null(row_index) |
| || stride.is_some_and(|s| s.is_null(row_index)) |
| { |
| null_builder.append_null(); |
| offsets.push(current_offset); |
| sizes.push(O::usize_as(0)); |
| continue; |
| } |
| null_builder.append_non_null(); |
| |
| let len = array.value_size(row_index); |
| |
| // Empty arrays always return an empty array. |
| if len == O::usize_as(0) { |
| offsets.push(current_offset); |
| sizes.push(O::usize_as(0)); |
| continue; |
| } |
| |
| let slice_plan = compute_slice_plan::<O>( |
| len, |
| from_array.value(row_index), |
| to_array.value(row_index), |
| stride.map(|s| s.value(row_index)), |
| )?; |
| |
| let start = array.value_offset(row_index); |
| match slice_plan { |
| SlicePlan::Empty => { |
| offsets.push(current_offset); |
| sizes.push(O::usize_as(0)); |
| } |
| SlicePlan::Contiguous { |
| start: rel_start, |
| len: slice_len, |
| } => { |
| let start_index = (start + rel_start).to_usize().unwrap(); |
| let end_index = (start + rel_start + slice_len).to_usize().unwrap(); |
| mutable.extend(0, start_index, end_index); |
| offsets.push(current_offset); |
| sizes.push(slice_len); |
| current_offset += slice_len; |
| } |
| SlicePlan::Indices(indices) => { |
| let count = indices.len(); |
| for rel_index in indices { |
| let absolute_index = (start + rel_index).to_usize().unwrap(); |
| mutable.extend(0, absolute_index, absolute_index + 1); |
| } |
| let length = O::usize_as(count); |
| offsets.push(current_offset); |
| sizes.push(length); |
| current_offset += length; |
| } |
| } |
| } |
| |
| let data = mutable.freeze(); |
| |
| Ok(Arc::new(GenericListViewArray::<O>::try_new( |
| field, |
| ScalarBuffer::from(offsets), |
| ScalarBuffer::from(sizes), |
| arrow::array::make_array(data), |
| null_builder.finish(), |
| )?)) |
| } |
| |
| #[user_doc( |
| doc_section(label = "Array Functions"), |
| description = "Returns the array without the first element.", |
| syntax_example = "array_pop_front(array)", |
| sql_example = r#"```sql |
| > select array_pop_front([1, 2, 3]); |
| +-------------------------------+ |
| | array_pop_front(List([1,2,3])) | |
| +-------------------------------+ |
| | [2, 3] | |
| +-------------------------------+ |
| ```"#, |
| argument( |
| name = "array", |
| description = "Array expression. Can be a constant, column, or function, and any combination of array operators." |
| ) |
| )] |
| #[derive(Debug, PartialEq, Eq, Hash)] |
| pub(super) struct ArrayPopFront { |
| signature: Signature, |
| aliases: Vec<String>, |
| } |
| |
| impl ArrayPopFront { |
| pub fn new() -> Self { |
| Self { |
| signature: Signature::array(Volatility::Immutable), |
| aliases: vec![String::from("list_pop_front")], |
| } |
| } |
| } |
| |
| impl ScalarUDFImpl for ArrayPopFront { |
| fn name(&self) -> &str { |
| "array_pop_front" |
| } |
| |
| fn signature(&self) -> &Signature { |
| &self.signature |
| } |
| |
| fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> { |
| Ok(arg_types[0].clone()) |
| } |
| |
| fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> { |
| make_scalar_function(array_pop_front_inner)(&args.args) |
| } |
| |
| fn aliases(&self) -> &[String] { |
| &self.aliases |
| } |
| |
| fn documentation(&self) -> Option<&Documentation> { |
| self.doc() |
| } |
| } |
| |
| /// array_pop_front SQL function |
| fn array_pop_front_inner(args: &[ArrayRef]) -> Result<ArrayRef> { |
| let array_data_type = args[0].data_type(); |
| match array_data_type { |
| List(_) => { |
| let array = as_list_array(&args[0])?; |
| general_pop_front_list::<i32>(array) |
| } |
| LargeList(_) => { |
| let array = as_large_list_array(&args[0])?; |
| general_pop_front_list::<i64>(array) |
| } |
| _ => exec_err!("array_pop_front does not support type: {}", array_data_type), |
| } |
| } |
| |
| fn general_pop_front_list<O: OffsetSizeTrait>( |
| array: &GenericListArray<O>, |
| ) -> Result<ArrayRef> |
| where |
| i64: TryInto<O>, |
| { |
| let from_array = Int64Array::from(vec![2; array.len()]); |
| let to_array = Int64Array::from( |
| array |
| .iter() |
| .map(|arr| arr.map_or(0, |arr| arr.len() as i64)) |
| .collect::<Vec<i64>>(), |
| ); |
| general_array_slice::<O>(array, &from_array, &to_array, None) |
| } |
| |
| #[user_doc( |
| doc_section(label = "Array Functions"), |
| description = "Returns the array without the last element.", |
| syntax_example = "array_pop_back(array)", |
| sql_example = r#"```sql |
| > select array_pop_back([1, 2, 3]); |
| +-------------------------------+ |
| | array_pop_back(List([1,2,3])) | |
| +-------------------------------+ |
| | [1, 2] | |
| +-------------------------------+ |
| ```"#, |
| argument( |
| name = "array", |
| description = "Array expression. Can be a constant, column, or function, and any combination of array operators." |
| ) |
| )] |
| #[derive(Debug, PartialEq, Eq, Hash)] |
| pub(super) struct ArrayPopBack { |
| signature: Signature, |
| aliases: Vec<String>, |
| } |
| |
| impl ArrayPopBack { |
| pub fn new() -> Self { |
| Self { |
| signature: Signature::array(Volatility::Immutable), |
| aliases: vec![String::from("list_pop_back")], |
| } |
| } |
| } |
| |
| impl ScalarUDFImpl for ArrayPopBack { |
| fn name(&self) -> &str { |
| "array_pop_back" |
| } |
| |
| fn signature(&self) -> &Signature { |
| &self.signature |
| } |
| |
| fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> { |
| Ok(arg_types[0].clone()) |
| } |
| |
| fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> { |
| make_scalar_function(array_pop_back_inner)(&args.args) |
| } |
| |
| fn aliases(&self) -> &[String] { |
| &self.aliases |
| } |
| |
| fn documentation(&self) -> Option<&Documentation> { |
| self.doc() |
| } |
| } |
| |
| /// array_pop_back SQL function |
| fn array_pop_back_inner(args: &[ArrayRef]) -> Result<ArrayRef> { |
| let [array] = take_function_args("array_pop_back", args)?; |
| |
| match array.data_type() { |
| List(_) => { |
| let array = as_list_array(&array)?; |
| general_pop_back_list::<i32>(array) |
| } |
| LargeList(_) => { |
| let array = as_large_list_array(&array)?; |
| general_pop_back_list::<i64>(array) |
| } |
| _ => exec_err!( |
| "array_pop_back does not support type: {}", |
| array.data_type() |
| ), |
| } |
| } |
| |
| fn general_pop_back_list<O: OffsetSizeTrait>( |
| array: &GenericListArray<O>, |
| ) -> Result<ArrayRef> |
| where |
| i64: TryInto<O>, |
| { |
| let from_array = Int64Array::from(vec![1; array.len()]); |
| let to_array = Int64Array::from( |
| array |
| .iter() |
| .map(|arr| arr.map_or(0, |arr| arr.len() as i64 - 1)) |
| .collect::<Vec<i64>>(), |
| ); |
| general_array_slice::<O>(array, &from_array, &to_array, None) |
| } |
| |
| #[user_doc( |
| doc_section(label = "Array Functions"), |
| description = "Returns the first non-null element in the array.", |
| syntax_example = "array_any_value(array)", |
| sql_example = r#"```sql |
| > select array_any_value([NULL, 1, 2, 3]); |
| +-------------------------------+ |
| | array_any_value(List([NULL,1,2,3])) | |
| +-------------------------------------+ |
| | 1 | |
| +-------------------------------------+ |
| ```"#, |
| argument( |
| name = "array", |
| description = "Array expression. Can be a constant, column, or function, and any combination of array operators." |
| ) |
| )] |
| #[derive(Debug, PartialEq, Eq, Hash)] |
| pub(super) struct ArrayAnyValue { |
| signature: Signature, |
| aliases: Vec<String>, |
| } |
| |
| impl ArrayAnyValue { |
| pub fn new() -> Self { |
| Self { |
| signature: Signature::array(Volatility::Immutable), |
| aliases: vec![String::from("list_any_value")], |
| } |
| } |
| } |
| |
| impl ScalarUDFImpl for ArrayAnyValue { |
| fn name(&self) -> &str { |
| "array_any_value" |
| } |
| fn signature(&self) -> &Signature { |
| &self.signature |
| } |
| fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> { |
| match &arg_types[0] { |
| List(field) | LargeList(field) | FixedSizeList(field, _) => { |
| Ok(field.data_type().clone()) |
| } |
| _ => plan_err!( |
| "array_any_value can only accept List, LargeList or FixedSizeList as the argument" |
| ), |
| } |
| } |
| |
| fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> { |
| make_scalar_function(array_any_value_inner)(&args.args) |
| } |
| |
| fn aliases(&self) -> &[String] { |
| &self.aliases |
| } |
| |
| fn documentation(&self) -> Option<&Documentation> { |
| self.doc() |
| } |
| } |
| |
| fn array_any_value_inner(args: &[ArrayRef]) -> Result<ArrayRef> { |
| let [array] = take_function_args("array_any_value", args)?; |
| |
| match &array.data_type() { |
| List(_) => { |
| let array = as_list_array(&array)?; |
| general_array_any_value::<i32>(array) |
| } |
| LargeList(_) => { |
| let array = as_large_list_array(&array)?; |
| general_array_any_value::<i64>(array) |
| } |
| data_type => exec_err!("array_any_value does not support type: {data_type}"), |
| } |
| } |
| |
| fn general_array_any_value<O: OffsetSizeTrait>( |
| array: &GenericListArray<O>, |
| ) -> Result<ArrayRef> |
| where |
| i64: TryInto<O>, |
| { |
| let values = array.values(); |
| let original_data = values.to_data(); |
| let capacity = Capacities::Array(array.len()); |
| |
| let mut mutable = |
| MutableArrayData::with_capacities(vec![&original_data], true, capacity); |
| |
| for (row_index, offset_window) in array.offsets().windows(2).enumerate() { |
| let start = offset_window[0]; |
| let end = offset_window[1]; |
| let len = end - start; |
| |
| // array is null |
| if len == O::usize_as(0) { |
| mutable.extend_nulls(1); |
| continue; |
| } |
| |
| let row_value = array.value(row_index); |
| match row_value.nulls() { |
| Some(row_nulls_buffer) => { |
| // nulls are present in the array so try to take the first valid element |
| if let Some(first_non_null_index) = |
| row_nulls_buffer.valid_indices().next() |
| { |
| let index = start.as_usize() + first_non_null_index; |
| mutable.extend(0, index, index + 1) |
| } else { |
| // all the elements in the array are null |
| mutable.extend_nulls(1); |
| } |
| } |
| None => { |
| // no nulls are present in the array so take the first element |
| let index = start.as_usize(); |
| mutable.extend(0, index, index + 1); |
| } |
| } |
| } |
| |
| let data = mutable.freeze(); |
| Ok(arrow::array::make_array(data)) |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::{array_element_udf, general_list_view_array_slice}; |
| use arrow::array::{ |
| Array, ArrayRef, GenericListViewArray, Int32Array, Int64Array, ListViewArray, |
| cast::AsArray, |
| }; |
| use arrow::buffer::ScalarBuffer; |
| use arrow::datatypes::{DataType, Field}; |
| use datafusion_common::{Column, DFSchema, Result}; |
| use datafusion_expr::expr::ScalarFunction; |
| use datafusion_expr::{Expr, ExprSchemable}; |
| use std::collections::HashMap; |
| use std::sync::Arc; |
| |
| fn list_view_values(array: &GenericListViewArray<i32>) -> Vec<Vec<i32>> { |
| (0..array.len()) |
| .map(|i| { |
| let child = array.value(i); |
| let values = child.as_any().downcast_ref::<Int32Array>().unwrap(); |
| values.iter().map(|v| v.unwrap()).collect() |
| }) |
| .collect() |
| } |
| |
| // Regression test for https://github.com/apache/datafusion/issues/13755 |
| #[test] |
| fn test_array_element_return_type_fixed_size_list() { |
| let fixed_size_list_type = DataType::FixedSizeList( |
| Field::new("some_arbitrary_test_field", DataType::Int32, false).into(), |
| 13, |
| ); |
| let array_type = DataType::List( |
| Field::new_list_field(fixed_size_list_type.clone(), true).into(), |
| ); |
| let index_type = DataType::Int64; |
| |
| let schema = DFSchema::from_unqualified_fields( |
| vec![ |
| Field::new("my_array", array_type.clone(), false), |
| Field::new("my_index", index_type.clone(), false), |
| ] |
| .into(), |
| HashMap::default(), |
| ) |
| .unwrap(); |
| |
| let udf = array_element_udf(); |
| |
| // ScalarUDFImpl::return_type |
| assert_eq!( |
| udf.return_type(&[array_type.clone(), index_type.clone()]) |
| .unwrap(), |
| fixed_size_list_type |
| ); |
| |
| // Via ExprSchemable::get_type (e.g. SimplifyInfo) |
| let udf_expr = Expr::ScalarFunction(ScalarFunction { |
| func: array_element_udf(), |
| args: vec![ |
| Expr::Column(Column::new_unqualified("my_array")), |
| Expr::Column(Column::new_unqualified("my_index")), |
| ], |
| }); |
| assert_eq!( |
| ExprSchemable::get_type(&udf_expr, &schema).unwrap(), |
| fixed_size_list_type |
| ); |
| } |
| |
| #[test] |
| fn test_array_slice_list_view_basic() -> Result<()> { |
| let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])); |
| let offsets = ScalarBuffer::from(vec![0, 3]); |
| let sizes = ScalarBuffer::from(vec![3, 2]); |
| let field = Arc::new(Field::new("item", DataType::Int32, true)); |
| let array = ListViewArray::new(field, offsets, sizes, values, None); |
| |
| let from = Int64Array::from(vec![2, 1]); |
| let to = Int64Array::from(vec![3, 2]); |
| |
| let result = general_list_view_array_slice::<i32>( |
| &array, |
| &from, |
| &to, |
| None::<&Int64Array>, |
| )?; |
| let result = result.as_ref().as_list_view::<i32>(); |
| |
| assert_eq!(list_view_values(result), vec![vec![2, 3], vec![4, 5]]); |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_array_slice_list_view_non_monotonic_offsets() -> Result<()> { |
| // First list references the tail of the values buffer, second list references the head. |
| let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])); |
| let offsets = ScalarBuffer::from(vec![3, 0]); |
| let sizes = ScalarBuffer::from(vec![2, 3]); |
| let field = Arc::new(Field::new("item", DataType::Int32, true)); |
| let array = ListViewArray::new(field, offsets, sizes, values, None); |
| |
| let from = Int64Array::from(vec![1, 1]); |
| let to = Int64Array::from(vec![2, 2]); |
| |
| let result = general_list_view_array_slice::<i32>( |
| &array, |
| &from, |
| &to, |
| None::<&Int64Array>, |
| )?; |
| let result = result.as_ref().as_list_view::<i32>(); |
| |
| assert_eq!(list_view_values(result), vec![vec![4, 5], vec![1, 2]]); |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_array_slice_list_view_negative_stride() -> Result<()> { |
| let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])); |
| let offsets = ScalarBuffer::from(vec![0, 3]); |
| let sizes = ScalarBuffer::from(vec![3, 2]); |
| let field = Arc::new(Field::new("item", DataType::Int32, true)); |
| let array = ListViewArray::new(field, offsets, sizes, values, None); |
| |
| let from = Int64Array::from(vec![3, 2]); |
| let to = Int64Array::from(vec![1, 1]); |
| let stride = Int64Array::from(vec![-1, -1]); |
| |
| let result = |
| general_list_view_array_slice::<i32>(&array, &from, &to, Some(&stride))?; |
| let result = result.as_ref().as_list_view::<i32>(); |
| |
| assert_eq!(list_view_values(result), vec![vec![3, 2, 1], vec![5, 4]]); |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_array_slice_list_view_out_of_order() -> Result<()> { |
| let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])); |
| let offsets = ScalarBuffer::from(vec![3, 1, 0]); |
| let sizes = ScalarBuffer::from(vec![2, 2, 1]); |
| let field = Arc::new(Field::new("item", DataType::Int32, true)); |
| let array = ListViewArray::new(field, offsets, sizes, values, None); |
| assert_eq!( |
| list_view_values(&array), |
| vec![vec![4, 5], vec![2, 3], vec![1]] |
| ); |
| |
| let from = Int64Array::from(vec![2, 2, 2]); |
| let to = Int64Array::from(vec![1, 1, 1]); |
| let stride = Int64Array::from(vec![-1, -1, -1]); |
| |
| let result = |
| general_list_view_array_slice::<i32>(&array, &from, &to, Some(&stride))?; |
| let result = result.as_ref().as_list_view::<i32>(); |
| |
| assert_eq!( |
| list_view_values(result), |
| vec![vec![5, 4], vec![3, 2], vec![]] |
| ); |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_array_slice_list_view_with_nulls() -> Result<()> { |
| let values: ArrayRef = Arc::new(Int32Array::from(vec![ |
| Some(1), |
| None, |
| Some(3), |
| Some(4), |
| Some(5), |
| ])); |
| let offsets = ScalarBuffer::from(vec![0, 2, 5]); |
| let sizes = ScalarBuffer::from(vec![2, 3, 0]); |
| let field = Arc::new(Field::new("item", DataType::Int32, true)); |
| let array = ListViewArray::new(field, offsets, sizes, values, None); |
| |
| let from = Int64Array::from(vec![1, 1, 1]); |
| let to = Int64Array::from(vec![2, 2, 1]); |
| |
| let result = general_list_view_array_slice::<i32>(&array, &from, &to, None)?; |
| let result = result.as_ref().as_list_view::<i32>(); |
| |
| let actual: Vec<Vec<Option<i32>>> = (0..result.len()) |
| .map(|i| { |
| result |
| .value(i) |
| .as_any() |
| .downcast_ref::<Int32Array>() |
| .unwrap() |
| .iter() |
| .collect() |
| }) |
| .collect(); |
| |
| assert_eq!( |
| actual, |
| vec![vec![Some(1), None], vec![Some(3), Some(4)], Vec::new(),] |
| ); |
| |
| // Test with NULL stride - should return NULL for rows with NULL stride |
| let stride_with_null = Int64Array::from(vec![Some(1), None, Some(1)]); |
| let result = general_list_view_array_slice::<i32>( |
| &array, |
| &from, |
| &to, |
| Some(&stride_with_null), |
| )?; |
| let result = result.as_ref().as_list_view::<i32>(); |
| |
| // First row: stride = 1, should return [1, None] |
| // Second row: stride = NULL, should return NULL |
| // Third row: stride = 1, empty array should return empty |
| assert!(!result.is_null(0)); // First row should not be null |
| assert!(result.is_null(1)); // Second row should be null (stride is NULL) |
| assert!(!result.is_null(2)); // Third row should not be null |
| |
| let first_row: Vec<Option<i32>> = result |
| .value(0) |
| .as_any() |
| .downcast_ref::<Int32Array>() |
| .unwrap() |
| .iter() |
| .collect(); |
| assert_eq!(first_row, vec![Some(1), None]); |
| |
| Ok(()) |
| } |
| } |