Support Arrays for the Map scalar functions (#11712)

* crude impl to support array

* ++improvement

* uncomment logic test

* working impl

* leverage return_type_from_exprs

* add documentation

* remove unwrap method

* add more slt tests

* typos

* typos

* remove extract based on dt

* few more tests

* move back to return_type

* improve error & tests

* Update datafusion/functions-nested/src/map.rs

Co-authored-by: Alex Huang <huangweijun1001@gmail.com>

---------

Co-authored-by: Alex Huang <huangweijun1001@gmail.com>
diff --git a/datafusion/common/src/utils/mod.rs b/datafusion/common/src/utils/mod.rs
index 12e306f..bf506c0 100644
--- a/datafusion/common/src/utils/mod.rs
+++ b/datafusion/common/src/utils/mod.rs
@@ -29,8 +29,10 @@
 use arrow::compute::{partition, SortColumn, SortOptions};
 use arrow::datatypes::{Field, SchemaRef, UInt32Type};
 use arrow::record_batch::RecordBatch;
+use arrow_array::cast::AsArray;
 use arrow_array::{
-    Array, FixedSizeListArray, LargeListArray, ListArray, RecordBatchOptions,
+    Array, FixedSizeListArray, LargeListArray, ListArray, OffsetSizeTrait,
+    RecordBatchOptions,
 };
 use arrow_schema::DataType;
 use sqlparser::ast::Ident;
@@ -440,6 +442,11 @@
     ))
 }
 
+/// Helper function to convert a ListArray into a vector of ArrayRefs.
+pub fn list_to_arrays<O: OffsetSizeTrait>(a: ArrayRef) -> Vec<ArrayRef> {
+    a.as_list::<O>().iter().flatten().collect::<Vec<_>>()
+}
+
 /// Get the base type of a data type.
 ///
 /// Example
diff --git a/datafusion/functions-nested/src/map.rs b/datafusion/functions-nested/src/map.rs
index e218b50..b6068fd 100644
--- a/datafusion/functions-nested/src/map.rs
+++ b/datafusion/functions-nested/src/map.rs
@@ -15,18 +15,21 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::make_array::make_array;
-use arrow::array::ArrayData;
-use arrow_array::{Array, ArrayRef, MapArray, StructArray};
-use arrow_buffer::{Buffer, ToByteSlice};
-use arrow_schema::{DataType, Field, SchemaBuilder};
-use datafusion_common::{exec_err, ScalarValue};
-use datafusion_expr::expr::ScalarFunction;
-use datafusion_expr::{ColumnarValue, Expr, ScalarUDFImpl, Signature, Volatility};
 use std::any::Any;
 use std::collections::VecDeque;
 use std::sync::Arc;
 
+use arrow::array::ArrayData;
+use arrow_array::{Array, ArrayRef, MapArray, OffsetSizeTrait, StructArray};
+use arrow_buffer::{Buffer, ToByteSlice};
+use arrow_schema::{DataType, Field, SchemaBuilder};
+
+use datafusion_common::{exec_err, ScalarValue};
+use datafusion_expr::expr::ScalarFunction;
+use datafusion_expr::{ColumnarValue, Expr, ScalarUDFImpl, Signature, Volatility};
+
+use crate::make_array::make_array;
+
 /// Returns a map created from a key list and a value list
 pub fn map(keys: Vec<Expr>, values: Vec<Expr>) -> Expr {
     let keys = make_array(keys);
@@ -56,11 +59,11 @@
         );
     }
 
+    let data_type = args[0].data_type();
     let can_evaluate_to_const = can_evaluate_to_const(args);
-
     let key = get_first_array_ref(&args[0])?;
     let value = get_first_array_ref(&args[1])?;
-    make_map_batch_internal(key, value, can_evaluate_to_const)
+    make_map_batch_internal(key, value, can_evaluate_to_const, data_type)
 }
 
 fn get_first_array_ref(
@@ -73,7 +76,7 @@
             ScalarValue::FixedSizeList(array) => Ok(array.value(0)),
             _ => exec_err!("Expected array, got {:?}", value),
         },
-        ColumnarValue::Array(array) => exec_err!("Expected scalar, got {:?}", array),
+        ColumnarValue::Array(array) => Ok(array.to_owned()),
     }
 }
 
