[FLINK-16203][table] Support JSON_OBJECT

This closes #17186
diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index a953d14..3b40568 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -708,6 +708,39 @@
       -- '[]'
       JSON_QUERY('{}', 'strict $.invalid' EMPTY ARRAY ON ERROR)
       ```
+  - sql: JSON_OBJECT([[KEY] key VALUE value]* [ { NULL | ABSENT } ON NULL ])
+    table: jsonObject(JsonOnNull, keyValues...)
+    description: |
+      Builds a JSON object string from a list of key-value pairs.
+
+      Note that keys must be non-`NULL` string literals, while values may be arbitrary expressions.
+
+      This function returns a JSON string. The `ON NULL` behavior defines how to treat `NULL`
+      values. If omitted, `NULL ON NULL` is assumed by default.
+
+      ```
+      -- '{}'
+      JSON_OBJECT()
+
+      -- '{"K1":"V1","K2":"V2"}'
+      JSON_OBJECT('K1' VALUE 'V1', 'K2' VALUE 'V2')
+
+      -- Expressions as values
+      JSON_OBJECT('orderNo' VALUE orders.orderId)
+
+      -- ON NULL
+      JSON_OBJECT(KEY 'K1' VALUE CAST(NULL AS STRING) NULL ON NULL)   -- '{"K1":null}'
+      JSON_OBJECT(KEY 'K1' VALUE CAST(NULL AS STRING) ABSENT ON NULL) -- '{}'
+
+      -- '{"K1":{"K2":"V"}}'
+      JSON_OBJECT(
+        KEY 'K1'
+        VALUE JSON_OBJECT(
+          KEY 'K2'
+          VALUE 'V'
+        )
+      )
+      ```
 
 valueconstruction:
   - sql: |
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index abfac47..ecf8b69 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -840,6 +840,40 @@
       -- '[]'
       JSON_QUERY('{}', 'strict $.invalid' EMPTY ARRAY ON ERROR)
       ```
+  - sql: JSON_OBJECT([[KEY] key VALUE value]* [ { NULL | ABSENT } ON NULL ])
+    table: jsonObject(JsonOnNull, keyValues...)
+    description: |
+      Builds a JSON object string from a list of key-value pairs.
+
+      Note that keys must be non-`NULL` string literals, while values may be arbitrary expressions.
+
+      This function returns a JSON string. The `ON NULL` behavior defines how to treat `NULL`
+      values. If omitted, `NULL ON NULL` is assumed by default.
+
+
+      ```
+      -- '{}'
+      JSON_OBJECT()
+
+      -- '{"K1":"V1","K2":"V2"}'
+      JSON_OBJECT('K1' VALUE 'V1', 'K2' VALUE 'V2')
+
+      -- Expressions as values
+      JSON_OBJECT('orderNo' VALUE orders.orderId)
+
+      -- ON NULL
+      JSON_OBJECT(KEY 'K1' VALUE CAST(NULL AS STRING) NULL ON NULL)   -- '{"K1":null}'
+      JSON_OBJECT(KEY 'K1' VALUE CAST(NULL AS STRING) ABSENT ON NULL) -- '{}'
+
+      -- '{"K1":{"K2":"V"}}'
+      JSON_OBJECT(
+        KEY 'K1'
+        VALUE JSON_OBJECT(
+          KEY 'K2'
+          VALUE 'V'
+        )
+      )
+      ```
 
 valueconstruction:
   - sql: |
diff --git a/flink-python/pyflink/table/expression.py b/flink-python/pyflink/table/expression.py
index 93d959f..2f26233 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -447,6 +447,20 @@
         return getattr(JJsonQueryOnEmptyOrError, self.name)
 
 
+class JsonOnNull(Enum):
+    """
+    Behavior for entries with a null value for json_object().
+    """
+
+    NULL = 0,
+    ABSENT = 1
+
+    def _to_j_json_on_null(self):
+        gateway = get_gateway()
+        JJsonOnNull = gateway.jvm.org.apache.flink.table.api.JsonOnNull
+        return getattr(JJsonOnNull, self.name)
+
+
 T = TypeVar('T')
 
 
diff --git a/flink-python/pyflink/table/expressions.py b/flink-python/pyflink/table/expressions.py
index 7f85342..4e2d91d 100644
--- a/flink-python/pyflink/table/expressions.py
+++ b/flink-python/pyflink/table/expressions.py
@@ -19,7 +19,7 @@
 
 from pyflink import add_version_doc
 from pyflink.java_gateway import get_gateway
-from pyflink.table.expression import Expression, _get_java_expression, TimePointUnit
+from pyflink.table.expression import Expression, _get_java_expression, TimePointUnit, JsonOnNull
 from pyflink.table.types import _to_java_data_type, DataType, _to_java_type
 from pyflink.table.udf import UserDefinedFunctionWrapper, UserDefinedTableFunctionWrapper
 from pyflink.util.java_utils import to_jarray, load_java_class
@@ -29,8 +29,8 @@
            'current_timestamp', 'current_watermark', 'local_time', 'local_timestamp',
            'temporal_overlaps', 'date_format', 'timestamp_diff', 'array', 'row', 'map_',
            'row_interval', 'pi', 'e', 'rand', 'rand_integer', 'atan2', 'negative', 'concat',
-           'concat_ws', 'uuid', 'null_of', 'log', 'with_columns', 'without_columns', 'call',
-           'call_sql', 'source_watermark']
+           'concat_ws', 'uuid', 'null_of', 'log', 'with_columns', 'without_columns', 'json_object',
+           'call', 'call_sql', 'source_watermark']
 
 
 def _leaf_op(op_name: str) -> Expression:
@@ -67,6 +67,12 @@
         _get_java_expression(forth)))
 
 
+def _varargs_op(op_name: str, *args):
+    gateway = get_gateway()
+    return Expression(
+        getattr(gateway.jvm.Expressions, op_name)(*[_get_java_expression(arg) for arg in args]))
+
+
 def _add_version_doc():
     from inspect import getmembers, isfunction
     from pyflink.table import expressions
@@ -581,6 +587,33 @@
     return _binary_op("withoutColumns", head, tails)
 
 
