[CALCITE-6223] Add MAP_CONTAINS_KEY function (enabled in Spark library)
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java
index 71c65fb..220d33f 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java
@@ -213,6 +213,7 @@
 import static org.apache.calcite.sql.fun.SqlLibraryOperators.LPAD;
 import static org.apache.calcite.sql.fun.SqlLibraryOperators.MAP;
 import static org.apache.calcite.sql.fun.SqlLibraryOperators.MAP_CONCAT;
+import static org.apache.calcite.sql.fun.SqlLibraryOperators.MAP_CONTAINS_KEY;
 import static org.apache.calcite.sql.fun.SqlLibraryOperators.MAP_ENTRIES;
 import static org.apache.calcite.sql.fun.SqlLibraryOperators.MAP_FROM_ARRAYS;
 import static org.apache.calcite.sql.fun.SqlLibraryOperators.MAP_FROM_ENTRIES;
@@ -844,6 +845,7 @@
       defineMethod(ARRAYS_ZIP, BuiltInMethod.ARRAYS_ZIP.method, NullPolicy.ANY);
       defineMethod(EXISTS, BuiltInMethod.EXISTS.method, NullPolicy.ANY);
       defineMethod(MAP_CONCAT, BuiltInMethod.MAP_CONCAT.method, NullPolicy.ANY);
+      defineMethod(MAP_CONTAINS_KEY, BuiltInMethod.MAP_CONTAINS_KEY.method, NullPolicy.ANY);
       defineMethod(MAP_ENTRIES, BuiltInMethod.MAP_ENTRIES.method, NullPolicy.STRICT);
       defineMethod(MAP_KEYS, BuiltInMethod.MAP_KEYS.method, NullPolicy.STRICT);
       defineMethod(MAP_VALUES, BuiltInMethod.MAP_VALUES.method, NullPolicy.STRICT);
diff --git a/core/src/main/java/org/apache/calcite/runtime/SqlFunctions.java b/core/src/main/java/org/apache/calcite/runtime/SqlFunctions.java
index 99bfcfb..6e9a294 100644
--- a/core/src/main/java/org/apache/calcite/runtime/SqlFunctions.java
+++ b/core/src/main/java/org/apache/calcite/runtime/SqlFunctions.java
@@ -5446,6 +5446,11 @@
     return new ArrayList<>(map.values());
   }
 
+  /** Support the MAP_CONTAINS_KEY function. */
+  public static Boolean mapContainsKey(Map map, Object key) {
+    return map.containsKey(key);
+  }
+
   /** Support the MAP_FROM_ARRAYS function. */
   public static Map mapFromArrays(List keysArray, List valuesArray) {
     if (keysArray.size() != valuesArray.size()) {
diff --git a/core/src/main/java/org/apache/calcite/sql/SqlKind.java b/core/src/main/java/org/apache/calcite/sql/SqlKind.java
index 5375ee6..ae44f49 100644
--- a/core/src/main/java/org/apache/calcite/sql/SqlKind.java
+++ b/core/src/main/java/org/apache/calcite/sql/SqlKind.java
@@ -779,6 +779,9 @@
   /** {@code MAP_VALUES} function (Spark semantics). */
   MAP_VALUES,
 
+  /** {@code MAP_CONTAINS_KEY} function (Spark semantics). */
+  MAP_CONTAINS_KEY,
+
   /** {@code MAP_FROM_ARRAYS} function (Spark semantics). */
   MAP_FROM_ARRAYS,
 
diff --git a/core/src/main/java/org/apache/calcite/sql/fun/SqlLibraryOperators.java b/core/src/main/java/org/apache/calcite/sql/fun/SqlLibraryOperators.java
index 093a25b..d397f00 100644
--- a/core/src/main/java/org/apache/calcite/sql/fun/SqlLibraryOperators.java
+++ b/core/src/main/java/org/apache/calcite/sql/fun/SqlLibraryOperators.java
@@ -1468,6 +1468,13 @@
           ReturnTypes.TO_MAP_VALUES_NULLABLE,
           OperandTypes.MAP);
 
+  /** The "MAP_CONTAINS_KEY(map, key)" function. */
+  @LibraryOperator(libraries = {SPARK})
+  public static final SqlFunction MAP_CONTAINS_KEY =
+      SqlBasicFunction.create(SqlKind.MAP_CONTAINS_KEY,
+          ReturnTypes.BOOLEAN_NULLABLE,
+          OperandTypes.MAP_KEY);
+
   private static RelDataType deriveTypeMapFromArrays(SqlOperatorBinding opBinding) {
     final RelDataType keysArrayType = opBinding.getOperandType(0);
     final RelDataType valuesArrayType = opBinding.getOperandType(1);
diff --git a/core/src/main/java/org/apache/calcite/sql/type/NonNullableAccessors.java b/core/src/main/java/org/apache/calcite/sql/type/NonNullableAccessors.java
index 48aad16..63c05db 100644
--- a/core/src/main/java/org/apache/calcite/sql/type/NonNullableAccessors.java
+++ b/core/src/main/java/org/apache/calcite/sql/type/NonNullableAccessors.java
@@ -52,4 +52,10 @@
     return requireNonNull(type.getComponentType(),
         () -> "componentType is null for " + type);
   }
+
+  @API(since = "1.37", status = API.Status.EXPERIMENTAL)
+  public static RelDataType getKeyTypeOrThrow(RelDataType type) {
+    return requireNonNull(type.getKeyType(),
+        () -> "keyType is null for " + type);
+  }
 }