@@ -81,6 +84,7 @@
     keys: ArrayRef,
     values: ArrayRef,
     can_evaluate_to_const: bool,
+    data_type: DataType,
 ) -> datafusion_common::Result<ColumnarValue> {
     if keys.null_count() > 0 {
         return exec_err!("map key cannot be null");
@@ -90,6 +94,14 @@
         return exec_err!("map requires key and value lists to have the same length");
     }
 
+    if !can_evaluate_to_const {
+        return if let DataType::LargeList(..) = data_type {
+            make_map_array_internal::<i64>(keys, values)
+        } else {
+            make_map_array_internal::<i32>(keys, values)
+        };
+    }
+
     let key_field = Arc::new(Field::new("key", keys.data_type().clone(), false));
     let value_field = Arc::new(Field::new("value", values.data_type().clone(), true));
     let mut entry_struct_buffer: VecDeque<(Arc<Field>, ArrayRef)> = VecDeque::new();
@@ -190,7 +202,6 @@
         make_map_batch(args)
     }
 }
-
 fn get_element_type(data_type: &DataType) -> datafusion_common::Result<&DataType> {
     match data_type {
         DataType::List(element) => Ok(element.data_type()),
@@ -202,3 +213,115 @@
         ),
     }
 }
+
+/// Helper function to create MapArray from array of values to support arrays for Map scalar function
+///
+/// ``` text
+/// Format of input KEYS and VALUES column
+///         keys                        values
+/// +---------------------+       +---------------------+
+/// | +-----------------+ |       | +-----------------+ |
+/// | | [k11, k12, k13] | |       | | [v11, v12, v13] | |
+/// | +-----------------+ |       | +-----------------+ |
+/// |                     |       |                     |
+/// | +-----------------+ |       | +-----------------+ |
+/// | | [k21, k22, k23] | |       | | [v21, v22, v23] | |
+/// | +-----------------+ |       | +-----------------+ |
+/// |                     |       |                     |
+/// | +-----------------+ |       | +-----------------+ |
+/// | |[k31, k32, k33]  | |       | |[v31, v32, v33]  | |
+/// | +-----------------+ |       | +-----------------+ |
+/// +---------------------+       +---------------------+
+/// ```
+/// Flattened keys and values array to user create `StructArray`,
+/// which serves as inner child for `MapArray`
+///
+/// ``` text
+/// Flattened           Flattened
+/// Keys                Values
+/// +-----------+      +-----------+
+/// | +-------+ |      | +-------+ |
+/// | |  k11  | |      | |  v11  | |
+/// | +-------+ |      | +-------+ |
+/// | +-------+ |      | +-------+ |
+/// | |  k12  | |      | |  v12  | |
+/// | +-------+ |      | +-------+ |
+/// | +-------+ |      | +-------+ |
+/// | |  k13  | |      | |  v13  | |
+/// | +-------+ |      | +-------+ |
+/// | +-------+ |      | +-------+ |
+/// | |  k21  | |      | |  v21  | |
+/// | +-------+ |      | +-------+ |
+/// | +-------+ |      | +-------+ |
+/// | |  k22  | |      | |  v22  | |
+/// | +-------+ |      | +-------+ |
+/// | +-------+ |      | +-------+ |
+/// | |  k23  | |      | |  v23  | |
+/// | +-------+ |      | +-------+ |
+/// | +-------+ |      | +-------+ |
+/// | |  k31  | |      | |  v31  | |
+/// | +-------+ |      | +-------+ |
+/// | +-------+ |      | +-------+ |
+/// | |  k32  | |      | |  v32  | |
+/// | +-------+ |      | +-------+ |
+/// | +-------+ |      | +-------+ |
+/// | |  k33  | |      | |  v33  | |
+/// | +-------+ |      | +-------+ |
+/// +-----------+      +-----------+
+/// ```text
+
+fn make_map_array_internal<O: OffsetSizeTrait>(
+    keys: ArrayRef,
+    values: ArrayRef,
+) -> datafusion_common::Result<ColumnarValue> {
+    let mut offset_buffer = vec![O::zero()];
+    let mut running_offset = O::zero();
+
+    let keys = datafusion_common::utils::list_to_arrays::<O>(keys);
+    let values = datafusion_common::utils::list_to_arrays::<O>(values);
+
+    let mut key_array_vec = vec![];
+    let mut value_array_vec = vec![];
+    for (k, v) in keys.iter().zip(values.iter()) {
+        running_offset = running_offset.add(O::usize_as(k.len()));
+        offset_buffer.push(running_offset);
+        key_array_vec.push(k.as_ref());
+        value_array_vec.push(v.as_ref());
+    }
+
+    // concatenate all the arrays
+    let flattened_keys = arrow::compute::concat(key_array_vec.as_ref())?;
+    if flattened_keys.null_count() > 0 {
+        return exec_err!("keys cannot be null");
+    }
+    let flattened_values = arrow::compute::concat(value_array_vec.as_ref())?;
+
+    let fields = vec![
+        Arc::new(Field::new("key", flattened_keys.data_type().clone(), false)),
+        Arc::new(Field::new(
+            "value",
+            flattened_values.data_type().clone(),
+            true,
+        )),
+    ];
+
+    let struct_data = ArrayData::builder(DataType::Struct(fields.into()))
+        .len(flattened_keys.len())
+        .add_child_data(flattened_keys.to_data())
+        .add_child_data(flattened_values.to_data())
+        .build()?;
+
+    let map_data = ArrayData::builder(DataType::Map(
+        Arc::new(Field::new(
+            "entries",
+            struct_data.data_type().clone(),
+            false,
+        )),
+        false,
+    ))
+    .len(keys.len())
+    .add_child_data(struct_data)
+    .add_buffer(Buffer::from_slice_ref(offset_buffer.as_slice()))
+    .build()?;
+    Ok(ColumnarValue::Array(Arc::new(MapArray::from(map_data))))
+}
diff --git a/datafusion/sqllogictest/test_files/map.slt b/datafusion/sqllogictest/test_files/map.slt
index eb350c2..0dc37c6 100644
--- a/datafusion/sqllogictest/test_files/map.slt
+++ b/datafusion/sqllogictest/test_files/map.slt
@@ -199,25 +199,50 @@
 
 statement ok
 create table t as values