+def json_object(on_null: JsonOnNull = JsonOnNull.NULL, *args) -> Expression:
+    """
+    Builds a JSON object string from a list of key-value pairs.
+
+    `args` is an even-numbered list of alternating key/value pairs. Note that keys must be
+    non-`NULL` string literals, while values may be arbitrary expressions.
+
+    This function returns a JSON string. The `on_null` behavior defines how to treat `NULL` values.
+
+    Examples:
+    ::
+
+        >>> json_object() # '{}'
+        >>> json_object(JsonOnNull.NULL, "K1", "V1", "K2", "V2") # '{"K1":"V1","K2":"V2"}'
+
+        >>> # Expressions as values
+        >>> json_object(JsonOnNull.NULL, "orderNo", col("orderId"))
+
+        >>> json_object(JsonOnNull.NULL, "K1", null_of(DataTypes.STRING()))   # '{"K1":null}'
+        >>> json_object(JsonOnNull.ABSENT, "K1", null_of(DataTypes.STRING())) # '{}'
+
+        >>> # '{"K1":{"K2":"V"}}'
+        >>> json_object(JsonOnNull.NULL, "K1", json_object(JsonOnNull.NULL, "K2", "V"))
+    """
+    return _varargs_op("jsonObject", *(on_null._to_j_json_on_null(), *args))
+
+
 def call(f: Union[str, UserDefinedFunctionWrapper], *args) -> Expression:
     """
     The first parameter `f` could be a str or a Python user-defined function.
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
index 51770a7..c21bf13 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
@@ -43,6 +43,7 @@
 import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;
 import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
 import static org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral;
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_OBJECT;
 
 /**
  * Entry point of the Table API Expression DSL such as: {@code $("myField").plus(10).abs()}
@@ -563,6 +564,42 @@
     }
 
     /**
+     * Builds a JSON object string from a list of key-value pairs.
+     *
+     * <p>{@param keyValues} is an even-numbered list of alternating key/value pairs. Note that keys
+     * must be non-{@code NULL} string literals, while values may be arbitrary expressions.
+     *
+     * <p>This function returns a JSON string. The {@link JsonOnNull onNull} behavior defines how to
+     * treat {@code NULL} values.
+     *
+     * <p>Examples:
+     *
+     * <pre>{@code
+     * // {}
+     * jsonObject(JsonOnNull.NULL)
+     * // "{\"K1\":\"V1\",\"K2\":\"V2\"}"
+     * // {"K1":"V1","K2":"V2"}
+     * jsonObject(JsonOnNull.NULL, "K1", "V1", "K2", "V2")
+     *
+     * // Expressions as values
+     * jsonObject(JsonOnNull.NULL, "orderNo", $("orderId"))
+     *
+     * // ON NULL
+     * jsonObject(JsonOnNull.NULL, "K1", nullOf(DataTypes.STRING()))   // "{\"K1\":null}"
+     * jsonObject(JsonOnNull.ABSENT, "K1", nullOf(DataTypes.STRING())) // "{}"
+     *
+     * // {"K1":{"K2":"V"}}
+     * jsonObject(JsonOnNull.NULL, "K1", jsonObject(JsonOnNull.NULL, "K2", "V"))
+     * }</pre>
+     */
+    public static ApiExpression jsonObject(JsonOnNull onNull, Object... keyValues) {
+        final Object[] arguments =
+                Stream.concat(Stream.of(onNull), Arrays.stream(keyValues)).toArray(Object[]::new);
+
+        return apiCall(JSON_OBJECT, arguments);
+    }
+
+    /**
      * A call to a function that will be looked up in a catalog. There are two kinds of functions:
      *
      * <ul>
diff --git a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
index 2bb033c..d14009b 100644
--- a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
+++ b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
@@ -784,4 +784,35 @@
    * }}}
    */
   def not(expression: Expression): Expression = Expressions.not(expression)
+
+  /**
+   * Builds a JSON object string from a list of key-value pairs.
+   *
+   * <code>keyValues</code> is an even-numbered list of alternating key/value pairs. Note that keys
+   * must be string literals, values may be arbitrary expressions.
+   *
+   * This function returns a JSON string. The [[JsonOnNull onNull]] behavior defines how to treat
+   * <code>NULL</code> values.
+   *
+   * Examples:
+   * {{{
+   * // {}
+   * jsonObject(JsonOnNull.NULL)
+   * // {"K1":"V1","K2":"V2"}
+   * jsonObject(JsonOnNull.NULL, "K1", "V1", "K2", "V2")
+   *
+   * // Expressions as values
+   * jsonObject(JsonOnNull.NULL, "orderNo", $("orderId"))
+   *
+   * // ON NULL
+   * jsonObject(JsonOnNull.NULL, "K1", nullOf(DataTypes.STRING()))   // "{\"K1\":null}"
+   * jsonObject(JsonOnNull.ABSENT, "K1", nullOf(DataTypes.STRING())) // '{}'
+   *
+   * // {"K1":{"K2":"V"}}
+   * jsonObject(JsonOnNull.NULL, "K1", jsonObject(JsonOnNull.NULL, "K2", "V"))
+   * }}}
+   */
+  def jsonObject(onNull: JsonOnNull, keyValues: Any*): Expression = {
+    Expressions.jsonObject(onNull, keyValues)
+  }
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/JsonOnNull.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/JsonOnNull.java
new file mode 100644
index 0000000..78c98bc
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/JsonOnNull.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.TableSymbol;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+
+/** Behavior for entries with a null value for {@link BuiltInFunctionDefinitions#JSON_OBJECT}. */
+@PublicEvolving
+public enum JsonOnNull implements TableSymbol {
+    /** Use a JSON {@code null} value. */
+    NULL,
+    /** Omit this key from the resulting JSON. */
+    ABSENT
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index a566e7e..f5964c89 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -1558,6 +1558,15 @@
                     .runtimeDeferred()
                     .build();
 
+    public static final BuiltInFunctionDefinition JSON_OBJECT =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("JSON_OBJECT")
+                    .kind(SCALAR)
+                    .inputTypeStrategy(SpecificInputTypeStrategies.JSON_OBJECT)
+                    .outputTypeStrategy(explicit(DataTypes.STRING().notNull()))
+                    .runtimeDeferred()
+                    .build();
+
     // --------------------------------------------------------------------------------------------
     // Other functions
     // --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RepeatingSequenceInputTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RepeatingSequenceInputTypeStrategy.java
index 41b88bd..61cb639 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RepeatingSequenceInputTypeStrategy.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RepeatingSequenceInputTypeStrategy.java
@@ -94,10 +94,19 @@
         for (int i = 0; i < argumentStrategies.size(); i++) {
             final Signature.Argument argument =
                     argumentStrategies.get(i).getExpectedArgument(definition, i);
-            arguments.add(argument);
+
+            final Signature.Argument newArgument;
+            if (i == 0) {
+                newArgument = Signature.Argument.of(String.format("[%s", argument.getType()));
+            } else if (i == argumentStrategies.size() - 1) {
+                newArgument = Signature.Argument.of(String.format("%s]...", argument.getType()));
+            } else {
+                newArgument = argument;
+            }
+
+            arguments.add(newArgument);
         }
 
-        // Unfortunately there is no way to represent the repetition in the signature
         final Signature signature = Signature.of(arguments);
         return Collections.singletonList(signature);
     }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
index d41784d..e9d62db 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
@@ -19,13 +19,23 @@
 package org.apache.flink.table.types.inference.strategies;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.JsonOnNull;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.types.inference.ConstantArgumentCount;
 import org.apache.flink.table.types.inference.InputTypeStrategies;
 import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.StructuredType;
 
+import static org.apache.flink.table.types.inference.InputTypeStrategies.LITERAL;
+import static org.apache.flink.table.types.inference.InputTypeStrategies.and;
 import static org.apache.flink.table.types.inference.InputTypeStrategies.comparable;
+import static org.apache.flink.table.types.inference.InputTypeStrategies.compositeSequence;
+import static org.apache.flink.table.types.inference.InputTypeStrategies.logical;
+import static org.apache.flink.table.types.inference.InputTypeStrategies.or;
+import static org.apache.flink.table.types.inference.InputTypeStrategies.repeatingSequence;
+import static org.apache.flink.table.types.inference.InputTypeStrategies.symbol;
 
 /**
  * Entry point for specific input type strategies not covered in {@link InputTypeStrategies}.
@@ -46,6 +56,26 @@
     public static final InputTypeStrategy CURRENT_WATERMARK =
             new CurrentWatermarkInputTypeStrategy();
 
+    /**
+     * Input strategy for {@link BuiltInFunctionDefinitions#JSON_OBJECT}.
+     *
+     * <p>The first argument defines the on-null behavior and is followed by any number of key-value
+     * pairs. Keys must be character string literals, while values are arbitrary expressions.
+     */
+    public static final InputTypeStrategy JSON_OBJECT =
+            compositeSequence()
+                    .argument(symbol(JsonOnNull.class))
+                    .finishWithVarying(
+                            repeatingSequence(
+                                    and(logical(LogicalTypeFamily.CHARACTER_STRING), LITERAL),
+                                    or(
+                                            logical(LogicalTypeFamily.CHARACTER_STRING),
+                                            logical(LogicalTypeFamily.BINARY_STRING),
+                                            logical(LogicalTypeFamily.TIMESTAMP),
+                                            logical(LogicalTypeFamily.CONSTRUCTED),
+                                            logical(LogicalTypeRoot.BOOLEAN),
+                                            logical(LogicalTypeFamily.NUMERIC))));
+
     // --------------------------------------------------------------------------------------------
     // Strategies composed of other strategies
     // --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RepeatingSequenceInputTypeStrategyTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RepeatingSequenceInputTypeStrategyTest.java
index 5941d49..9c09b79 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RepeatingSequenceInputTypeStrategyTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RepeatingSequenceInputTypeStrategyTest.java
@@ -52,13 +52,13 @@
                         .calledWithArgumentTypes(STRING(), INT())
                         .expectErrorMessage(
                                 String.format(
-                                        "Invalid input arguments. Expected signatures are:%nf(INT, STRING)")),
+                                        "Invalid input arguments. Expected signatures are:%nf([INT, STRING]...)")),
                 TestSpec.forStrategy(
                                 "Incorrect number of arguments",
                                 repeatingSequence(explicit(INT()), explicit(STRING())))
                         .calledWithArgumentTypes(INT())
                         .expectErrorMessage(
                                 String.format(
-                                        "Invalid input arguments. Expected signatures are:%nf(INT, STRING)")));
+                                        "Invalid input arguments. Expected signatures are:%nf([INT, STRING]...)")));
     }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/converters/CustomizedConverters.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/converters/CustomizedConverters.java
index b36b840b..390623d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/converters/CustomizedConverters.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/converters/CustomizedConverters.java
@@ -56,6 +56,7 @@
         CONVERTERS.put(BuiltInFunctionDefinitions.JSON_EXISTS, new JsonExistsConverter());
         CONVERTERS.put(BuiltInFunctionDefinitions.JSON_VALUE, new JsonValueConverter());
         CONVERTERS.put(BuiltInFunctionDefinitions.JSON_QUERY, new JsonQueryConverter());
+        CONVERTERS.put(BuiltInFunctionDefinitions.JSON_OBJECT, new JsonObjectConverter());
         CONVERTERS.put(InternalFunctionDefinitions.THROW_EXCEPTION, new ThrowExceptionConverter());
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/converters/JsonObjectConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/converters/JsonObjectConverter.java
new file mode 100644
index 0000000..da1c378
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/converters/JsonObjectConverter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.expressions.converter.converters;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.JsonOnNull;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.planner.expressions.converter.CallExpressionConvertRule;
+import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
+
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlJsonConstructorNullClause;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/** Conversion for {@link BuiltInFunctionDefinitions#JSON_OBJECT}. */
+@Internal
+class JsonObjectConverter extends CustomizedConverter {
+    @Override
+    public RexNode convert(CallExpression call, CallExpressionConvertRule.ConvertContext context) {
+        checkArgument(call, (call.getChildren().size() - 1) % 2 == 0);
+        final List<RexNode> operands = new LinkedList<>();
+
+        final SqlJsonConstructorNullClause onNull =
+                ((ValueLiteralExpression) call.getChildren().get(0))
+                        .getValueAs(JsonOnNull.class)
+                        .map(this::convertOnNull)
+                        .orElseThrow(() -> new TableException("Missing argument for ON NULL."));
+        operands.add(context.getRelBuilder().getRexBuilder().makeFlag(onNull));
+
+        for (int i = 1; i < call.getChildren().size(); i++) {
+            final Expression operand = call.getChildren().get(i);
+            if (i % 2 == 1 && !(operand instanceof ValueLiteralExpression)) {
+                throw new TableException(
+                        String.format("Argument at position %s must be a string literal.", i));
+            }
+
+            operands.add(context.toRexNode(operand));
+        }
+
+        return context.getRelBuilder()
+                .getRexBuilder()
+                .makeCall(FlinkSqlOperatorTable.JSON_OBJECT, operands);
+    }
+
+    private SqlJsonConstructorNullClause convertOnNull(JsonOnNull onNull) {
+        switch (onNull) {
+            case NULL:
+                return SqlJsonConstructorNullClause.NULL_ON_NULL;
+            case ABSENT:
+                return SqlJsonConstructorNullClause.ABSENT_ON_NULL;
+            default:
+                throw new TableException("Unknown ON NULL behavior: " + onNull);
+        }
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index a5afcc4..9ffa130 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -1169,6 +1169,7 @@
     public static final SqlFunction JSON_EXISTS = SqlStdOperatorTable.JSON_EXISTS;
     public static final SqlFunction JSON_VALUE = SqlStdOperatorTable.JSON_VALUE;
     public static final SqlFunction JSON_QUERY = SqlStdOperatorTable.JSON_QUERY;
+    public static final SqlFunction JSON_OBJECT = new SqlJsonObjectFunction();
     public static final SqlPostfixOperator IS_JSON_VALUE = SqlStdOperatorTable.IS_JSON_VALUE;
     public static final SqlPostfixOperator IS_JSON_OBJECT = SqlStdOperatorTable.IS_JSON_OBJECT;
     public static final SqlPostfixOperator IS_JSON_ARRAY = SqlStdOperatorTable.IS_JSON_ARRAY;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlJsonObjectFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlJsonObjectFunction.java
new file mode 100644
index 0000000..6996d54
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlJsonObjectFunction.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.sql;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlJsonConstructorNullClause;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperandCountRange;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.ReturnTypes;
+import org.apache.calcite.sql.type.SqlOperandCountRanges;
+import org.apache.calcite.sql.type.SqlOperandTypeChecker;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.type.SqlTypeUtil;
+import org.apache.calcite.sql.validate.SqlValidator;
+
+import java.util.Locale;
+
+import static org.apache.calcite.util.Static.RESOURCE;
+
+/**
+ * This class has been copied from Calcite to backport the fix made during CALCITE-4394.
+ *
+ * <p>TODO Remove this class with Calcite 1.27 and replace it with {@link
+ * SqlStdOperatorTable#JSON_OBJECT}.
+ */
+public class SqlJsonObjectFunction extends SqlFunction {
+    public SqlJsonObjectFunction() {
+        super(
+                "JSON_OBJECT",
+                SqlKind.OTHER_FUNCTION,
+                ReturnTypes.VARCHAR_2000,
+                (callBinding, returnType, operandTypes) -> {
+                    RelDataTypeFactory typeFactory = callBinding.getTypeFactory();
+                    for (int i = 0; i < operandTypes.length; i++) {
+                        operandTypes[i] =
+                                i == 0
+                                        ? typeFactory.createSqlType(SqlTypeName.SYMBOL)
+                                        : i % 2 == 1
+                                                ? typeFactory.createSqlType(SqlTypeName.VARCHAR)
+                                                : typeFactory.createTypeWithNullability(
+                                                        typeFactory.createSqlType(SqlTypeName.ANY),
+                                                        true);
+                    }
+                },
+                null,
+                SqlFunctionCategory.SYSTEM);
+    }
+
+    @Override
+    public SqlOperandCountRange getOperandCountRange() {
+        return SqlOperandCountRanges.from(1);
+    }
+
+    @Override
+    protected void checkOperandCount(
+            SqlValidator validator, SqlOperandTypeChecker argType, SqlCall call) {
+        assert call.operandCount() % 2 == 1;
+    }
+
+    @Override
+    public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFailure) {
+        final int count = callBinding.getOperandCount();
+        for (int i = 1; i < count; i += 2) {
+            RelDataType nameType = callBinding.getOperandType(i);
+            if (!SqlTypeUtil.inCharFamily(nameType)) {
+                if (throwOnFailure) {
+                    throw callBinding.newError(RESOURCE.expectedCharacter());
+                }
+                return false;
+            }
+            if (nameType.isNullable()) {
+                if (throwOnFailure) {
+                    throw callBinding.newError(
+                            RESOURCE.argumentMustNotBeNull(callBinding.operand(i).toString()));
+                }
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public SqlCall createCall(SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... operands) {
+        if (operands[0] == null) {
+            operands[0] = SqlLiteral.createSymbol(SqlJsonConstructorNullClause.NULL_ON_NULL, pos);
+        }
+        return super.createCall(functionQualifier, pos, operands);
+    }
+
+    @Override
+    public String getSignatureTemplate(int operandsCount) {
+        assert operandsCount % 2 == 1;
+        StringBuilder sb = new StringBuilder();
+        sb.append("{0}(");
+        for (int i = 1; i < operandsCount; i++) {
+            sb.append(String.format(Locale.ROOT, "{%d} ", i + 1));
+        }
+        sb.append("{1})");
+        return sb.toString();
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) {
+        assert call.operandCount() % 2 == 1;
+        final SqlWriter.Frame frame = writer.startFunCall(getName());
+        SqlWriter.Frame listFrame = writer.startList("", "");
+        for (int i = 1; i < call.operandCount(); i += 2) {
+            writer.sep(",");
+            writer.keyword("KEY");
+            call.operand(i).unparse(writer, leftPrec, rightPrec);
+            writer.keyword("VALUE");
+            call.operand(i + 1).unparse(writer, leftPrec, rightPrec);
+        }
+        writer.endList(listFrame);
+
+        SqlJsonConstructorNullClause nullClause = getEnumValue(call.operand(0));
+        switch (nullClause) {
+            case ABSENT_ON_NULL:
+                writer.keyword("ABSENT ON NULL");
+                break;
+            case NULL_ON_NULL:
+                writer.keyword("NULL ON NULL");
+                break;
+            default:
+                throw new IllegalStateException("unreachable code");
+        }
+        writer.endFunCall(frame);
+    }
+
+    private <E extends Enum<E>> E getEnumValue(SqlNode operand) {
+        return (E) ((SqlLiteral) operand).getValue();
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
index 4cea7d2..3594bc5 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
@@ -782,6 +782,8 @@
 
       case JSON_VALUE => new JsonValueCallGen().generate(ctx, operands, resultType)
 
+      case JSON_OBJECT => new JsonObjectCallGen(call).generate(ctx, operands, resultType)
+
       case _: SqlThrowExceptionFunction =>
         val nullValue = generateNullLiteral(resultType, nullCheck = true)
         val code =
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
index 166011f..0d9a60e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
@@ -25,6 +25,7 @@
 import org.apache.flink.table.functions.{ConstantFunctionContext, FunctionContext, UserDefinedFunction}
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.codegen.FunctionCodeGenerator.generateFunction
+import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable.JSON_OBJECT
 import org.apache.flink.table.planner.plan.utils.PythonUtil.containsPythonCall
 import org.apache.flink.table.planner.utils.Logging
 import org.apache.flink.table.types.DataType
@@ -32,7 +33,7 @@
 import org.apache.flink.table.util.TimestampStringUtils.fromLocalDateTime
 
 import org.apache.calcite.avatica.util.ByteString
-import org.apache.calcite.rex.{RexBuilder, RexExecutor, RexNode}
+import org.apache.calcite.rex.{RexBuilder, RexCall, RexExecutor, RexNode}
 import org.apache.calcite.sql.`type`.SqlTypeName
 
 import scala.collection.JavaConverters._
@@ -78,6 +79,10 @@
            (SqlTypeName.MAP, _) |
            (SqlTypeName.MULTISET, _) => None
 
+      // Exclude some JSON functions which behave differently when called as an argument of another
+      // call of one of these functions.
+      case (_, call: RexCall) if call.getOperator == JSON_OBJECT => None
+
       case (_, e) => Some(e)
     }
 
@@ -140,73 +145,76 @@
       if (pythonUDFExprs.exists(_ eq unreduced)) {
         // if contains python function then just insert the original expression.
         reducedValues.add(unreduced)
-      } else {
-        unreduced.getType.getSqlTypeName match {
-          // we insert the original expression for object literals
-          case SqlTypeName.ANY |
-               SqlTypeName.OTHER |
-               SqlTypeName.ROW |
-               SqlTypeName.STRUCTURED |
-               SqlTypeName.ARRAY |
-               SqlTypeName.MAP |
-               SqlTypeName.MULTISET =>
-            reducedValues.add(unreduced)
-          case SqlTypeName.VARCHAR | SqlTypeName.CHAR =>
-            val escapeVarchar = BinaryStringDataUtil.safeToString(
-              reduced.getField(reducedIdx).asInstanceOf[BinaryStringData])
-            reducedValues.add(maySkipNullLiteralReduce(rexBuilder, escapeVarchar, unreduced))
-            reducedIdx += 1
-          case SqlTypeName.VARBINARY | SqlTypeName.BINARY =>
-            val reducedValue = reduced.getField(reducedIdx)
-            val value = if (null != reducedValue) {
-              new ByteString(reduced.getField(reducedIdx).asInstanceOf[Array[Byte]])
-            } else {
-              reducedValue
-            }
-            reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, unreduced))
-            reducedIdx += 1
-          case SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
-            val reducedValue = reduced.getField(reducedIdx)
-            val value = if (reducedValue != null) {
-              val dt = reducedValue.asInstanceOf[TimestampData].toLocalDateTime
-              fromLocalDateTime(dt)
-            } else {
-              reducedValue
-            }
-            reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, unreduced))
-            reducedIdx += 1
-          case SqlTypeName.DECIMAL =>
-            val reducedValue = reduced.getField(reducedIdx)
-            val value = if (reducedValue != null) {
-              reducedValue.asInstanceOf[DecimalData].toBigDecimal
-            } else {
-              reducedValue
-            }
-            reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, unreduced))
-            reducedIdx += 1
-          case SqlTypeName.TIMESTAMP =>
-            val reducedValue = reduced.getField(reducedIdx)
-            val value = if (reducedValue != null) {
-              val dt = reducedValue.asInstanceOf[TimestampData].toLocalDateTime
-              fromLocalDateTime(dt)
-            } else {
-              reducedValue
-            }
-            reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, unreduced))
-            reducedIdx += 1
-          case _ =>
-            val reducedValue = reduced.getField(reducedIdx)
-            // RexBuilder handle double literal incorrectly, convert it into BigDecimal manually
-            val value = if (reducedValue != null &&
-              unreduced.getType.getSqlTypeName == SqlTypeName.DOUBLE) {
-              new java.math.BigDecimal(reducedValue.asInstanceOf[Number].doubleValue())
-            } else {
-              reducedValue
-            }
+      } else unreduced match {
+        case call: RexCall if call.getOperator == JSON_OBJECT =>
+          reducedValues.add(unreduced)
+        case _ =>
+          unreduced.getType.getSqlTypeName match {
+            // we insert the original expression for object literals
+            case SqlTypeName.ANY |
+                 SqlTypeName.OTHER |
+                 SqlTypeName.ROW |
+                 SqlTypeName.STRUCTURED |
+                 SqlTypeName.ARRAY |
+                 SqlTypeName.MAP |
+                 SqlTypeName.MULTISET =>
+              reducedValues.add(unreduced)
+            case SqlTypeName.VARCHAR | SqlTypeName.CHAR =>
+              val escapeVarchar = BinaryStringDataUtil.safeToString(
+                reduced.getField(reducedIdx).asInstanceOf[BinaryStringData])
+              reducedValues.add(maySkipNullLiteralReduce(rexBuilder, escapeVarchar, unreduced))
+              reducedIdx += 1
+            case SqlTypeName.VARBINARY | SqlTypeName.BINARY =>
+              val reducedValue = reduced.getField(reducedIdx)
+              val value = if (null != reducedValue) {
+                new ByteString(reduced.getField(reducedIdx).asInstanceOf[Array[Byte]])
+              } else {
+                reducedValue
+              }
+              reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, unreduced))
+              reducedIdx += 1
+            case SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
+              val reducedValue = reduced.getField(reducedIdx)
+              val value = if (reducedValue != null) {
+                val dt = reducedValue.asInstanceOf[TimestampData].toLocalDateTime
+                fromLocalDateTime(dt)
+              } else {
+                reducedValue
+              }
+              reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, unreduced))
+              reducedIdx += 1
+            case SqlTypeName.DECIMAL =>
+              val reducedValue = reduced.getField(reducedIdx)
+              val value = if (reducedValue != null) {
+                reducedValue.asInstanceOf[DecimalData].toBigDecimal
+              } else {
+                reducedValue
+              }
+              reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, unreduced))
+              reducedIdx += 1
+            case SqlTypeName.TIMESTAMP =>
+              val reducedValue = reduced.getField(reducedIdx)
+              val value = if (reducedValue != null) {
+                val dt = reducedValue.asInstanceOf[TimestampData].toLocalDateTime
+                fromLocalDateTime(dt)
+              } else {
+                reducedValue
+              }
+              reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, unreduced))
+              reducedIdx += 1
+            case _ =>
+              val reducedValue = reduced.getField(reducedIdx)
+              // RexBuilder handle double literal incorrectly, convert it into BigDecimal manually
+              val value = if (reducedValue != null &&
+                unreduced.getType.getSqlTypeName == SqlTypeName.DOUBLE) {
+                new java.math.BigDecimal(reducedValue.asInstanceOf[Number].doubleValue())
+              } else {
+                reducedValue
+              }
 
-            reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, unreduced))
-            reducedIdx += 1
-        }
+              reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, unreduced))
+              reducedIdx += 1
+          }
       }
       i += 1
     }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/JsonGenerateUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/JsonGenerateUtils.scala
new file mode 100644
index 0000000..fde4b8e
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/JsonGenerateUtils.scala
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.codegen
+
+import org.apache.flink.table.api.DataTypes
+import org.apache.flink.table.planner.codegen.CodeGenUtils.{className, newName, rowFieldReadAccess, typeTerm}
+import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable.JSON_OBJECT
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
+import org.apache.flink.table.runtime.typeutils.TypeCheckUtils.isCharacterString
+import org.apache.flink.table.types.logical.LogicalTypeRoot._
+import org.apache.flink.table.types.logical.{ArrayType, LogicalType, MapType, MultisetType, RowType}
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.{ArrayNode, ContainerNode, ObjectNode}
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.util.RawValue
+
+import org.apache.calcite.rex.{RexCall, RexNode}
+
+import java.time.format.DateTimeFormatter
+
+/** Utility for generating JSON function calls. */
+object JsonGenerateUtils {
+
+  /**
+   * Returns a term which wraps the given <code>valueExpr</code> into a [[JsonNode]] of the
+   * appropriate type.
+   */
+  def createNodeTerm(
+      ctx: CodeGeneratorContext,
+      containerNodeTerm: String,
+      valueExpr: GeneratedExpression): String = {
+    createNodeTerm(ctx, containerNodeTerm, valueExpr.resultTerm, valueExpr.resultType)
+  }
+
+  /**
+   * Returns a term which wraps the given expression into a [[JsonNode]] of the appropriate type.
+   */
+  private def createNodeTerm(
+      ctx: CodeGeneratorContext,
+      containerNodeTerm: String,
+      term: String,
+      logicalType: LogicalType): String = {
+    logicalType.getTypeRoot match {
+      case CHAR | VARCHAR => s"$containerNodeTerm.textNode($term.toString())"
+      case BOOLEAN => s"$containerNodeTerm.booleanNode($term)"
+      case DECIMAL => s"$containerNodeTerm.numberNode($term.toBigDecimal())"
+      case TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE =>
+        s"$containerNodeTerm.numberNode($term)"
+      case TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
+        val formatter = s"${typeTerm(classOf[DateTimeFormatter])}.ISO_LOCAL_DATE_TIME"
+        val isoTerm = s"$term.toLocalDateTime().format($formatter)"
+        logicalType.getTypeRoot match {
+          case TIMESTAMP_WITHOUT_TIME_ZONE => s"$containerNodeTerm.textNode($isoTerm)"
+          case TIMESTAMP_WITH_LOCAL_TIME_ZONE => s"""$containerNodeTerm.textNode($isoTerm + "Z")"""
+        }
+      case TIMESTAMP_WITH_TIME_ZONE =>
+        throw new CodeGenException(s"'TIMESTAMP WITH TIME ZONE' is not yet supported.")
+      case BINARY | VARBINARY =>
+        s"$containerNodeTerm.binaryNode($term)"
+      case ARRAY =>
+        val converterName = generateArrayConverter(ctx, containerNodeTerm,
+          logicalType.asInstanceOf[ArrayType].getElementType)
+
+        s"$converterName($term)"
+      case ROW =>
+        val converterName = generateRowConverter(ctx, containerNodeTerm,
+          logicalType.asInstanceOf[RowType])
+
+        s"$converterName($term)"
+      case MAP =>
+        val mapType = logicalType.asInstanceOf[MapType]
+        val converterName = generateMapConverter(ctx, containerNodeTerm, mapType.getKeyType,
+          mapType.getValueType)
+
+        s"$converterName($term)"
+      case MULTISET =>
+        val converterName = generateMapConverter(ctx, containerNodeTerm,
+          logicalType.asInstanceOf[MultisetType].getElementType, DataTypes.INT().getLogicalType)
+
+        s"$converterName($term)"
+      case _ => throw new CodeGenException(
+        s"Type '$logicalType' is not scalar or cannot be converted into JSON.")
+    }
+  }
+
+  /**
+   * Returns a term which wraps the given <code>valueExpr</code> as a raw [[JsonNode]].
+   *
+   * @param containerNodeTerm Name of the [[ContainerNode]] from which to create the raw node.
+   * @param valueExpr Generated expression of the value which should be wrapped.
+   * @return Generate code fragment creating the raw node.
+   */
+  def createRawNodeTerm(containerNodeTerm: String, valueExpr: GeneratedExpression): String = {
+    s"""
+       |$containerNodeTerm.rawValueNode(
+       |    new ${typeTerm(classOf[RawValue])}(${valueExpr.resultTerm}.toString()))
+       |""".stripMargin
+  }
+
+  /**
+   * Determines whether the given operand is a call to a JSON function whose result should be
+   * inserted as a raw value instead of as a character string.
+   */
+  def isJsonFunctionOperand(operand: RexNode): Boolean = {
+    operand match {
+      case rexCall: RexCall => rexCall.getOperator match {
+        case JSON_OBJECT => true
+        case _ => false
+      }
+      case _ => false
+    }
+  }
+
+  /** Generates a method to convert arrays into [[ArrayNode]]. */
+  private def generateArrayConverter(
+      ctx: CodeGeneratorContext,
+      containerNodeTerm: String,
+      elementType: LogicalType): String = {
+    val fieldAccessCode = toExternalTypeTerm(
+      rowFieldReadAccess(ctx, "i", "arrData", elementType), elementType)
+
+    val methodName = newName("convertArray")
+    val methodCode =
+      s"""
+         |private ${className[ArrayNode]} $methodName(${CodeGenUtils.ARRAY_DATA} arrData) {
+         |    ${className[ArrayNode]} arrNode = $containerNodeTerm.arrayNode();
+         |    for (int i = 0; i < arrData.size(); i++) {
+         |        arrNode.add(
+         |            ${createNodeTerm(ctx, containerNodeTerm, fieldAccessCode, elementType)});
+         |    }
+         |
+         |    return arrNode;
+         |}
+         |""".stripMargin
+
+    ctx.addReusableMember(methodCode)
+    methodName
+  }
+
+  /** Generates a method to convert rows into [[ObjectNode]]. */
+  private def generateRowConverter(
+      ctx: CodeGeneratorContext,
+      containerNodeTerm: String,
+      rowType: RowType): String = {
+
+    val populateObjectCode = toScala(rowType.getFieldNames).zipWithIndex.map {
+      case (fieldName, idx) =>
+        val fieldType = rowType.getTypeAt(idx)
+        val fieldAccessCode = toExternalTypeTerm(
+          rowFieldReadAccess(ctx, idx.toString, "rowData", fieldType), fieldType)
+
+        s"""
+           |objNode.set("$fieldName",
+           |    ${createNodeTerm(ctx, containerNodeTerm, fieldAccessCode, fieldType)});
+           |""".stripMargin
+    }.mkString
+
+    val methodName = newName("convertRow")
+    val methodCode =
+      s"""
+         |private ${className[ObjectNode]} $methodName(${CodeGenUtils.ROW_DATA} rowData) {
+         |    ${className[ObjectNode]} objNode = $containerNodeTerm.objectNode();
+         |    $populateObjectCode
+         |
+         |    return objNode;
+         |}
+         |""".stripMargin
+
+    ctx.addReusableMember(methodCode)
+    methodName
+  }
+
+  /** Generates a method to convert maps into [[ObjectNode]]. */
+  private def generateMapConverter(
+      ctx: CodeGeneratorContext,
+      containerNodeTerm: String,
+      keyType: LogicalType,
+      valueType: LogicalType): String = {
+    if (!isCharacterString(keyType)) {
+      throw new CodeGenException(
+        s"Type '$keyType' is not supported for JSON conversion. "
+          + "The key type must be a character string.")
+    }
+
+    val keyAccessCode = toExternalTypeTerm(
+      rowFieldReadAccess(ctx, "i", "mapData.keyArray()", keyType), keyType)
+    val valueAccessCode = toExternalTypeTerm(
+      rowFieldReadAccess(ctx, "i", "mapData.valueArray()", valueType), valueType)
+
+    val methodName = newName("convertMap")
+    val methodCode =
+      s"""
+         |private ${className[ObjectNode]} $methodName(${CodeGenUtils.MAP_DATA} mapData) {
+         |    ${className[ObjectNode]} objNode = $containerNodeTerm.objectNode();
+         |    for (int i = 0; i < mapData.size(); i++) {
+         |        java.lang.String key = $keyAccessCode;
+         |        if (key == null) {
+         |            throw new java.lang.IllegalArgumentException("Key at index " + i
+         |                + " was null. This is not supported during conversion to JSON.");
+         |        }
+         |
+         |        objNode.set(key,
+         |            ${createNodeTerm(ctx, containerNodeTerm, valueAccessCode, valueType)});
+         |    }
+         |
+         |    return objNode;
+         |}
+         |""".stripMargin
+
+    ctx.addReusableMember(methodCode)
+    methodName
+  }
+
+  private def toExternalTypeTerm(term: String, logicalType: LogicalType): String = {
+    if (isCharacterString(logicalType)) {
+      s"$term.toString()"
+    } else {
+      term
+    }
+  }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
index 7b41553..82387c9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
@@ -24,8 +24,8 @@
 import org.apache.calcite.avatica.util.{DateTimeUtils, TimeUnitRange}
 import org.apache.calcite.linq4j.tree.Types
 import org.apache.calcite.runtime.{JsonFunctions, SqlFunctions}
-import org.apache.calcite.sql.{SqlJsonExistsErrorBehavior, SqlJsonQueryEmptyOrErrorBehavior,
-  SqlJsonQueryWrapperBehavior, SqlJsonValueEmptyOrErrorBehavior}
+import org.apache.calcite.sql.{SqlJsonConstructorNullClause, SqlJsonExistsErrorBehavior,
+  SqlJsonQueryEmptyOrErrorBehavior, SqlJsonQueryWrapperBehavior, SqlJsonValueEmptyOrErrorBehavior}
 
 import java.lang.reflect.Method
 import java.lang.{Byte => JByte, Integer => JInteger, Long => JLong, Short => JShort}
@@ -513,4 +513,7 @@
   val JSON_QUERY = Types.lookupMethod(classOf[JsonFunctions], "jsonQuery",
     classOf[String], classOf[String], classOf[SqlJsonQueryWrapperBehavior],
     classOf[SqlJsonQueryEmptyOrErrorBehavior], classOf[SqlJsonQueryEmptyOrErrorBehavior])
+
+  val JSON_OBJECT = Types.lookupMethod(classOf[JsonFunctions], "jsonObject",
+    classOf[SqlJsonConstructorNullClause], classOf[Array[Any]])
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonObjectCallGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonObjectCallGen.scala
new file mode 100644
index 0000000..ce09e7b
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonObjectCallGen.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.codegen.calls
+import org.apache.flink.table.planner.codegen.CodeGenUtils._
+import org.apache.flink.table.planner.codegen.JsonGenerateUtils.{createRawNodeTerm, createNodeTerm, generateArrayConverter, generateRowConverter, isJsonFunctionOperand}
+import org.apache.flink.table.planner.codegen.{CodeGenException, CodeGeneratorContext, GeneratedExpression}
+import org.apache.flink.table.runtime.functions.SqlJsonUtils
+import org.apache.flink.table.types.logical.{ArrayType, LogicalType, RowType}
+import org.apache.flink.table.types.logical.LogicalTypeRoot.{ARRAY, MAP, MULTISET, ROW}
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.{ArrayNode, NullNode, ObjectNode}
+
+import org.apache.calcite.rex.RexCall
+import org.apache.calcite.sql.SqlJsonConstructorNullClause
+import org.apache.calcite.sql.SqlJsonConstructorNullClause.{ABSENT_ON_NULL, NULL_ON_NULL}
+
+/**
+ * [[CallGenerator]] for [[BuiltInMethods.JSON_OBJECT]].
+ *
+ * `JSON_OBJECT` returns a character string. However, this creates an issue when nesting calls to
+ * this function with the intention of creating a nested JSON structure. Instead of a nested JSON
+ * object, a JSON string would be inserted, i.e.
+ * `JSON_OBJECT(KEY 'K' VALUE JSON_OBJECT(KEY 'A' VALUE 'B'))` would result in
+ * `{"K":"{\"A\":\"B\"}"}` instead of the intended `{"K":{"A":"B"}}`. We remedy this by treating
+ * nested calls to this function differently and inserting the value as a raw node instead of as a
+ * string node.
+ */
+class JsonObjectCallGen(call: RexCall) extends CallGenerator {
+  private def jsonUtils = className[SqlJsonUtils]
+
+  override def generate(
+      ctx: CodeGeneratorContext,
+      operands: Seq[GeneratedExpression],
+      returnType: LogicalType): GeneratedExpression = {
+
+    val nodeTerm = newName("node")
+    ctx.addReusableMember(s"${className[ObjectNode]} $nodeTerm = $jsonUtils.createObjectNode();")
+
+    val nullNodeTerm = newName("nullNode")
+    ctx.addReusableMember(s"${className[NullNode]} $nullNodeTerm = $nodeTerm.nullNode();")
+
+    val onNull = getNullBehavior(operands.head)
+    val populateNodeCode = operands.zipWithIndex.drop(1).grouped(2).map {
+      case Seq((keyExpr, _), (valueExpr, valueIdx)) =>
+        val valueTerm = if (isJsonFunctionOperand(call.operands.get(valueIdx))) {
+          createRawNodeTerm(nodeTerm, valueExpr)
+        } else {
+          createNodeTerm(ctx, nodeTerm, valueExpr)
+        }
+
+        onNull match {
+          case NULL_ON_NULL =>
+            s"""
+               |if (${valueExpr.nullTerm}) {
+               |    $nodeTerm.set(${keyExpr.resultTerm}.toString(), $nullNodeTerm);
+               |} else {
+               |    $nodeTerm.set(${keyExpr.resultTerm}.toString(), $valueTerm);
+               |}
+               |""".stripMargin
+          case ABSENT_ON_NULL =>
+            s"""
+               |if (!${valueExpr.nullTerm}) {
+               |    $nodeTerm.set(${keyExpr.resultTerm}.toString(), $valueTerm);
+               |}
+               |""".stripMargin
+        }
+    }.mkString
+
+    val resultTerm = newName("result")
+    val resultTermType = primitiveTypeTermForType(returnType)
+    val resultCode = s"""
+       |${operands.map(_.code).mkString}
+       |
+       |$nodeTerm.removeAll();
+       |$populateNodeCode
+       |
+       |$resultTermType $resultTerm =
+       |    $BINARY_STRING.fromString($jsonUtils.serializeJson($nodeTerm));
+       |""".stripMargin
+
+    GeneratedExpression(resultTerm, "false", resultCode, returnType)
+  }
+
+  private def getNullBehavior(operand: GeneratedExpression): SqlJsonConstructorNullClause = {
+    operand.literalValue match {
+      case Some(onNull: SqlJsonConstructorNullClause) => onNull
+      case _ => throw new CodeGenException(s"Expected operand to be of type"
+        + s"'${typeTerm(classOf[SqlJsonConstructorNullClause])}''")
+    }
+  }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
index d55daa6..6a7240c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
@@ -797,5 +797,4 @@
         s"new String(${terms.head}, ${terms(1)}.toString())"
     }
   }
-
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java
index 22a2ab1..742f994 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java
@@ -18,22 +18,46 @@
 
 package org.apache.flink.table.planner.functions;
 
-import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.annotation.DataTypeHint;
 import org.apache.flink.table.api.JsonExistsOnError;
+import org.apache.flink.table.api.JsonOnNull;
 import org.apache.flink.table.api.JsonType;
 import org.apache.flink.table.api.JsonValueOnEmptyOrError;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.types.Row;
 
 import org.apache.commons.io.IOUtils;
 import org.junit.runners.Parameterized;
 
 import java.io.InputStream;
 import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BINARY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.MAP;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
+import static org.apache.flink.table.api.DataTypes.VARBINARY;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
 import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.call;
+import static org.apache.flink.table.api.Expressions.jsonObject;
 import static org.apache.flink.table.api.Expressions.lit;
 import static org.apache.flink.table.api.Expressions.nullOf;
 import static org.apache.flink.table.api.JsonQueryOnEmptyOrError.EMPTY_ARRAY;
@@ -50,53 +74,51 @@
     @Parameterized.Parameters(name = "{index}: {0}")
     public static List<TestSpec> testData() throws Exception {
         final List<TestSpec> testCases = new ArrayList<>();
-        testCases.add(jsonExists());
-        testCases.add(jsonValue());
-        testCases.addAll(isJson());
-        testCases.addAll(jsonQuery());
+        testCases.add(jsonExistsSpec());
+        testCases.add(jsonValueSpec());
+        testCases.addAll(isJsonSpec());
+        testCases.addAll(jsonQuerySpec());
+        testCases.addAll(jsonObjectSpec());
 
         return testCases;
     }
 
-    private static TestSpec jsonExists() throws Exception {
+    private static TestSpec jsonExistsSpec() throws Exception {
         final String jsonValue = getJsonFromResource("/json/json-exists.json");
         return TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_EXISTS)
                 .onFieldsWithData(jsonValue)
-                .andDataTypes(DataTypes.STRING())
+                .andDataTypes(STRING())
 
                 // NULL
                 .testResult(
-                        nullOf(DataTypes.STRING()).jsonExists("lax $"),
+                        nullOf(STRING()).jsonExists("lax $"),
                         "JSON_EXISTS(CAST(NULL AS STRING), 'lax $')",
                         null,
-                        DataTypes.BOOLEAN())
+                        BOOLEAN())
 
                 // Path variants
                 .testResult(
-                        $("f0").jsonExists("lax $"),
-                        "JSON_EXISTS(f0, 'lax $')",
-                        true,
-                        DataTypes.BOOLEAN())
+                        $("f0").jsonExists("lax $"), "JSON_EXISTS(f0, 'lax $')", true, BOOLEAN())
                 .testResult(
                         $("f0").jsonExists("lax $.type"),
                         "JSON_EXISTS(f0, 'lax $.type')",
                         true,
-                        DataTypes.BOOLEAN())
+                        BOOLEAN())
                 .testResult(
                         $("f0").jsonExists("lax $.author.address.city"),
                         "JSON_EXISTS(f0, 'lax $.author.address.city')",
                         true,
-                        DataTypes.BOOLEAN())
+                        BOOLEAN())
                 .testResult(
                         $("f0").jsonExists("lax $.metadata.tags[0]"),
                         "JSON_EXISTS(f0, 'lax $.metadata.tags[0]')",
                         true,
-                        DataTypes.BOOLEAN())
+                        BOOLEAN())
                 .testResult(
                         $("f0").jsonExists("lax $.metadata.tags[3]"),
                         "JSON_EXISTS(f0, 'lax $.metadata.tags[3]')",
                         false,
-                        DataTypes.BOOLEAN())
+                        BOOLEAN())
                 // This should pass, but is broken due to
                 // https://issues.apache.org/jira/browse/CALCITE-4717.
                 // .testResult(
@@ -108,29 +130,29 @@
                         $("f0").jsonExists("lax $.metadata.references[0].url"),
                         "JSON_EXISTS(f0, 'lax $.metadata.references[0].url')",
                         true,
-                        DataTypes.BOOLEAN())
+                        BOOLEAN())
                 .testResult(
                         $("f0").jsonExists("lax $.metadata.references[0].invalid"),
                         "JSON_EXISTS(f0, 'lax $.metadata.references[0].invalid')",
                         false,
-                        DataTypes.BOOLEAN())
+                        BOOLEAN())
 
                 // ON ERROR
                 .testResult(
                         $("f0").jsonExists("strict $.invalid", JsonExistsOnError.TRUE),
                         "JSON_EXISTS(f0, 'strict $.invalid' TRUE ON ERROR)",
                         true,
-                        DataTypes.BOOLEAN())
+                        BOOLEAN())
                 .testResult(
                         $("f0").jsonExists("strict $.invalid", JsonExistsOnError.FALSE),
                         "JSON_EXISTS(f0, 'strict $.invalid' FALSE ON ERROR)",
                         false,
-                        DataTypes.BOOLEAN())
+                        BOOLEAN())
                 .testResult(
                         $("f0").jsonExists("strict $.invalid", JsonExistsOnError.UNKNOWN),
                         "JSON_EXISTS(f0, 'strict $.invalid' UNKNOWN ON ERROR)",
                         null,
-                        DataTypes.BOOLEAN())
+                        BOOLEAN())
                 .testSqlRuntimeError(
                         "JSON_EXISTS(f0, 'strict $.invalid' ERROR ON ERROR)",
                         "No results for path: $['invalid']")
@@ -139,20 +161,20 @@
                         "No results for path: $['invalid']");
     }
 
-    private static TestSpec jsonValue() throws Exception {
+    private static TestSpec jsonValueSpec() throws Exception {
         final String jsonValue = getJsonFromResource("/json/json-value.json");
         return TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_VALUE)
                 .onFieldsWithData(jsonValue)
-                .andDataTypes(DataTypes.STRING())
+                .andDataTypes(STRING())
 
                 // NULL and invalid types
 
                 .testResult(
-                        lit(null, DataTypes.STRING()).jsonValue("lax $"),
+                        lit(null, STRING()).jsonValue("lax $"),
                         "JSON_VALUE(CAST(NULL AS STRING), 'lax $')",
                         null,
-                        DataTypes.STRING(),
-                        DataTypes.VARCHAR(2000))
+                        STRING(),
+                        VARCHAR(2000))
 
                 // RETURNING + Supported Data Types
 
@@ -160,79 +182,79 @@
                         $("f0").jsonValue("$.type"),
                         "JSON_VALUE(f0, '$.type')",
                         "account",
-                        DataTypes.STRING(),
-                        DataTypes.VARCHAR(2000))
+                        STRING(),
+                        VARCHAR(2000))
                 .testResult(
-                        $("f0").jsonValue("$.activated", DataTypes.BOOLEAN()),
+                        $("f0").jsonValue("$.activated", BOOLEAN()),
                         "JSON_VALUE(f0, '$.activated' RETURNING BOOLEAN)",
                         true,
-                        DataTypes.BOOLEAN())
+                        BOOLEAN())
                 .testResult(
-                        $("f0").jsonValue("$.age", DataTypes.INT()),
+                        $("f0").jsonValue("$.age", INT()),
                         "JSON_VALUE(f0, '$.age' RETURNING INT)",
                         42,
-                        DataTypes.INT())
+                        INT())
                 .testResult(
-                        $("f0").jsonValue("$.balance", DataTypes.DOUBLE()),
+                        $("f0").jsonValue("$.balance", DOUBLE()),
                         "JSON_VALUE(f0, '$.balance' RETURNING DOUBLE)",
                         13.37,
-                        DataTypes.DOUBLE())
+                        DOUBLE())
 
                 // ON EMPTY / ON ERROR
 
                 .testResult(
                         $("f0").jsonValue(
                                         "lax $.invalid",
-                                        DataTypes.STRING(),
+                                        STRING(),
                                         JsonValueOnEmptyOrError.NULL,
                                         null,
                                         JsonValueOnEmptyOrError.ERROR,
                                         null),
                         "JSON_VALUE(f0, 'lax $.invalid' NULL ON EMPTY ERROR ON ERROR)",
                         null,
-                        DataTypes.STRING(),
-                        DataTypes.VARCHAR(2000))
+                        STRING(),
+                        VARCHAR(2000))
                 .testResult(
                         $("f0").jsonValue(
                                         "lax $.invalid",
-                                        DataTypes.INT(),
+                                        INT(),
                                         JsonValueOnEmptyOrError.DEFAULT,
                                         42,
                                         JsonValueOnEmptyOrError.ERROR,
                                         null),
                         "JSON_VALUE(f0, 'lax $.invalid' RETURNING INTEGER DEFAULT 42 ON EMPTY ERROR ON ERROR)",
                         42,
-                        DataTypes.INT())
+                        INT())
                 .testResult(
                         $("f0").jsonValue(
                                         "strict $.invalid",
-                                        DataTypes.STRING(),
+                                        STRING(),
                                         JsonValueOnEmptyOrError.ERROR,
                                         null,
                                         JsonValueOnEmptyOrError.NULL,
                                         null),
                         "JSON_VALUE(f0, 'strict $.invalid' ERROR ON EMPTY NULL ON ERROR)",
                         null,
-                        DataTypes.STRING(),
-                        DataTypes.VARCHAR(2000))
+                        STRING(),
+                        VARCHAR(2000))
                 .testResult(
                         $("f0").jsonValue(
                                         "strict $.invalid",
-                                        DataTypes.INT(),
+                                        INT(),
                                         JsonValueOnEmptyOrError.NULL,
                                         null,
                                         JsonValueOnEmptyOrError.DEFAULT,
                                         42),
                         "JSON_VALUE(f0, 'strict $.invalid' RETURNING INTEGER NULL ON EMPTY DEFAULT 42 ON ERROR)",
                         42,
-                        DataTypes.INT());
+                        INT());
     }
 
-    private static List<TestSpec> isJson() {
+    private static List<TestSpec> isJsonSpec() {
         return Arrays.asList(
                 TestSpec.forFunction(BuiltInFunctionDefinitions.IS_JSON)
                         .onFieldsWithData(1)
-                        .andDataTypes(DataTypes.INT())
+                        .andDataTypes(INT())
                         .testSqlValidationError(
                                 "f0 IS JSON",
                                 "Cannot apply 'IS JSON VALUE' to arguments of type '<INTEGER> IS JSON VALUE'. "
@@ -242,87 +264,77 @@
                                 String.format("Invalid function call:%nIS_JSON(INT)")),
                 TestSpec.forFunction(BuiltInFunctionDefinitions.IS_JSON)
                         .onFieldsWithData((String) null)
-                        .andDataTypes(DataTypes.STRING())
-                        .testResult(
-                                $("f0").isJson(),
-                                "f0 IS JSON",
-                                false,
-                                DataTypes.BOOLEAN().notNull()),
+                        .andDataTypes(STRING())
+                        .testResult($("f0").isJson(), "f0 IS JSON", false, BOOLEAN().notNull()),
                 TestSpec.forFunction(BuiltInFunctionDefinitions.IS_JSON)
                         .onFieldsWithData("a")
-                        .andDataTypes(DataTypes.STRING())
-                        .testResult(
-                                $("f0").isJson(),
-                                "f0 IS JSON",
-                                false,
-                                DataTypes.BOOLEAN().notNull()),
+                        .andDataTypes(STRING())
+                        .testResult($("f0").isJson(), "f0 IS JSON", false, BOOLEAN().notNull()),
                 TestSpec.forFunction(BuiltInFunctionDefinitions.IS_JSON)
                         .onFieldsWithData("\"a\"")
-                        .andDataTypes(DataTypes.STRING())
-                        .testResult(
-                                $("f0").isJson(), "f0 IS JSON", true, DataTypes.BOOLEAN().notNull())
+                        .andDataTypes(STRING())
+                        .testResult($("f0").isJson(), "f0 IS JSON", true, BOOLEAN().notNull())
                         .testResult(
                                 $("f0").isJson(JsonType.VALUE),
                                 "f0 IS JSON VALUE",
                                 true,
-                                DataTypes.BOOLEAN().notNull())
+                                BOOLEAN().notNull())
                         .testResult(
                                 $("f0").isJson(JsonType.SCALAR),
                                 "f0 IS JSON SCALAR",
                                 true,
-                                DataTypes.BOOLEAN().notNull())
+                                BOOLEAN().notNull())
                         .testResult(
                                 $("f0").isJson(JsonType.ARRAY),
                                 "f0 IS JSON ARRAY",
                                 false,
-                                DataTypes.BOOLEAN().notNull())
+                                BOOLEAN().notNull())
                         .testResult(
                                 $("f0").isJson(JsonType.OBJECT),
                                 "f0 IS JSON OBJECT",
                                 false,
-                                DataTypes.BOOLEAN().notNull()),
+                                BOOLEAN().notNull()),
                 TestSpec.forFunction(BuiltInFunctionDefinitions.IS_JSON)
                         .onFieldsWithData("{}")
-                        .andDataTypes(DataTypes.STRING())
-                        .testResult(
-                                $("f0").isJson(), "f0 IS JSON", true, DataTypes.BOOLEAN().notNull())
+                        .andDataTypes(STRING())
+                        .testResult($("f0").isJson(), "f0 IS JSON", true, BOOLEAN().notNull())
                         .testResult(
                                 $("f0").isJson(JsonType.VALUE),
                                 "f0 IS JSON VALUE",
                                 true,
-                                DataTypes.BOOLEAN().notNull())
+                                BOOLEAN().notNull())
                         .testResult(
                                 $("f0").isJson(JsonType.SCALAR),
                                 "f0 IS JSON SCALAR",
                                 false,
-                                DataTypes.BOOLEAN().notNull())
+                                BOOLEAN().notNull())
                         .testResult(
                                 $("f0").isJson(JsonType.ARRAY),
                                 "f0 IS JSON ARRAY",
                                 false,
-                                DataTypes.BOOLEAN().notNull())
+                                BOOLEAN().notNull())
                         .testResult(
                                 $("f0").isJson(JsonType.OBJECT),
                                 "f0 IS JSON OBJECT",
                                 true,
-                                DataTypes.BOOLEAN().notNull()));
+                                BOOLEAN().notNull()));
     }
 
-    private static List<TestSpec> jsonQuery() throws Exception {
+    private static List<TestSpec> jsonQuerySpec() throws Exception {
         final String jsonValue = getJsonFromResource("/json/json-query.json");
         return Arrays.asList(
                 TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_QUERY)
                         .onFieldsWithData((String) null)
-                        .andDataTypes(DataTypes.STRING())
+                        .andDataTypes(STRING())
                         .testResult(
                                 $("f0").jsonQuery("$"),
                                 "JSON_QUERY(f0, '$')",
                                 null,
-                                DataTypes.STRING(),
-                                DataTypes.VARCHAR(2000)),
+                                STRING(),
+                                VARCHAR(2000)),
                 TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_QUERY)
                         .onFieldsWithData(jsonValue)
-                        .andDataTypes(DataTypes.STRING())
+                        .andDataTypes(STRING())
 
                         // Wrapping Behavior
 
@@ -330,38 +342,38 @@
                                 $("f0").jsonQuery("$.a1", WITHOUT_ARRAY),
                                 "JSON_QUERY(f0, '$.a1' WITHOUT WRAPPER)",
                                 "[]",
-                                DataTypes.STRING(),
-                                DataTypes.VARCHAR(2000))
+                                STRING(),
+                                VARCHAR(2000))
                         .testResult(
                                 $("f0").jsonQuery("$.a1", CONDITIONAL_ARRAY),
                                 "JSON_QUERY(f0, '$.a1' WITH CONDITIONAL WRAPPER)",
                                 "[]",
-                                DataTypes.STRING(),
-                                DataTypes.VARCHAR(2000))
+                                STRING(),
+                                VARCHAR(2000))
                         .testResult(
                                 $("f0").jsonQuery("$.o1", CONDITIONAL_ARRAY),
                                 "JSON_QUERY(f0, '$.o1' WITH CONDITIONAL WRAPPER)",
                                 "[{}]",
-                                DataTypes.STRING(),
-                                DataTypes.VARCHAR(2000))
+                                STRING(),
+                                VARCHAR(2000))
                         .testResult(
                                 $("f0").jsonQuery("$.a1", UNCONDITIONAL_ARRAY),
                                 "JSON_QUERY(f0, '$.a1' WITH UNCONDITIONAL WRAPPER)",
                                 "[[]]",
-                                DataTypes.STRING(),
-                                DataTypes.VARCHAR(2000))
+                                STRING(),
+                                VARCHAR(2000))
                         .testResult(
                                 $("f0").jsonQuery("$.n1", CONDITIONAL_ARRAY),
                                 "JSON_QUERY(f0, '$.n1' WITH CONDITIONAL WRAPPER)",
                                 "[1]",
-                                DataTypes.STRING(),
-                                DataTypes.VARCHAR(2000))
+                                STRING(),
+                                VARCHAR(2000))
                         .testResult(
                                 $("f0").jsonQuery("$.s1", CONDITIONAL_ARRAY),
                                 "JSON_QUERY(f0, '$.s1' WITH CONDITIONAL WRAPPER)",
                                 "[\"Test\"]",
-                                DataTypes.STRING(),
-                                DataTypes.VARCHAR(2000))
+                                STRING(),
+                                VARCHAR(2000))
 
                         // Empty Behavior
 
@@ -369,20 +381,20 @@
                                 $("f0").jsonQuery("lax $.err1", WITHOUT_ARRAY, NULL, NULL),
                                 "JSON_QUERY(f0, 'lax $.err1' NULL ON EMPTY)",
                                 null,
-                                DataTypes.STRING(),
-                                DataTypes.VARCHAR(2000))
+                                STRING(),
+                                VARCHAR(2000))
                         .testResult(
                                 $("f0").jsonQuery("lax $.err2", WITHOUT_ARRAY, EMPTY_ARRAY, NULL),
                                 "JSON_QUERY(f0, 'lax $.err2' EMPTY ARRAY ON EMPTY)",
                                 "[]",
-                                DataTypes.STRING(),
-                                DataTypes.VARCHAR(2000))
+                                STRING(),
+                                VARCHAR(2000))
                         .testResult(
                                 $("f0").jsonQuery("lax $.err3", WITHOUT_ARRAY, EMPTY_OBJECT, NULL),
                                 "JSON_QUERY(f0, 'lax $.err3' EMPTY OBJECT ON EMPTY)",
                                 "{}",
-                                DataTypes.STRING(),
-                                DataTypes.VARCHAR(2000))
+                                STRING(),
+                                VARCHAR(2000))
                         .testSqlRuntimeError(
                                 "JSON_QUERY(f0, 'lax $.err4' ERROR ON EMPTY)",
                                 "Empty result of JSON_QUERY function is not allowed")
@@ -396,22 +408,22 @@
                                 $("f0").jsonQuery("strict $.err6", WITHOUT_ARRAY, NULL, NULL),
                                 "JSON_QUERY(f0, 'strict $.err6' NULL ON ERROR)",
                                 null,
-                                DataTypes.STRING(),
-                                DataTypes.VARCHAR(2000))
+                                STRING(),
+                                VARCHAR(2000))
                         .testResult(
                                 $("f0").jsonQuery(
                                                 "strict $.err7", WITHOUT_ARRAY, NULL, EMPTY_ARRAY),
                                 "JSON_QUERY(f0, 'strict $.err7' EMPTY ARRAY ON ERROR)",
                                 "[]",
-                                DataTypes.STRING(),
-                                DataTypes.VARCHAR(2000))
+                                STRING(),
+                                VARCHAR(2000))
                         .testResult(
                                 $("f0").jsonQuery(
                                                 "strict $.err8", WITHOUT_ARRAY, NULL, EMPTY_OBJECT),
                                 "JSON_QUERY(f0, 'strict $.err8' EMPTY OBJECT ON ERROR)",
                                 "{}",
-                                DataTypes.STRING(),
-                                DataTypes.VARCHAR(2000))
+                                STRING(),
+                                VARCHAR(2000))
                         .testSqlRuntimeError(
                                 "JSON_QUERY(f0, 'strict $.err9' ERROR ON ERROR)",
                                 "No results for path")
@@ -420,8 +432,152 @@
                                 "No results for path"));
     }
 
+    private static List<TestSpec> jsonObjectSpec() {
+        final Map<String, String> mapData = new HashMap<>();
+        mapData.put("M1", "V1");
+        mapData.put("M2", "V2");
+
+        final Map<String, Integer> multisetData = new HashMap<>();
+        multisetData.put("M1", 1);
+        multisetData.put("M2", 2);
+
+        return Arrays.asList(
+                TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_OBJECT)
+                        .onFieldsWithData(0)
+                        .testResult(
+                                jsonObject(JsonOnNull.NULL),
+                                "JSON_OBJECT()",
+                                "{}",
+                                STRING().notNull(),
+                                VARCHAR(2000).notNull())
+                        .testResult(
+                                jsonObject(JsonOnNull.NULL, "K", nullOf(STRING())),
+                                "JSON_OBJECT(KEY 'K' VALUE CAST(NULL AS STRING) NULL ON NULL)",
+                                "{\"K\":null}",
+                                STRING().notNull(),
+                                VARCHAR(2000).notNull())
+                        .testResult(
+                                jsonObject(JsonOnNull.ABSENT, "K", nullOf(STRING())),
+                                "JSON_OBJECT(KEY 'K' VALUE CAST(NULL AS STRING) ABSENT ON NULL)",
+                                "{}",
+                                STRING().notNull(),
+                                VARCHAR(2000).notNull()),
+                TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_OBJECT)
+                        .onFieldsWithData(
+                                "V",
+                                true,
+                                1,
+                                1.23d,
+                                1.23,
+                                LocalDateTime.parse("1990-06-02T13:37:42.001"),
+                                Instant.parse("1990-06-02T13:37:42.001Z"),
+                                Arrays.asList("A1", "A2", "A3"),
+                                Row.of("R1", Instant.parse("1990-06-02T13:37:42.001Z")),
+                                mapData,
+                                multisetData,
+                                "Test".getBytes(StandardCharsets.UTF_8),
+                                "Test".getBytes(StandardCharsets.UTF_8),
+                                Row.of(Collections.singletonList(Row.of(1, 2))))
+                        .andDataTypes(
+                                STRING(),
+                                BOOLEAN(),
+                                INT(),
+                                DOUBLE(),
+                                DECIMAL(3, 2),
+                                TIMESTAMP(3),
+                                TIMESTAMP_WITH_LOCAL_TIME_ZONE(3),
+                                ARRAY(STRING()),
+                                ROW(STRING(), TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)),
+                                MAP(STRING(), STRING()),
+                                MAP(STRING(), INT()),
+                                BINARY(4),
+                                VARBINARY(4),
+                                ROW(ARRAY(ROW(INT(), INT()))))
+                        .withFunction(CreateMultiset.class)
+                        .testResult(
+                                jsonObject(
+                                        JsonOnNull.NULL,
+                                        "K0",
+                                        $("f0"),
+                                        "K1",
+                                        $("f1"),
+                                        "K2",
+                                        $("f2"),
+                                        "K3",
+                                        $("f3"),
+                                        "K4",
+                                        $("f4"),
+                                        "K5",
+                                        $("f5"),
+                                        "K6",
+                                        $("f6"),
+                                        "K7",
+                                        $("f7"),
+                                        "K8",
+                                        $("f8"),
+                                        "K9",
+                                        $("f9"),
+                                        "K10",
+                                        call("CreateMultiset", $("f10")),
+                                        "K11",
+                                        $("f11"),
+                                        "K12",
+                                        $("f12"),
+                                        "K13",
+                                        $("f13"),
+                                        "K14",
+                                        jsonObject(JsonOnNull.NULL, "A", "B")),
+                                "JSON_OBJECT("
+                                        + "'K0' VALUE f0, "
+                                        + "'K1' VALUE f1, "
+                                        + "'K2' VALUE f2, "
+                                        + "'K3' VALUE f3, "
+                                        + "'K4' VALUE f4, "
+                                        + "'K5' VALUE f5, "
+                                        + "'K6' VALUE f6, "
+                                        + "'K7' VALUE f7, "
+                                        + "'K8' VALUE f8, "
+                                        + "'K9' VALUE f9, "
+                                        + "'K10' VALUE CreateMultiset(f10), "
+                                        + "'K11' VALUE f11, "
+                                        + "'K12' VALUE f12, "
+                                        + "'K13' VALUE f13, "
+                                        + "'K14' VALUE JSON_OBJECT(KEY 'A' VALUE 'B')"
+                                        + ")",
+                                "{"
+                                        + "\"K0\":\"V\","
+                                        + "\"K1\":true,"
+                                        + "\"K2\":1,"
+                                        + "\"K3\":1.23,"
+                                        + "\"K4\":1.23,"
+                                        + "\"K5\":\"1990-06-02T13:37:42.001\","
+                                        + "\"K6\":\"1990-06-02T13:37:42.001Z\","
+                                        + "\"K7\":[\"A1\",\"A2\",\"A3\"],"
+                                        + "\"K8\":{\"f0\":\"R1\",\"f1\":\"1990-06-02T13:37:42.001Z\"},"
+                                        + "\"K9\":{\"M1\":\"V1\",\"M2\":\"V2\"},"
+                                        + "\"K10\":{\"M1\":1,\"M2\":2},"
+                                        + "\"K11\":\"VGVzdA==\","
+                                        + "\"K12\":\"VGVzdA==\","
+                                        + "\"K13\":{\"f0\":[{\"f0\":1,\"f1\":2}]},"
+                                        + "\"K14\":{\"A\":\"B\"}"
+                                        + "}",
+                                STRING().notNull(),
+                                VARCHAR(2000).notNull()));
+    }
+
     // ---------------------------------------------------------------------------------------------
 
+    /**
+     * {@link BuiltInFunctionTestBase} uses a {@code VALUES} clause, but there currently is no way
+     * to create a {@code MULTISET} for it yet. We work around this for now with a custom function.
+     */
+    public static class CreateMultiset extends ScalarFunction {
+        public @DataTypeHint("MULTISET<STRING>") Map<String, Integer> eval(
+                Map<String, Integer> map) {
+            return map;
+        }
+    }
+
     private static String getJsonFromResource(String fileName) throws Exception {
         final InputStream jsonResource = JsonFunctionsITCase.class.getResourceAsStream(fileName);
         if (jsonResource == null) {
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java
new file mode 100644
index 0000000..a7bbf4c
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+/**
+ * Utilities for JSON functions.
+ *
+ * <p>Note that these methods are called from generated code.
+ */
+@Internal
+public class SqlJsonUtils {
+
+    private static final JsonFactory JSON_FACTORY = new JsonFactory();
+    private static final ObjectMapper MAPPER = new ObjectMapper(JSON_FACTORY);
+
+    /** Returns a new {@link ObjectNode}. */
+    public static ObjectNode createObjectNode() {
+        return MAPPER.createObjectNode();
+    }
+
+    /** Serializes the given {@link JsonNode} to a JSON string. */
+    public static String serializeJson(JsonNode node) {
+        try {
+            return MAPPER.writeValueAsString(node);
+        } catch (JsonProcessingException e) {
+            throw new TableException("JSON object could not be serialized: " + node.asText(), e);
+        }
+    }
+
+    private SqlJsonUtils() {}
+}