diff --git a/core/src/main/java/org/apache/calcite/sql/type/OperandTypes.java b/core/src/main/java/org/apache/calcite/sql/type/OperandTypes.java
index 0c68f0e..5356f52 100644
--- a/core/src/main/java/org/apache/calcite/sql/type/OperandTypes.java
+++ b/core/src/main/java/org/apache/calcite/sql/type/OperandTypes.java
@@ -56,6 +56,7 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import static org.apache.calcite.sql.type.NonNullableAccessors.getKeyTypeOrThrow;
 import static org.apache.calcite.util.Static.RESOURCE;
 
 import static java.util.Objects.requireNonNull;
@@ -603,6 +604,9 @@
   public static final SqlSingleOperandTypeChecker MAP_FUNCTION =
       new MapFunctionOperandTypeChecker();
 
+  public static final SqlOperandTypeChecker MAP_KEY =
+      new MapKeyOperandTypeChecker();
+
   /**
    * Operand type-checking strategy where type must be a literal or NULL.
    */
@@ -1440,6 +1444,47 @@
     }
   }
 
+  /**
+   * Parameter type-checking strategy where types must be Map and Map key type.
+   */
+  private static class MapKeyOperandTypeChecker extends SameOperandTypeChecker {
+    MapKeyOperandTypeChecker() {
+      super(2);
+    }
+
+    @Override public boolean checkOperandTypes(
+        SqlCallBinding callBinding,
+        boolean throwOnFailure) {
+      final SqlNode op0 = callBinding.operand(0);
+      if (!OperandTypes.MAP.checkSingleOperandType(
+          callBinding,
+          op0,
+          0,
+          throwOnFailure)) {
+        return false;
+      }
+
+      final RelDataType mapKeyType =
+          getKeyTypeOrThrow(SqlTypeUtil.deriveType(callBinding, op0));
+      final SqlNode op1 = callBinding.operand(1);
+      RelDataType opType1 = SqlTypeUtil.deriveType(callBinding, op1);
+
+      RelDataType biggest =
+          callBinding.getTypeFactory().leastRestrictive(
+              ImmutableList.of(mapKeyType, opType1));
+      if (biggest == null) {
+        if (throwOnFailure) {
+          throw callBinding.newError(
+              RESOURCE.typeNotComparable(
+                  mapKeyType.toString(), opType1.toString()));
+        }
+
+        return false;
+      }
+      return true;
+    }
+  }
+
   /** Checker that passes if the operand's type has a particular
    * {@link SqlTypeName}. */
   private static class TypeNameChecker implements SqlSingleOperandTypeChecker,
diff --git a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
index 3f56f3f..4ac9596 100644
--- a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
+++ b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
@@ -778,6 +778,7 @@
   SORT_ARRAY(SqlFunctions.class, "sortArray", List.class, boolean.class),
   MAP(SqlFunctions.class, "map", Object[].class),
   MAP_CONCAT(SqlFunctions.class, "mapConcat", Map[].class),
