feat: Add support for map lookup by key (#1898)
* feat: support map lookup by key
* fmt
* clippy
* Update native/core/src/execution/planner.rs
Co-authored-by: Andy Grove <agrove@apache.org>
---------
Co-authored-by: Andy Grove <agrove@apache.org>
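For context, a minimal sketch of the query shape this change targets, written against a hypothetical local `SparkSession` (the view name `tbl` and the map/struct data mirror the tests added below):

```scala
import org.apache.spark.sql.SparkSession

object MapLookupByKeyExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical local session; a real Comet run would also set the usual Comet configs.
    val spark = SparkSession.builder().appName("map-lookup-by-key").master("local[1]").getOrCreate()

    // Single-row table with a map<string, struct<a: int, b: string>> column.
    spark.sql(
      """
        |select map(str0, str1) c0 from
        |(select 'key0' str0, named_struct('a', 1, 'b', 'str') str1)
        |""".stripMargin).createOrReplaceTempView("tbl")

    // Map lookups by key (Spark's GetMapValue) can now be serialized to the native plan.
    spark.sql("select c0['key0'], c0['key0'].b, c0['key1'] from tbl").show()

    spark.stop()
  }
}
```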
diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 8f725e6..2e2764a 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -572,7 +572,21 @@
fail_on_error,
)))
}
- ExprStruct::ScalarFunc(expr) => self.create_scalar_function_expr(expr, input_schema),
+ ExprStruct::ScalarFunc(expr) => {
+ let func = self.create_scalar_function_expr(expr, input_schema);
+ match expr.func.as_ref() {
+                    // DataFusion's map_extract returns a list containing the value for the given key,
+                    // but Spark expects a scalar, so wrap the result in an additional list extraction
+ "map_extract" => Ok(Arc::new(ListExtract::new(
+ func?,
+ Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
+ None,
+ true,
+ false,
+ ))),
+ _ => func,
+ }
+ }
ExprStruct::EqNullSafe(expr) => {
let left =
self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?;
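As a conceptual sketch (plain Scala, not Comet code): the `map_extract`-style lookup produces a list, and the planner's extra `ListExtract` with a literal one-based index of 1 restores Spark's scalar-or-null semantics. The helper name `sparkStyleLookup` is purely illustrative:

```scala
// Illustrative model of the two steps composed in the planner change above.
def sparkStyleLookup[K, V](map: Map[K, V], key: K): Option[V] = {
  // map_extract-like step: a one-element list when the key exists, an empty list otherwise.
  val extracted: Seq[V] = map.get(key).toSeq
  // ListExtract-like step: take the first element, yielding None (SQL NULL) for a missing key.
  extracted.headOption
}

// sparkStyleLookup(Map("key0" -> "v"), "key0") == Some("v")
// sparkStyleLookup(Map("key0" -> "v"), "key1") == None
```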
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 6b8740d..229ff03 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -1970,6 +1970,10 @@
case mv: MapValues =>
val childExpr = exprToProtoInternal(mv.child, inputs, binding)
scalarFunctionExprToProto("map_values", childExpr)
+ case gmv: GetMapValue =>
+ val mapExpr = exprToProtoInternal(gmv.child, inputs, binding)
+ val keyExpr = exprToProtoInternal(gmv.key, inputs, binding)
+ scalarFunctionExprToProto("map_extract", mapExpr, keyExpr)
case _ =>
withInfo(expr, s"${expr.prettyName} is not supported", expr.children: _*)
None
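For reference, a small sketch of the front-end forms that are expected to resolve to `GetMapValue` and therefore reach this new case (illustrative only; runnable in a local `spark-shell`):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[1]").getOrCreate()
val df = spark.sql("select map('key0', 'v0') c0")

// Both the SQL bracket syntax and the DataFrame getItem call should analyze to GetMapValue
// and be serialized as a map_extract scalar function call.
df.selectExpr("c0['key0']").show()
df.select(col("c0").getItem("key0")).show()
```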
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala
index 5dbcdfe..f33da3b 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala
@@ -394,4 +394,46 @@
"select * from tbl",
readSchema = Some(readSchema))
}
+
+ test("native reader - extract map by key") {
+ // existing key
+ testSingleLineQuery(
+ """
+ | select map(str0, str1) c0 from
+ | (
+ | select 'key0' str0, named_struct('a', 1, 'b', 'str') str1
+ | )
+ |""".stripMargin,
+ "select c0['key0'] from tbl")
+
+ // existing key, existing struct subfield
+ testSingleLineQuery(
+ """
+ | select map(str0, str1) c0 from
+ | (
+ | select 'key0' str0, named_struct('a', 1, 'b', 'str') str1
+ | )
+ |""".stripMargin,
+ "select c0['key0'].b from tbl")
+
+    // non-existent key
+ testSingleLineQuery(
+ """
+ | select map(str0, str1) c0 from
+ | (
+ | select 'key0' str0, named_struct('a', 1, 'b', 'str') str1
+ | )
+ |""".stripMargin,
+ "select c0['key1'] from tbl")
+
+    // non-existent key, existing struct subfield
+ testSingleLineQuery(
+ """
+ | select map(str0, str1) c0 from
+ | (
+ | select 'key0' str0, named_struct('a', 1, 'b', 'str') str1
+ | )
+ |""".stripMargin,
+ "select c0['key1'].b from tbl")
+ }
}