-('a', 1, 'k1', 10, ['k1', 'k2'], [1, 2]),
-('b', 2, 'k3', 30, ['k3'], [3]),
-('d', 4, 'k5', 50, ['k5'], [5]);
+('a', 1, 'k1', 10, ['k1', 'k2'], [1, 2], 'POST', [[1,2,3]], ['a']),
+('b', 2, 'k3', 30, ['k3'], [3], 'PUT', [[4]], ['b']),
+('d', 4, 'k5', 50, ['k5'], [5], null, [[1,2]], ['c']);
 
-query error
+query ?
 SELECT make_map(column1, column2, column3, column4) FROM t;
-# TODO: support array value
-# ----
-# {a: 1, k1: 10}
-# {b: 2, k3: 30}
-# {d: 4, k5: 50}
+----
+{a: 1, k1: 10}
+{b: 2, k3: 30}
+{d: 4, k5: 50}
+
+query ?
+SELECT map(column5, column6) FROM t;
+----
+{k1: 1, k2: 2}
+{k3: 3}
+{k5: 5}
+
+query ?
+SELECT map(column8, column9) FROM t;
+----
+{[1, 2, 3]: a}
+{[4]: b}
+{[1, 2]: c}
 
 query error
-SELECT map(column5, column6) FROM t;
-# TODO: support array value
-# ----
-# {k1:1, k2:2}
-# {k3: 3}
-# {k5: 5}
+SELECT map(column6, column7) FROM t;
+
+query ?
+select Map {column6: column7} from t;
+----
+{[1, 2]: POST}
+{[3]: PUT}
+{[5]: }
+
+query ?
+select Map {column8: column7} from t;
+----
+{[[1, 2, 3]]: POST}
+{[[4]]: PUT}
+{[[1, 2]]: }
+
+query error
+select Map {column7: column8} from t;
 
 query ?
 SELECT MAKE_MAP('POST', 41, 'HEAD', 33, 'PATCH', 30, 'OPTION', 29, 'GET', 27, 'PUT', 25, 'DELETE', 24) AS method_count from t;