+  MAP_CONTAINS_KEY(SqlFunctions.class, "mapContainsKey", Map.class, Object.class),
   MAP_ENTRIES(SqlFunctions.class, "mapEntries", Map.class),
   MAP_KEYS(SqlFunctions.class, "mapKeys", Map.class),
   MAP_VALUES(SqlFunctions.class, "mapValues", Map.class),
diff --git a/site/_docs/reference.md b/site/_docs/reference.md
index bf604b6..6b9ba28 100644
--- a/site/_docs/reference.md
+++ b/site/_docs/reference.md
@@ -2794,6 +2794,7 @@
 | s | MAP()                                          | Returns an empty map
 | s | MAP(key, value [, key, value]*)                | Returns a map with the given *key*/*value* pairs
 | s | MAP_CONCAT(map [, map]*)                       | Concatenates one or more maps. If any input argument is `NULL` the function returns `NULL`. Note that calcite is using the LAST_WIN strategy
+| s | MAP_CONTAINS_KEY(map, key)                     | Returns whether *map* contains *key*
 | s | MAP_ENTRIES(map)                               | Returns the entries of the *map* as an array, the order of the entries is not defined
 | s | MAP_KEYS(map)                                  | Returns the keys of the *map* as an array, the order of the entries is not defined
 | s | MAP_VALUES(map)                                | Returns the values of the *map* as an array, the order of the entries is not defined
diff --git a/testkit/src/main/java/org/apache/calcite/test/SqlOperatorTest.java b/testkit/src/main/java/org/apache/calcite/test/SqlOperatorTest.java
index d4ee5b7..9f3898c 100644
--- a/testkit/src/main/java/org/apache/calcite/test/SqlOperatorTest.java
+++ b/testkit/src/main/java/org/apache/calcite/test/SqlOperatorTest.java
@@ -7216,6 +7216,43 @@
         "INTEGER ARRAY NOT NULL");
   }
 
+  /** Test case for
+   * <a href="https://issues.apache.org/jira/browse/CALCITE-6223">[CALCITE-6223]
+   * Add MAP_CONTAINS_KEY function (enabled in SPARK library)</a>.
+   */
+  @Test void testMapContainsKeyFunc() {
+    final SqlOperatorFixture f0 = fixture();
+    f0.setFor(SqlLibraryOperators.MAP_CONTAINS_KEY);
+    f0.checkFails("^map_contains_key(map[1, 'a'], 1)^",
+        "No match found for function signature "
+            + "MAP_CONTAINS_KEY\\(<\\(INTEGER, CHAR\\(1\\)\\) MAP\\>, <NUMERIC>\\)", false);
+
+    final SqlOperatorFixture f = f0.withLibrary(SqlLibrary.SPARK);
+    f.checkScalar("map_contains_key(map[1, 'a', 2, 'b'], 1)", "true",
+        "BOOLEAN NOT NULL");
+    f.checkScalar("map_contains_key(map[1, 'a'], 1)", "true",
+        "BOOLEAN NOT NULL");
+    f.checkScalar("map_contains_key(map[1, 'a'], 2)", "false",
+        "BOOLEAN NOT NULL");
+    f.checkScalar("map_contains_key(map['foo', 1], 'foo')", "true",
+        "BOOLEAN NOT NULL");
+    f.checkScalar("map_contains_key(map['foo', 1], 'bar')", "false",
+        "BOOLEAN NOT NULL");
+    f.checkScalar("map_contains_key(map(cast(1 as double), 2), cast(1 as double))", "true",
+        "BOOLEAN NOT NULL");
+    f.checkScalar("map_contains_key(map(array(1), array(2)), array(1))", "true",
+        "BOOLEAN NOT NULL");
+    f.checkType("map_contains_key(cast(null as map<int, varchar>), 1)", "BOOLEAN");
+    f.checkNull("map_contains_key(map[1, 'a'], cast(null as integer))");
+    f.checkNull("map_contains_key(cast(null as map<int, varchar>), cast(null as integer))");
+    f.checkFails("^map_contains_key(map['foo', 1], 1)^",
+        "CHAR\\(3\\) is not comparable to INTEGER",
+        false);
+    f.checkFails("^map_contains_key(map[1, 1], 'foo')^",
+        "INTEGER is not comparable to CHAR\\(3\\)",
+        false);
+  }
+
   /** Tests {@code MAP_FROM_ARRAYS} function from Spark. */
   @Test void testMapFromArraysFunc() {
     final SqlOperatorFixture f0 = fixture();