[FLINK-16203][table] Support JSON_OBJECT
This closes #17186
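
A minimal usage sketch for reference (illustrative only; the "Orders" table and its
"orderId" column are hypothetical):

    // Table API
    Table result = orders.select(jsonObject(JsonOnNull.NULL, "orderNo", $("orderId")));
    // Equivalent SQL: SELECT JSON_OBJECT(KEY 'orderNo' VALUE orderId) FROM Orders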
diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index a953d14..3b40568 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -708,6 +708,39 @@
-- '[]'
JSON_QUERY('{}', 'strict $.invalid' EMPTY ARRAY ON ERROR)
```
+ - sql: JSON_OBJECT([[KEY] key VALUE value]* [ { NULL | ABSENT } ON NULL ])
+ table: jsonObject(JsonOnNull, keyValues...)
+ description: |
+ Builds a JSON object string from a list of key-value pairs.
+
+ Note that keys must be non-`NULL` string literals, while values may be arbitrary expressions.
+
+ This function returns a JSON string. The `ON NULL` behavior defines how to treat `NULL`
+      values. If omitted, `NULL ON NULL` is the default.
+
+ ```
+ -- '{}'
+ JSON_OBJECT()
+
+ -- '{"K1":"V1","K2":"V2"}'
+ JSON_OBJECT('K1' VALUE 'V1', 'K2' VALUE 'V2')
+
+ -- Expressions as values
+ JSON_OBJECT('orderNo' VALUE orders.orderId)
+
+ -- ON NULL
+ JSON_OBJECT(KEY 'K1' VALUE CAST(NULL AS STRING) NULL ON NULL) -- '{"K1":null}'
+ JSON_OBJECT(KEY 'K1' VALUE CAST(NULL AS STRING) ABSENT ON NULL) -- '{}'
+
+ -- '{"K1":{"K2":"V"}}'
+ JSON_OBJECT(
+ KEY 'K1'
+ VALUE JSON_OBJECT(
+ KEY 'K2'
+ VALUE 'V'
+ )
+ )
+ ```
valueconstruction:
- sql: |
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index abfac47..ecf8b69 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -840,6 +840,40 @@
-- '[]'
JSON_QUERY('{}', 'strict $.invalid' EMPTY ARRAY ON ERROR)
```
+ - sql: JSON_OBJECT([[KEY] key VALUE value]* [ { NULL | ABSENT } ON NULL ])
+ table: jsonObject(JsonOnNull, keyValues...)
+ description: |
+ Builds a JSON object string from a list of key-value pairs.
+
+ Note that keys must be non-`NULL` string literals, while values may be arbitrary expressions.
+
+ This function returns a JSON string. The `ON NULL` behavior defines how to treat `NULL`
+      values. If omitted, `NULL ON NULL` is the default.
+
+ ```
+ -- '{}'
+ JSON_OBJECT()
+
+ -- '{"K1":"V1","K2":"V2"}'
+ JSON_OBJECT('K1' VALUE 'V1', 'K2' VALUE 'V2')
+
+ -- Expressions as values
+ JSON_OBJECT('orderNo' VALUE orders.orderId)
+
+ -- ON NULL
+ JSON_OBJECT(KEY 'K1' VALUE CAST(NULL AS STRING) NULL ON NULL) -- '{"K1":null}'
+ JSON_OBJECT(KEY 'K1' VALUE CAST(NULL AS STRING) ABSENT ON NULL) -- '{}'
+
+ -- '{"K1":{"K2":"V"}}'
+ JSON_OBJECT(
+ KEY 'K1'
+ VALUE JSON_OBJECT(
+ KEY 'K2'
+ VALUE 'V'
+ )
+ )
+ ```
valueconstruction:
- sql: |
diff --git a/flink-python/pyflink/table/expression.py b/flink-python/pyflink/table/expression.py
index 93d959f..2f26233 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -447,6 +447,20 @@
return getattr(JJsonQueryOnEmptyOrError, self.name)
+class JsonOnNull(Enum):
+ """
+ Behavior for entries with a null value for json_object().
+ """
+
+    NULL = 0
+ ABSENT = 1
+
+ def _to_j_json_on_null(self):
+ gateway = get_gateway()
+ JJsonOnNull = gateway.jvm.org.apache.flink.table.api.JsonOnNull
+ return getattr(JJsonOnNull, self.name)
+
+
T = TypeVar('T')
diff --git a/flink-python/pyflink/table/expressions.py b/flink-python/pyflink/table/expressions.py
index 7f85342..4e2d91d 100644
--- a/flink-python/pyflink/table/expressions.py
+++ b/flink-python/pyflink/table/expressions.py
@@ -19,7 +19,7 @@
from pyflink import add_version_doc
from pyflink.java_gateway import get_gateway
-from pyflink.table.expression import Expression, _get_java_expression, TimePointUnit
+from pyflink.table.expression import Expression, _get_java_expression, TimePointUnit, JsonOnNull
from pyflink.table.types import _to_java_data_type, DataType, _to_java_type
from pyflink.table.udf import UserDefinedFunctionWrapper, UserDefinedTableFunctionWrapper
from pyflink.util.java_utils import to_jarray, load_java_class
@@ -29,8 +29,8 @@
'current_timestamp', 'current_watermark', 'local_time', 'local_timestamp',
'temporal_overlaps', 'date_format', 'timestamp_diff', 'array', 'row', 'map_',
'row_interval', 'pi', 'e', 'rand', 'rand_integer', 'atan2', 'negative', 'concat',
- 'concat_ws', 'uuid', 'null_of', 'log', 'with_columns', 'without_columns', 'call',
- 'call_sql', 'source_watermark']
+ 'concat_ws', 'uuid', 'null_of', 'log', 'with_columns', 'without_columns', 'json_object',
+ 'call', 'call_sql', 'source_watermark']
def _leaf_op(op_name: str) -> Expression:
@@ -67,6 +67,12 @@
_get_java_expression(forth)))
+def _varargs_op(op_name: str, *args):
+ gateway = get_gateway()
+ return Expression(
+ getattr(gateway.jvm.Expressions, op_name)(*[_get_java_expression(arg) for arg in args]))
+
+
def _add_version_doc():
from inspect import getmembers, isfunction
from pyflink.table import expressions
@@ -581,6 +587,33 @@
return _binary_op("withoutColumns", head, tails)
+def json_object(on_null: JsonOnNull = JsonOnNull.NULL, *args) -> Expression:
+ """
+ Builds a JSON object string from a list of key-value pairs.
+
+ `args` is an even-numbered list of alternating key/value pairs. Note that keys must be
+ non-`NULL` string literals, while values may be arbitrary expressions.
+
+ This function returns a JSON string. The `on_null` behavior defines how to treat `NULL` values.
+
+ Examples:
+ ::
+
+ >>> json_object() # '{}'
+ >>> json_object(JsonOnNull.NULL, "K1", "V1", "K2", "V2") # '{"K1":"V1","K2":"V2"}'
+
+ >>> # Expressions as values
+ >>> json_object(JsonOnNull.NULL, "orderNo", col("orderId"))
+
+ >>> json_object(JsonOnNull.NULL, "K1", null_of(DataTypes.STRING())) # '{"K1":null}'
+ >>> json_object(JsonOnNull.ABSENT, "K1", null_of(DataTypes.STRING())) # '{}'
+
+ >>> # '{"K1":{"K2":"V"}}'
+ >>> json_object(JsonOnNull.NULL, "K1", json_object(JsonOnNull.NULL, "K2", "V"))
+ """
+ return _varargs_op("jsonObject", *(on_null._to_j_json_on_null(), *args))
+
+
def call(f: Union[str, UserDefinedFunctionWrapper], *args) -> Expression:
"""
The first parameter `f` could be a str or a Python user-defined function.
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
index 51770a7..c21bf13 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
@@ -43,6 +43,7 @@
import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;
import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
import static org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral;
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_OBJECT;
/**
* Entry point of the Table API Expression DSL such as: {@code $("myField").plus(10).abs()}
@@ -563,6 +564,42 @@
}
/**
+ * Builds a JSON object string from a list of key-value pairs.
+ *
+ * <p>{@param keyValues} is an even-numbered list of alternating key/value pairs. Note that keys
+ * must be non-{@code NULL} string literals, while values may be arbitrary expressions.
+ *
+ * <p>This function returns a JSON string. The {@link JsonOnNull onNull} behavior defines how to
+ * treat {@code NULL} values.
+ *
+ * <p>Examples:
+ *
+ * <pre>{@code
+ * // {}
+ * jsonObject(JsonOnNull.NULL)
+ * // "{\"K1\":\"V1\",\"K2\":\"V2\"}"
+ * // {"K1":"V1","K2":"V2"}
+ * jsonObject(JsonOnNull.NULL, "K1", "V1", "K2", "V2")
+ *
+ * // Expressions as values
+ * jsonObject(JsonOnNull.NULL, "orderNo", $("orderId"))
+ *
+ * // ON NULL
+ * jsonObject(JsonOnNull.NULL, "K1", nullOf(DataTypes.STRING())) // "{\"K1\":null}"
+ * jsonObject(JsonOnNull.ABSENT, "K1", nullOf(DataTypes.STRING())) // "{}"
+ *
+ * // {"K1":{"K2":"V"}}
+ * jsonObject(JsonOnNull.NULL, "K1", jsonObject(JsonOnNull.NULL, "K2", "V"))
+ * }</pre>
+ */
+ public static ApiExpression jsonObject(JsonOnNull onNull, Object... keyValues) {
+ final Object[] arguments =
+ Stream.concat(Stream.of(onNull), Arrays.stream(keyValues)).toArray(Object[]::new);
+
+ return apiCall(JSON_OBJECT, arguments);
+ }
+
+ /**
* A call to a function that will be looked up in a catalog. There are two kinds of functions:
*
* <ul>
diff --git a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
index 2bb033c..d14009b 100644
--- a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
+++ b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
@@ -784,4 +784,35 @@
* }}}
*/
def not(expression: Expression): Expression = Expressions.not(expression)
+
+ /**
+ * Builds a JSON object string from a list of key-value pairs.
+ *
+ * <code>keyValues</code> is an even-numbered list of alternating key/value pairs. Note that keys
+ * must be string literals, values may be arbitrary expressions.
+ *
+ * This function returns a JSON string. The [[JsonOnNull onNull]] behavior defines how to treat
+ * <code>NULL</code> values.
+ *
+ * Examples:
+ * {{{
+ * // {}
+ * jsonObject(JsonOnNull.NULL)
+ * // {"K1":"V1","K2":"V2"}
+ * jsonObject(JsonOnNull.NULL, "K1", "V1", "K2", "V2")
+ *
+ * // Expressions as values
+ * jsonObject(JsonOnNull.NULL, "orderNo", $("orderId"))
+ *
+ * // ON NULL
+ * jsonObject(JsonOnNull.NULL, "K1", nullOf(DataTypes.STRING())) // "{\"K1\":null}"
+ * jsonObject(JsonOnNull.ABSENT, "K1", nullOf(DataTypes.STRING())) // '{}'
+ *
+ * // {"K1":{"K2":"V"}}
+ * jsonObject(JsonOnNull.NULL, "K1", jsonObject(JsonOnNull.NULL, "K2", "V"))
+ * }}}
+ */
+ def jsonObject(onNull: JsonOnNull, keyValues: Any*): Expression = {
+    Expressions.jsonObject(onNull, keyValues.map(_.asInstanceOf[AnyRef]): _*)
+ }
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/JsonOnNull.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/JsonOnNull.java
new file mode 100644
index 0000000..78c98bc
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/JsonOnNull.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.TableSymbol;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+
+/** Behavior for entries with a null value for {@link BuiltInFunctionDefinitions#JSON_OBJECT}. */
+@PublicEvolving
+public enum JsonOnNull implements TableSymbol {
+ /** Use a JSON {@code null} value. */
+ NULL,
+ /** Omit this key from the resulting JSON. */
+ ABSENT
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index a566e7e..f5964c89 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -1558,6 +1558,15 @@
.runtimeDeferred()
.build();
+ public static final BuiltInFunctionDefinition JSON_OBJECT =
+ BuiltInFunctionDefinition.newBuilder()
+ .name("JSON_OBJECT")
+ .kind(SCALAR)
+ .inputTypeStrategy(SpecificInputTypeStrategies.JSON_OBJECT)
+ .outputTypeStrategy(explicit(DataTypes.STRING().notNull()))
+ .runtimeDeferred()
+ .build();
+
// --------------------------------------------------------------------------------------------
// Other functions
// --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RepeatingSequenceInputTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RepeatingSequenceInputTypeStrategy.java
index 41b88bd..61cb639 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RepeatingSequenceInputTypeStrategy.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RepeatingSequenceInputTypeStrategy.java
@@ -94,10 +94,19 @@
for (int i = 0; i < argumentStrategies.size(); i++) {
final Signature.Argument argument =
argumentStrategies.get(i).getExpectedArgument(definition, i);
- arguments.add(argument);
+
+            // Render the repetition as "[a, b]...", e.g. f([INT, STRING]...), since the
+            // signature has no direct way of representing repeating argument groups.
+            final Signature.Argument newArgument;
+            if (argumentStrategies.size() == 1) {
+                newArgument = Signature.Argument.of("[" + argument.getType() + "]...");
+            } else if (i == 0) {
+                newArgument = Signature.Argument.of("[" + argument.getType());
+            } else if (i == argumentStrategies.size() - 1) {
+                newArgument = Signature.Argument.of(argument.getType() + "]...");
+            } else {
+                newArgument = argument;
+            }
+
+            arguments.add(newArgument);
}
- // Unfortunately there is no way to represent the repetition in the signature
final Signature signature = Signature.of(arguments);
return Collections.singletonList(signature);
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
index d41784d..e9d62db 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
@@ -19,13 +19,23 @@
package org.apache.flink.table.types.inference.strategies;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.JsonOnNull;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.types.inference.ConstantArgumentCount;
import org.apache.flink.table.types.inference.InputTypeStrategies;
import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.StructuredType;
+import static org.apache.flink.table.types.inference.InputTypeStrategies.LITERAL;
+import static org.apache.flink.table.types.inference.InputTypeStrategies.and;
import static org.apache.flink.table.types.inference.InputTypeStrategies.comparable;
+import static org.apache.flink.table.types.inference.InputTypeStrategies.compositeSequence;
+import static org.apache.flink.table.types.inference.InputTypeStrategies.logical;
+import static org.apache.flink.table.types.inference.InputTypeStrategies.or;
+import static org.apache.flink.table.types.inference.InputTypeStrategies.repeatingSequence;
+import static org.apache.flink.table.types.inference.InputTypeStrategies.symbol;
/**
* Entry point for specific input type strategies not covered in {@link InputTypeStrategies}.
@@ -46,6 +56,26 @@
public static final InputTypeStrategy CURRENT_WATERMARK =
new CurrentWatermarkInputTypeStrategy();
+ /**
+ * Input strategy for {@link BuiltInFunctionDefinitions#JSON_OBJECT}.
+ *
+ * <p>The first argument defines the on-null behavior and is followed by any number of key-value
+ * pairs. Keys must be character string literals, while values are arbitrary expressions.
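+     *
+     * <p>For example, {@code JSON_OBJECT(KEY 'K1' VALUE 42 NULL ON NULL)} resolves to the {@link
+     * JsonOnNull} symbol followed by a single key-value repetition.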
+ */
+ public static final InputTypeStrategy JSON_OBJECT =
+ compositeSequence()
+ .argument(symbol(JsonOnNull.class))
+ .finishWithVarying(
+ repeatingSequence(
+ and(logical(LogicalTypeFamily.CHARACTER_STRING), LITERAL),
+ or(
+ logical(LogicalTypeFamily.CHARACTER_STRING),
+ logical(LogicalTypeFamily.BINARY_STRING),
+ logical(LogicalTypeFamily.TIMESTAMP),
+ logical(LogicalTypeFamily.CONSTRUCTED),
+ logical(LogicalTypeRoot.BOOLEAN),
+ logical(LogicalTypeFamily.NUMERIC))));
+
// --------------------------------------------------------------------------------------------
// Strategies composed of other strategies
// --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RepeatingSequenceInputTypeStrategyTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RepeatingSequenceInputTypeStrategyTest.java
index 5941d49..9c09b79 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RepeatingSequenceInputTypeStrategyTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RepeatingSequenceInputTypeStrategyTest.java
@@ -52,13 +52,13 @@
.calledWithArgumentTypes(STRING(), INT())
.expectErrorMessage(
String.format(
- "Invalid input arguments. Expected signatures are:%nf(INT, STRING)")),
+ "Invalid input arguments. Expected signatures are:%nf([INT, STRING]...)")),
TestSpec.forStrategy(
"Incorrect number of arguments",
repeatingSequence(explicit(INT()), explicit(STRING())))
.calledWithArgumentTypes(INT())
.expectErrorMessage(
String.format(
- "Invalid input arguments. Expected signatures are:%nf(INT, STRING)")));
+ "Invalid input arguments. Expected signatures are:%nf([INT, STRING]...)")));
}
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/converters/CustomizedConverters.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/converters/CustomizedConverters.java
index b36b840b..390623d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/converters/CustomizedConverters.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/converters/CustomizedConverters.java
@@ -56,6 +56,7 @@
CONVERTERS.put(BuiltInFunctionDefinitions.JSON_EXISTS, new JsonExistsConverter());
CONVERTERS.put(BuiltInFunctionDefinitions.JSON_VALUE, new JsonValueConverter());
CONVERTERS.put(BuiltInFunctionDefinitions.JSON_QUERY, new JsonQueryConverter());
+ CONVERTERS.put(BuiltInFunctionDefinitions.JSON_OBJECT, new JsonObjectConverter());
CONVERTERS.put(InternalFunctionDefinitions.THROW_EXCEPTION, new ThrowExceptionConverter());
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/converters/JsonObjectConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/converters/JsonObjectConverter.java
new file mode 100644
index 0000000..da1c378
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/converters/JsonObjectConverter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.expressions.converter.converters;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.JsonOnNull;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.planner.expressions.converter.CallExpressionConvertRule;
+import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
+
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlJsonConstructorNullClause;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/** Conversion for {@link BuiltInFunctionDefinitions#JSON_OBJECT}. */
+@Internal
+class JsonObjectConverter extends CustomizedConverter {
+ @Override
+ public RexNode convert(CallExpression call, CallExpressionConvertRule.ConvertContext context) {
+ checkArgument(call, (call.getChildren().size() - 1) % 2 == 0);
+ final List<RexNode> operands = new LinkedList<>();
+
+ final SqlJsonConstructorNullClause onNull =
+ ((ValueLiteralExpression) call.getChildren().get(0))
+ .getValueAs(JsonOnNull.class)
+ .map(this::convertOnNull)
+ .orElseThrow(() -> new TableException("Missing argument for ON NULL."));
+ operands.add(context.getRelBuilder().getRexBuilder().makeFlag(onNull));
+
+ for (int i = 1; i < call.getChildren().size(); i++) {
+ final Expression operand = call.getChildren().get(i);
+ if (i % 2 == 1 && !(operand instanceof ValueLiteralExpression)) {
+ throw new TableException(
+ String.format("Argument at position %s must be a string literal.", i));
+ }
+
+ operands.add(context.toRexNode(operand));
+ }
+
+ return context.getRelBuilder()
+ .getRexBuilder()
+ .makeCall(FlinkSqlOperatorTable.JSON_OBJECT, operands);
+ }
+
+ private SqlJsonConstructorNullClause convertOnNull(JsonOnNull onNull) {
+ switch (onNull) {
+ case NULL:
+ return SqlJsonConstructorNullClause.NULL_ON_NULL;
+ case ABSENT:
+ return SqlJsonConstructorNullClause.ABSENT_ON_NULL;
+ default:
+ throw new TableException("Unknown ON NULL behavior: " + onNull);
+ }
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index a5afcc4..9ffa130 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -1169,6 +1169,7 @@
public static final SqlFunction JSON_EXISTS = SqlStdOperatorTable.JSON_EXISTS;
public static final SqlFunction JSON_VALUE = SqlStdOperatorTable.JSON_VALUE;
public static final SqlFunction JSON_QUERY = SqlStdOperatorTable.JSON_QUERY;
+ public static final SqlFunction JSON_OBJECT = new SqlJsonObjectFunction();
public static final SqlPostfixOperator IS_JSON_VALUE = SqlStdOperatorTable.IS_JSON_VALUE;
public static final SqlPostfixOperator IS_JSON_OBJECT = SqlStdOperatorTable.IS_JSON_OBJECT;
public static final SqlPostfixOperator IS_JSON_ARRAY = SqlStdOperatorTable.IS_JSON_ARRAY;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlJsonObjectFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlJsonObjectFunction.java
new file mode 100644
index 0000000..6996d54
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlJsonObjectFunction.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.sql;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlJsonConstructorNullClause;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperandCountRange;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.ReturnTypes;
+import org.apache.calcite.sql.type.SqlOperandCountRanges;
+import org.apache.calcite.sql.type.SqlOperandTypeChecker;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.type.SqlTypeUtil;
+import org.apache.calcite.sql.validate.SqlValidator;
+
+import java.util.Locale;
+
+import static org.apache.calcite.util.Static.RESOURCE;
+
+/**
+ * This class has been copied from Calcite to backport the fix made during CALCITE-4394.
+ *
+ * <p>TODO Remove this class with Calcite 1.27 and replace it with {@link
+ * SqlStdOperatorTable#JSON_OBJECT}.
+ */
+public class SqlJsonObjectFunction extends SqlFunction {
+ public SqlJsonObjectFunction() {
+ super(
+ "JSON_OBJECT",
+ SqlKind.OTHER_FUNCTION,
+ ReturnTypes.VARCHAR_2000,
+ (callBinding, returnType, operandTypes) -> {
+ RelDataTypeFactory typeFactory = callBinding.getTypeFactory();
+ for (int i = 0; i < operandTypes.length; i++) {
+ operandTypes[i] =
+ i == 0
+ ? typeFactory.createSqlType(SqlTypeName.SYMBOL)
+ : i % 2 == 1
+ ? typeFactory.createSqlType(SqlTypeName.VARCHAR)
+ : typeFactory.createTypeWithNullability(
+ typeFactory.createSqlType(SqlTypeName.ANY),
+ true);
+ }
+ },
+ null,
+ SqlFunctionCategory.SYSTEM);
+ }
+
+ @Override
+ public SqlOperandCountRange getOperandCountRange() {
+ return SqlOperandCountRanges.from(1);
+ }
+
+ @Override
+ protected void checkOperandCount(
+ SqlValidator validator, SqlOperandTypeChecker argType, SqlCall call) {
+ assert call.operandCount() % 2 == 1;
+ }
+
+ @Override
+ public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFailure) {
+ final int count = callBinding.getOperandCount();
+ for (int i = 1; i < count; i += 2) {
+ RelDataType nameType = callBinding.getOperandType(i);
+ if (!SqlTypeUtil.inCharFamily(nameType)) {
+ if (throwOnFailure) {
+ throw callBinding.newError(RESOURCE.expectedCharacter());
+ }
+ return false;
+ }
+ if (nameType.isNullable()) {
+ if (throwOnFailure) {
+ throw callBinding.newError(
+ RESOURCE.argumentMustNotBeNull(callBinding.operand(i).toString()));
+ }
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public SqlCall createCall(SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... operands) {
+ if (operands[0] == null) {
+ operands[0] = SqlLiteral.createSymbol(SqlJsonConstructorNullClause.NULL_ON_NULL, pos);
+ }
+ return super.createCall(functionQualifier, pos, operands);
+ }
+
+ @Override
+ public String getSignatureTemplate(int operandsCount) {
+ assert operandsCount % 2 == 1;
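+        // Operand 0 is the ON NULL symbol; it is rendered last, after the key/value operands,
+        // e.g. with three operands the template is "{0}({2} {3} {1})".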
+ StringBuilder sb = new StringBuilder();
+ sb.append("{0}(");
+ for (int i = 1; i < operandsCount; i++) {
+ sb.append(String.format(Locale.ROOT, "{%d} ", i + 1));
+ }
+ sb.append("{1})");
+ return sb.toString();
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) {
+ assert call.operandCount() % 2 == 1;
+ final SqlWriter.Frame frame = writer.startFunCall(getName());
+ SqlWriter.Frame listFrame = writer.startList("", "");
+ for (int i = 1; i < call.operandCount(); i += 2) {
+ writer.sep(",");
+ writer.keyword("KEY");
+ call.operand(i).unparse(writer, leftPrec, rightPrec);
+ writer.keyword("VALUE");
+ call.operand(i + 1).unparse(writer, leftPrec, rightPrec);
+ }
+ writer.endList(listFrame);
+
+ SqlJsonConstructorNullClause nullClause = getEnumValue(call.operand(0));
+ switch (nullClause) {
+ case ABSENT_ON_NULL:
+ writer.keyword("ABSENT ON NULL");
+ break;
+ case NULL_ON_NULL:
+ writer.keyword("NULL ON NULL");
+ break;
+ default:
+ throw new IllegalStateException("unreachable code");
+ }
+ writer.endFunCall(frame);
+ }
+
+ private <E extends Enum<E>> E getEnumValue(SqlNode operand) {
+ return (E) ((SqlLiteral) operand).getValue();
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
index 4cea7d2..3594bc5 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
@@ -782,6 +782,8 @@
case JSON_VALUE => new JsonValueCallGen().generate(ctx, operands, resultType)
+ case JSON_OBJECT => new JsonObjectCallGen(call).generate(ctx, operands, resultType)
+
case _: SqlThrowExceptionFunction =>
val nullValue = generateNullLiteral(resultType, nullCheck = true)
val code =
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
index 166011f..0d9a60e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
@@ -25,6 +25,7 @@
import org.apache.flink.table.functions.{ConstantFunctionContext, FunctionContext, UserDefinedFunction}
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.codegen.FunctionCodeGenerator.generateFunction
+import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable.JSON_OBJECT
import org.apache.flink.table.planner.plan.utils.PythonUtil.containsPythonCall
import org.apache.flink.table.planner.utils.Logging
import org.apache.flink.table.types.DataType
@@ -32,7 +33,7 @@
import org.apache.flink.table.util.TimestampStringUtils.fromLocalDateTime
import org.apache.calcite.avatica.util.ByteString
-import org.apache.calcite.rex.{RexBuilder, RexExecutor, RexNode}
+import org.apache.calcite.rex.{RexBuilder, RexCall, RexExecutor, RexNode}
import org.apache.calcite.sql.`type`.SqlTypeName
import scala.collection.JavaConverters._
@@ -78,6 +79,10 @@
(SqlTypeName.MAP, _) |
(SqlTypeName.MULTISET, _) => None
+      // Exclude JSON functions whose calls must stay visible to the code generator: when nested
+      // inside another such call, their result is inserted as raw JSON rather than as a string.
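+      // E.g., reducing the inner call of JSON_OBJECT(KEY 'K' VALUE JSON_OBJECT(...)) to a string
+      // literal would yield {"K":"{...}"} instead of the intended {"K":{...}}.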
+ case (_, call: RexCall) if call.getOperator == JSON_OBJECT => None
+
case (_, e) => Some(e)
}
@@ -140,73 +145,76 @@
if (pythonUDFExprs.exists(_ eq unreduced)) {
// if contains python function then just insert the original expression.
reducedValues.add(unreduced)
- } else {
- unreduced.getType.getSqlTypeName match {
- // we insert the original expression for object literals
- case SqlTypeName.ANY |
- SqlTypeName.OTHER |
- SqlTypeName.ROW |
- SqlTypeName.STRUCTURED |
- SqlTypeName.ARRAY |
- SqlTypeName.MAP |
- SqlTypeName.MULTISET =>
- reducedValues.add(unreduced)
- case SqlTypeName.VARCHAR | SqlTypeName.CHAR =>
- val escapeVarchar = BinaryStringDataUtil.safeToString(
- reduced.getField(reducedIdx).asInstanceOf[BinaryStringData])
- reducedValues.add(maySkipNullLiteralReduce(rexBuilder, escapeVarchar, unreduced))
- reducedIdx += 1
- case SqlTypeName.VARBINARY | SqlTypeName.BINARY =>
- val reducedValue = reduced.getField(reducedIdx)
- val value = if (null != reducedValue) {
- new ByteString(reduced.getField(reducedIdx).asInstanceOf[Array[Byte]])
- } else {
- reducedValue
- }
- reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, unreduced))
- reducedIdx += 1
- case SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
- val reducedValue = reduced.getField(reducedIdx)
- val value = if (reducedValue != null) {
- val dt = reducedValue.asInstanceOf[TimestampData].toLocalDateTime
- fromLocalDateTime(dt)
- } else {
- reducedValue
- }
- reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, unreduced))
- reducedIdx += 1
- case SqlTypeName.DECIMAL =>
- val reducedValue = reduced.getField(reducedIdx)
- val value = if (reducedValue != null) {
- reducedValue.asInstanceOf[DecimalData].toBigDecimal
- } else {
- reducedValue
- }
- reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, unreduced))
- reducedIdx += 1
- case SqlTypeName.TIMESTAMP =>
- val reducedValue = reduced.getField(reducedIdx)
- val value = if (reducedValue != null) {
- val dt = reducedValue.asInstanceOf[TimestampData].toLocalDateTime
- fromLocalDateTime(dt)
- } else {
- reducedValue
- }
- reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, unreduced))
- reducedIdx += 1
- case _ =>
- val reducedValue = reduced.getField(reducedIdx)
- // RexBuilder handle double literal incorrectly, convert it into BigDecimal manually
- val value = if (reducedValue != null &&
- unreduced.getType.getSqlTypeName == SqlTypeName.DOUBLE) {
- new java.math.BigDecimal(reducedValue.asInstanceOf[Number].doubleValue())
- } else {
- reducedValue
- }
+ } else unreduced match {
+ case call: RexCall if call.getOperator == JSON_OBJECT =>
+ reducedValues.add(unreduced)
+ case _ =>
+ unreduced.getType.getSqlTypeName match {
+ // we insert the original expression for object literals
+ case SqlTypeName.ANY |
+ SqlTypeName.OTHER |
+ SqlTypeName.ROW |
+ SqlTypeName.STRUCTURED |
+ SqlTypeName.ARRAY |
+ SqlTypeName.MAP |
+ SqlTypeName.MULTISET =>
+ reducedValues.add(unreduced)
+ case SqlTypeName.VARCHAR | SqlTypeName.CHAR =>
+ val escapeVarchar = BinaryStringDataUtil.safeToString(
+ reduced.getField(reducedIdx).asInstanceOf[BinaryStringData])
+ reducedValues.add(maySkipNullLiteralReduce(rexBuilder, escapeVarchar, unreduced))
+ reducedIdx += 1
+ case SqlTypeName.VARBINARY | SqlTypeName.BINARY =>
+ val reducedValue = reduced.getField(reducedIdx)
+ val value = if (null != reducedValue) {
+ new ByteString(reduced.getField(reducedIdx).asInstanceOf[Array[Byte]])
+ } else {
+ reducedValue
+ }
+ reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, unreduced))
+ reducedIdx += 1
+ case SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
+ val reducedValue = reduced.getField(reducedIdx)
+ val value = if (reducedValue != null) {
+ val dt = reducedValue.asInstanceOf[TimestampData].toLocalDateTime
+ fromLocalDateTime(dt)
+ } else {
+ reducedValue
+ }
+ reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, unreduced))
+ reducedIdx += 1
+ case SqlTypeName.DECIMAL =>
+ val reducedValue = reduced.getField(reducedIdx)
+ val value = if (reducedValue != null) {
+ reducedValue.asInstanceOf[DecimalData].toBigDecimal
+ } else {
+ reducedValue
+ }
+ reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, unreduced))
+ reducedIdx += 1
+ case SqlTypeName.TIMESTAMP =>
+ val reducedValue = reduced.getField(reducedIdx)
+ val value = if (reducedValue != null) {
+ val dt = reducedValue.asInstanceOf[TimestampData].toLocalDateTime
+ fromLocalDateTime(dt)
+ } else {
+ reducedValue
+ }
+ reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, unreduced))
+ reducedIdx += 1
+ case _ =>
+ val reducedValue = reduced.getField(reducedIdx)
+              // RexBuilder handles double literals incorrectly, so convert them into BigDecimal manually
+ val value = if (reducedValue != null &&
+ unreduced.getType.getSqlTypeName == SqlTypeName.DOUBLE) {
+ new java.math.BigDecimal(reducedValue.asInstanceOf[Number].doubleValue())
+ } else {
+ reducedValue
+ }
- reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, unreduced))
- reducedIdx += 1
- }
+ reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, unreduced))
+ reducedIdx += 1
+ }
}
i += 1
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/JsonGenerateUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/JsonGenerateUtils.scala
new file mode 100644
index 0000000..fde4b8e
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/JsonGenerateUtils.scala
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.codegen
+
+import org.apache.flink.table.api.DataTypes
+import org.apache.flink.table.planner.codegen.CodeGenUtils.{className, newName, rowFieldReadAccess, typeTerm}
+import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable.JSON_OBJECT
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
+import org.apache.flink.table.runtime.typeutils.TypeCheckUtils.isCharacterString
+import org.apache.flink.table.types.logical.LogicalTypeRoot._
+import org.apache.flink.table.types.logical.{ArrayType, LogicalType, MapType, MultisetType, RowType}
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.{ArrayNode, ContainerNode, ObjectNode}
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.util.RawValue
+
+import org.apache.calcite.rex.{RexCall, RexNode}
+
+import java.time.format.DateTimeFormatter
+
+/** Utility for generating JSON function calls. */
+object JsonGenerateUtils {
+
+ /**
+ * Returns a term which wraps the given <code>valueExpr</code> into a [[JsonNode]] of the
+ * appropriate type.
+ */
+ def createNodeTerm(
+ ctx: CodeGeneratorContext,
+ containerNodeTerm: String,
+ valueExpr: GeneratedExpression): String = {
+ createNodeTerm(ctx, containerNodeTerm, valueExpr.resultTerm, valueExpr.resultType)
+ }
+
+ /**
+ * Returns a term which wraps the given expression into a [[JsonNode]] of the appropriate type.
+ */
+ private def createNodeTerm(
+ ctx: CodeGeneratorContext,
+ containerNodeTerm: String,
+ term: String,
+ logicalType: LogicalType): String = {
+ logicalType.getTypeRoot match {
+ case CHAR | VARCHAR => s"$containerNodeTerm.textNode($term.toString())"
+ case BOOLEAN => s"$containerNodeTerm.booleanNode($term)"
+ case DECIMAL => s"$containerNodeTerm.numberNode($term.toBigDecimal())"
+ case TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE =>
+ s"$containerNodeTerm.numberNode($term)"
+ case TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
+ val formatter = s"${typeTerm(classOf[DateTimeFormatter])}.ISO_LOCAL_DATE_TIME"
+ val isoTerm = s"$term.toLocalDateTime().format($formatter)"
+ logicalType.getTypeRoot match {
+ case TIMESTAMP_WITHOUT_TIME_ZONE => s"$containerNodeTerm.textNode($isoTerm)"
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE => s"""$containerNodeTerm.textNode($isoTerm + "Z")"""
+ }
+ case TIMESTAMP_WITH_TIME_ZONE =>
+ throw new CodeGenException(s"'TIMESTAMP WITH TIME ZONE' is not yet supported.")
+ case BINARY | VARBINARY =>
+ s"$containerNodeTerm.binaryNode($term)"
+ case ARRAY =>
+ val converterName = generateArrayConverter(ctx, containerNodeTerm,
+ logicalType.asInstanceOf[ArrayType].getElementType)
+
+ s"$converterName($term)"
+ case ROW =>
+ val converterName = generateRowConverter(ctx, containerNodeTerm,
+ logicalType.asInstanceOf[RowType])
+
+ s"$converterName($term)"
+ case MAP =>
+ val mapType = logicalType.asInstanceOf[MapType]
+ val converterName = generateMapConverter(ctx, containerNodeTerm, mapType.getKeyType,
+ mapType.getValueType)
+
+ s"$converterName($term)"
+ case MULTISET =>
+ val converterName = generateMapConverter(ctx, containerNodeTerm,
+ logicalType.asInstanceOf[MultisetType].getElementType, DataTypes.INT().getLogicalType)
+
+ s"$converterName($term)"
+ case _ => throw new CodeGenException(
+ s"Type '$logicalType' is not scalar or cannot be converted into JSON.")
+ }
+ }
+
+ /**
+ * Returns a term which wraps the given <code>valueExpr</code> as a raw [[JsonNode]].
+ *
+ * @param containerNodeTerm Name of the [[ContainerNode]] from which to create the raw node.
+ * @param valueExpr Generated expression of the value which should be wrapped.
+ * @return Generate code fragment creating the raw node.
+ */
+ def createRawNodeTerm(containerNodeTerm: String, valueExpr: GeneratedExpression): String = {
+ s"""
+ |$containerNodeTerm.rawValueNode(
+ | new ${typeTerm(classOf[RawValue])}(${valueExpr.resultTerm}.toString()))
+ |""".stripMargin
+ }
+
+ /**
+ * Determines whether the given operand is a call to a JSON function whose result should be
+ * inserted as a raw value instead of as a character string.
+ */
+ def isJsonFunctionOperand(operand: RexNode): Boolean = {
+ operand match {
+ case rexCall: RexCall => rexCall.getOperator match {
+ case JSON_OBJECT => true
+ case _ => false
+ }
+ case _ => false
+ }
+ }
+
+ /** Generates a method to convert arrays into [[ArrayNode]]. */
+ private def generateArrayConverter(
+ ctx: CodeGeneratorContext,
+ containerNodeTerm: String,
+ elementType: LogicalType): String = {
+ val fieldAccessCode = toExternalTypeTerm(
+ rowFieldReadAccess(ctx, "i", "arrData", elementType), elementType)
+
+ val methodName = newName("convertArray")
+ val methodCode =
+ s"""
+ |private ${className[ArrayNode]} $methodName(${CodeGenUtils.ARRAY_DATA} arrData) {
+ | ${className[ArrayNode]} arrNode = $containerNodeTerm.arrayNode();
+ | for (int i = 0; i < arrData.size(); i++) {
+ | arrNode.add(
+ | ${createNodeTerm(ctx, containerNodeTerm, fieldAccessCode, elementType)});
+ | }
+ |
+ | return arrNode;
+ |}
+ |""".stripMargin
+
+ ctx.addReusableMember(methodCode)
+ methodName
+ }
+
+ /** Generates a method to convert rows into [[ObjectNode]]. */
+ private def generateRowConverter(
+ ctx: CodeGeneratorContext,
+ containerNodeTerm: String,
+ rowType: RowType): String = {
+
+ val populateObjectCode = toScala(rowType.getFieldNames).zipWithIndex.map {
+ case (fieldName, idx) =>
+ val fieldType = rowType.getTypeAt(idx)
+ val fieldAccessCode = toExternalTypeTerm(
+ rowFieldReadAccess(ctx, idx.toString, "rowData", fieldType), fieldType)
+
+ s"""
+ |objNode.set("$fieldName",
+ | ${createNodeTerm(ctx, containerNodeTerm, fieldAccessCode, fieldType)});
+ |""".stripMargin
+ }.mkString
+
+ val methodName = newName("convertRow")
+ val methodCode =
+ s"""
+ |private ${className[ObjectNode]} $methodName(${CodeGenUtils.ROW_DATA} rowData) {
+ | ${className[ObjectNode]} objNode = $containerNodeTerm.objectNode();
+ | $populateObjectCode
+ |
+ | return objNode;
+ |}
+ |""".stripMargin
+
+ ctx.addReusableMember(methodCode)
+ methodName
+ }
+
+ /** Generates a method to convert maps into [[ObjectNode]]. */
+ private def generateMapConverter(
+ ctx: CodeGeneratorContext,
+ containerNodeTerm: String,
+ keyType: LogicalType,
+ valueType: LogicalType): String = {
+ if (!isCharacterString(keyType)) {
+ throw new CodeGenException(
+ s"Type '$keyType' is not supported for JSON conversion. "
+ + "The key type must be a character string.")
+ }
+
+ val keyAccessCode = toExternalTypeTerm(
+ rowFieldReadAccess(ctx, "i", "mapData.keyArray()", keyType), keyType)
+ val valueAccessCode = toExternalTypeTerm(
+ rowFieldReadAccess(ctx, "i", "mapData.valueArray()", valueType), valueType)
+
+ val methodName = newName("convertMap")
+ val methodCode =
+ s"""
+ |private ${className[ObjectNode]} $methodName(${CodeGenUtils.MAP_DATA} mapData) {
+ | ${className[ObjectNode]} objNode = $containerNodeTerm.objectNode();
+ | for (int i = 0; i < mapData.size(); i++) {
+ | java.lang.String key = $keyAccessCode;
+ | if (key == null) {
+ | throw new java.lang.IllegalArgumentException("Key at index " + i
+ | + " was null. This is not supported during conversion to JSON.");
+ | }
+ |
+ | objNode.set(key,
+ | ${createNodeTerm(ctx, containerNodeTerm, valueAccessCode, valueType)});
+ | }
+ |
+ | return objNode;
+ |}
+ |""".stripMargin
+
+ ctx.addReusableMember(methodCode)
+ methodName
+ }
+
+ private def toExternalTypeTerm(term: String, logicalType: LogicalType): String = {
+ if (isCharacterString(logicalType)) {
+ s"$term.toString()"
+ } else {
+ term
+ }
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
index 7b41553..82387c9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
@@ -24,8 +24,8 @@
import org.apache.calcite.avatica.util.{DateTimeUtils, TimeUnitRange}
import org.apache.calcite.linq4j.tree.Types
import org.apache.calcite.runtime.{JsonFunctions, SqlFunctions}
-import org.apache.calcite.sql.{SqlJsonExistsErrorBehavior, SqlJsonQueryEmptyOrErrorBehavior,
- SqlJsonQueryWrapperBehavior, SqlJsonValueEmptyOrErrorBehavior}
+import org.apache.calcite.sql.{SqlJsonConstructorNullClause, SqlJsonExistsErrorBehavior,
+ SqlJsonQueryEmptyOrErrorBehavior, SqlJsonQueryWrapperBehavior, SqlJsonValueEmptyOrErrorBehavior}
import java.lang.reflect.Method
import java.lang.{Byte => JByte, Integer => JInteger, Long => JLong, Short => JShort}
@@ -513,4 +513,7 @@
val JSON_QUERY = Types.lookupMethod(classOf[JsonFunctions], "jsonQuery",
classOf[String], classOf[String], classOf[SqlJsonQueryWrapperBehavior],
classOf[SqlJsonQueryEmptyOrErrorBehavior], classOf[SqlJsonQueryEmptyOrErrorBehavior])
+
+ val JSON_OBJECT = Types.lookupMethod(classOf[JsonFunctions], "jsonObject",
+ classOf[SqlJsonConstructorNullClause], classOf[Array[Any]])
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonObjectCallGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonObjectCallGen.scala
new file mode 100644
index 0000000..ce09e7b
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonObjectCallGen.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.codegen.calls
+
+import org.apache.flink.table.planner.codegen.CodeGenUtils._
+import org.apache.flink.table.planner.codegen.JsonGenerateUtils.{createNodeTerm, createRawNodeTerm, isJsonFunctionOperand}
+import org.apache.flink.table.planner.codegen.{CodeGenException, CodeGeneratorContext, GeneratedExpression}
+import org.apache.flink.table.runtime.functions.SqlJsonUtils
+import org.apache.flink.table.types.logical.LogicalType
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.{NullNode, ObjectNode}
+
+import org.apache.calcite.rex.RexCall
+import org.apache.calcite.sql.SqlJsonConstructorNullClause
+import org.apache.calcite.sql.SqlJsonConstructorNullClause.{ABSENT_ON_NULL, NULL_ON_NULL}
+
+/**
+ * [[CallGenerator]] for [[BuiltInMethods.JSON_OBJECT]].
+ *
+ * `JSON_OBJECT` returns a character string. However, this creates an issue when nesting calls to
+ * this function with the intention of creating a nested JSON structure. Instead of a nested JSON
+ * object, a JSON string would be inserted, i.e.
+ * `JSON_OBJECT(KEY 'K' VALUE JSON_OBJECT(KEY 'A' VALUE 'B'))` would result in
+ * `{"K":"{\"A\":\"B\"}"}` instead of the intended `{"K":{"A":"B"}}`. We remedy this by treating
+ * nested calls to this function differently and inserting the value as a raw node instead of as a
+ * string node.
+ */
+class JsonObjectCallGen(call: RexCall) extends CallGenerator {
+ private def jsonUtils = className[SqlJsonUtils]
+
+ override def generate(
+ ctx: CodeGeneratorContext,
+ operands: Seq[GeneratedExpression],
+ returnType: LogicalType): GeneratedExpression = {
+
+ val nodeTerm = newName("node")
+ ctx.addReusableMember(s"${className[ObjectNode]} $nodeTerm = $jsonUtils.createObjectNode();")
+
+ val nullNodeTerm = newName("nullNode")
+ ctx.addReusableMember(s"${className[NullNode]} $nullNodeTerm = $nodeTerm.nullNode();")
+
+ val onNull = getNullBehavior(operands.head)
+ val populateNodeCode = operands.zipWithIndex.drop(1).grouped(2).map {
+ case Seq((keyExpr, _), (valueExpr, valueIdx)) =>
+ val valueTerm = if (isJsonFunctionOperand(call.operands.get(valueIdx))) {
+ createRawNodeTerm(nodeTerm, valueExpr)
+ } else {
+ createNodeTerm(ctx, nodeTerm, valueExpr)
+ }
+
+ onNull match {
+ case NULL_ON_NULL =>
+ s"""
+ |if (${valueExpr.nullTerm}) {
+ | $nodeTerm.set(${keyExpr.resultTerm}.toString(), $nullNodeTerm);
+ |} else {
+ | $nodeTerm.set(${keyExpr.resultTerm}.toString(), $valueTerm);
+ |}
+ |""".stripMargin
+ case ABSENT_ON_NULL =>
+ s"""
+ |if (!${valueExpr.nullTerm}) {
+ | $nodeTerm.set(${keyExpr.resultTerm}.toString(), $valueTerm);
+ |}
+ |""".stripMargin
+ }
+ }.mkString
+
+ val resultTerm = newName("result")
+ val resultTermType = primitiveTypeTermForType(returnType)
+ val resultCode = s"""
+ |${operands.map(_.code).mkString}
+ |
+ |$nodeTerm.removeAll();
+ |$populateNodeCode
+ |
+ |$resultTermType $resultTerm =
+ | $BINARY_STRING.fromString($jsonUtils.serializeJson($nodeTerm));
+ |""".stripMargin
+
+ GeneratedExpression(resultTerm, "false", resultCode, returnType)
+ }
+
+ private def getNullBehavior(operand: GeneratedExpression): SqlJsonConstructorNullClause = {
+ operand.literalValue match {
+ case Some(onNull: SqlJsonConstructorNullClause) => onNull
+ case _ => throw new CodeGenException(s"Expected operand to be of type"
+ + s"'${typeTerm(classOf[SqlJsonConstructorNullClause])}''")
+ }
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
index d55daa6..6a7240c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
@@ -797,5 +797,4 @@
s"new String(${terms.head}, ${terms(1)}.toString())"
}
}
-
}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java
index 22a2ab1..742f994 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java
@@ -18,22 +18,46 @@
package org.apache.flink.table.planner.functions;
-import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.api.JsonExistsOnError;
+import org.apache.flink.table.api.JsonOnNull;
import org.apache.flink.table.api.JsonType;
import org.apache.flink.table.api.JsonValueOnEmptyOrError;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.types.Row;
import org.apache.commons.io.IOUtils;
import org.junit.runners.Parameterized;
import java.io.InputStream;
import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BINARY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.MAP;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
+import static org.apache.flink.table.api.DataTypes.VARBINARY;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.call;
+import static org.apache.flink.table.api.Expressions.jsonObject;
import static org.apache.flink.table.api.Expressions.lit;
import static org.apache.flink.table.api.Expressions.nullOf;
import static org.apache.flink.table.api.JsonQueryOnEmptyOrError.EMPTY_ARRAY;
@@ -50,53 +74,51 @@
@Parameterized.Parameters(name = "{index}: {0}")
public static List<TestSpec> testData() throws Exception {
final List<TestSpec> testCases = new ArrayList<>();
- testCases.add(jsonExists());
- testCases.add(jsonValue());
- testCases.addAll(isJson());
- testCases.addAll(jsonQuery());
+ testCases.add(jsonExistsSpec());
+ testCases.add(jsonValueSpec());
+ testCases.addAll(isJsonSpec());
+ testCases.addAll(jsonQuerySpec());
+ testCases.addAll(jsonObjectSpec());
return testCases;
}
- private static TestSpec jsonExists() throws Exception {
+ private static TestSpec jsonExistsSpec() throws Exception {
final String jsonValue = getJsonFromResource("/json/json-exists.json");
return TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_EXISTS)
.onFieldsWithData(jsonValue)
- .andDataTypes(DataTypes.STRING())
+ .andDataTypes(STRING())
// NULL
.testResult(
- nullOf(DataTypes.STRING()).jsonExists("lax $"),
+ nullOf(STRING()).jsonExists("lax $"),
"JSON_EXISTS(CAST(NULL AS STRING), 'lax $')",
null,
- DataTypes.BOOLEAN())
+ BOOLEAN())
// Path variants
.testResult(
- $("f0").jsonExists("lax $"),
- "JSON_EXISTS(f0, 'lax $')",
- true,
- DataTypes.BOOLEAN())
+ $("f0").jsonExists("lax $"), "JSON_EXISTS(f0, 'lax $')", true, BOOLEAN())
.testResult(
$("f0").jsonExists("lax $.type"),
"JSON_EXISTS(f0, 'lax $.type')",
true,
- DataTypes.BOOLEAN())
+ BOOLEAN())
.testResult(
$("f0").jsonExists("lax $.author.address.city"),
"JSON_EXISTS(f0, 'lax $.author.address.city')",
true,
- DataTypes.BOOLEAN())
+ BOOLEAN())
.testResult(
$("f0").jsonExists("lax $.metadata.tags[0]"),
"JSON_EXISTS(f0, 'lax $.metadata.tags[0]')",
true,
- DataTypes.BOOLEAN())
+ BOOLEAN())
.testResult(
$("f0").jsonExists("lax $.metadata.tags[3]"),
"JSON_EXISTS(f0, 'lax $.metadata.tags[3]')",
false,
- DataTypes.BOOLEAN())
+ BOOLEAN())
// This should pass, but is broken due to
// https://issues.apache.org/jira/browse/CALCITE-4717.
// .testResult(
@@ -108,29 +130,29 @@
$("f0").jsonExists("lax $.metadata.references[0].url"),
"JSON_EXISTS(f0, 'lax $.metadata.references[0].url')",
true,
- DataTypes.BOOLEAN())
+ BOOLEAN())
.testResult(
$("f0").jsonExists("lax $.metadata.references[0].invalid"),
"JSON_EXISTS(f0, 'lax $.metadata.references[0].invalid')",
false,
- DataTypes.BOOLEAN())
+ BOOLEAN())
// ON ERROR
.testResult(
$("f0").jsonExists("strict $.invalid", JsonExistsOnError.TRUE),
"JSON_EXISTS(f0, 'strict $.invalid' TRUE ON ERROR)",
true,
- DataTypes.BOOLEAN())
+ BOOLEAN())
.testResult(
$("f0").jsonExists("strict $.invalid", JsonExistsOnError.FALSE),
"JSON_EXISTS(f0, 'strict $.invalid' FALSE ON ERROR)",
false,
- DataTypes.BOOLEAN())
+ BOOLEAN())
.testResult(
$("f0").jsonExists("strict $.invalid", JsonExistsOnError.UNKNOWN),
"JSON_EXISTS(f0, 'strict $.invalid' UNKNOWN ON ERROR)",
null,
- DataTypes.BOOLEAN())
+ BOOLEAN())
.testSqlRuntimeError(
"JSON_EXISTS(f0, 'strict $.invalid' ERROR ON ERROR)",
"No results for path: $['invalid']")
@@ -139,20 +161,20 @@
"No results for path: $['invalid']");
}
- private static TestSpec jsonValue() throws Exception {
+ private static TestSpec jsonValueSpec() throws Exception {
final String jsonValue = getJsonFromResource("/json/json-value.json");
return TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_VALUE)
.onFieldsWithData(jsonValue)
- .andDataTypes(DataTypes.STRING())
+ .andDataTypes(STRING())
// NULL and invalid types
.testResult(
- lit(null, DataTypes.STRING()).jsonValue("lax $"),
+ lit(null, STRING()).jsonValue("lax $"),
"JSON_VALUE(CAST(NULL AS STRING), 'lax $')",
null,
- DataTypes.STRING(),
- DataTypes.VARCHAR(2000))
+ STRING(),
+ VARCHAR(2000))
// RETURNING + Supported Data Types
@@ -160,79 +182,79 @@
$("f0").jsonValue("$.type"),
"JSON_VALUE(f0, '$.type')",
"account",
- DataTypes.STRING(),
- DataTypes.VARCHAR(2000))
+ STRING(),
+ VARCHAR(2000))
.testResult(
- $("f0").jsonValue("$.activated", DataTypes.BOOLEAN()),
+ $("f0").jsonValue("$.activated", BOOLEAN()),
"JSON_VALUE(f0, '$.activated' RETURNING BOOLEAN)",
true,
- DataTypes.BOOLEAN())
+ BOOLEAN())
.testResult(
- $("f0").jsonValue("$.age", DataTypes.INT()),
+ $("f0").jsonValue("$.age", INT()),
"JSON_VALUE(f0, '$.age' RETURNING INT)",
42,
- DataTypes.INT())
+ INT())
.testResult(
- $("f0").jsonValue("$.balance", DataTypes.DOUBLE()),
+ $("f0").jsonValue("$.balance", DOUBLE()),
"JSON_VALUE(f0, '$.balance' RETURNING DOUBLE)",
13.37,
- DataTypes.DOUBLE())
+ DOUBLE())
// ON EMPTY / ON ERROR
.testResult(
$("f0").jsonValue(
"lax $.invalid",
- DataTypes.STRING(),
+ STRING(),
JsonValueOnEmptyOrError.NULL,
null,
JsonValueOnEmptyOrError.ERROR,
null),
"JSON_VALUE(f0, 'lax $.invalid' NULL ON EMPTY ERROR ON ERROR)",
null,
- DataTypes.STRING(),
- DataTypes.VARCHAR(2000))
+ STRING(),
+ VARCHAR(2000))
.testResult(
$("f0").jsonValue(
"lax $.invalid",
- DataTypes.INT(),
+ INT(),
JsonValueOnEmptyOrError.DEFAULT,
42,
JsonValueOnEmptyOrError.ERROR,
null),
"JSON_VALUE(f0, 'lax $.invalid' RETURNING INTEGER DEFAULT 42 ON EMPTY ERROR ON ERROR)",
42,
- DataTypes.INT())
+ INT())
.testResult(
$("f0").jsonValue(
"strict $.invalid",
- DataTypes.STRING(),
+ STRING(),
JsonValueOnEmptyOrError.ERROR,
null,
JsonValueOnEmptyOrError.NULL,
null),
"JSON_VALUE(f0, 'strict $.invalid' ERROR ON EMPTY NULL ON ERROR)",
null,
- DataTypes.STRING(),
- DataTypes.VARCHAR(2000))
+ STRING(),
+ VARCHAR(2000))
.testResult(
$("f0").jsonValue(
"strict $.invalid",
- DataTypes.INT(),
+ INT(),
JsonValueOnEmptyOrError.NULL,
null,
JsonValueOnEmptyOrError.DEFAULT,
42),
"JSON_VALUE(f0, 'strict $.invalid' RETURNING INTEGER NULL ON EMPTY DEFAULT 42 ON ERROR)",
42,
- DataTypes.INT());
+ INT());
}
- private static List<TestSpec> isJson() {
+ private static List<TestSpec> isJsonSpec() {
return Arrays.asList(
TestSpec.forFunction(BuiltInFunctionDefinitions.IS_JSON)
.onFieldsWithData(1)
- .andDataTypes(DataTypes.INT())
+ .andDataTypes(INT())
.testSqlValidationError(
"f0 IS JSON",
"Cannot apply 'IS JSON VALUE' to arguments of type '<INTEGER> IS JSON VALUE'. "
@@ -242,87 +264,77 @@
String.format("Invalid function call:%nIS_JSON(INT)")),
TestSpec.forFunction(BuiltInFunctionDefinitions.IS_JSON)
.onFieldsWithData((String) null)
- .andDataTypes(DataTypes.STRING())
- .testResult(
- $("f0").isJson(),
- "f0 IS JSON",
- false,
- DataTypes.BOOLEAN().notNull()),
+ .andDataTypes(STRING())
+ .testResult($("f0").isJson(), "f0 IS JSON", false, BOOLEAN().notNull()),
TestSpec.forFunction(BuiltInFunctionDefinitions.IS_JSON)
.onFieldsWithData("a")
- .andDataTypes(DataTypes.STRING())
- .testResult(
- $("f0").isJson(),
- "f0 IS JSON",
- false,
- DataTypes.BOOLEAN().notNull()),
+ .andDataTypes(STRING())
+ .testResult($("f0").isJson(), "f0 IS JSON", false, BOOLEAN().notNull()),
TestSpec.forFunction(BuiltInFunctionDefinitions.IS_JSON)
.onFieldsWithData("\"a\"")
- .andDataTypes(DataTypes.STRING())
- .testResult(
- $("f0").isJson(), "f0 IS JSON", true, DataTypes.BOOLEAN().notNull())
+ .andDataTypes(STRING())
+ .testResult($("f0").isJson(), "f0 IS JSON", true, BOOLEAN().notNull())
.testResult(
$("f0").isJson(JsonType.VALUE),
"f0 IS JSON VALUE",
true,
- DataTypes.BOOLEAN().notNull())
+ BOOLEAN().notNull())
.testResult(
$("f0").isJson(JsonType.SCALAR),
"f0 IS JSON SCALAR",
true,
- DataTypes.BOOLEAN().notNull())
+ BOOLEAN().notNull())
.testResult(
$("f0").isJson(JsonType.ARRAY),
"f0 IS JSON ARRAY",
false,
- DataTypes.BOOLEAN().notNull())
+ BOOLEAN().notNull())
.testResult(
$("f0").isJson(JsonType.OBJECT),
"f0 IS JSON OBJECT",
false,
- DataTypes.BOOLEAN().notNull()),
+ BOOLEAN().notNull()),
TestSpec.forFunction(BuiltInFunctionDefinitions.IS_JSON)
.onFieldsWithData("{}")
- .andDataTypes(DataTypes.STRING())
- .testResult(
- $("f0").isJson(), "f0 IS JSON", true, DataTypes.BOOLEAN().notNull())
+ .andDataTypes(STRING())
+ .testResult($("f0").isJson(), "f0 IS JSON", true, BOOLEAN().notNull())
.testResult(
$("f0").isJson(JsonType.VALUE),
"f0 IS JSON VALUE",
true,
- DataTypes.BOOLEAN().notNull())
+ BOOLEAN().notNull())
.testResult(
$("f0").isJson(JsonType.SCALAR),
"f0 IS JSON SCALAR",
false,
- DataTypes.BOOLEAN().notNull())
+ BOOLEAN().notNull())
.testResult(
$("f0").isJson(JsonType.ARRAY),
"f0 IS JSON ARRAY",
false,
- DataTypes.BOOLEAN().notNull())
+ BOOLEAN().notNull())
.testResult(
$("f0").isJson(JsonType.OBJECT),
"f0 IS JSON OBJECT",
true,
- DataTypes.BOOLEAN().notNull()));
+ BOOLEAN().notNull()));
}
- private static List<TestSpec> jsonQuery() throws Exception {
+ private static List<TestSpec> jsonQuerySpec() throws Exception {
final String jsonValue = getJsonFromResource("/json/json-query.json");
return Arrays.asList(
TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_QUERY)
.onFieldsWithData((String) null)
- .andDataTypes(DataTypes.STRING())
+ .andDataTypes(STRING())
.testResult(
$("f0").jsonQuery("$"),
"JSON_QUERY(f0, '$')",
null,
- DataTypes.STRING(),
- DataTypes.VARCHAR(2000)),
+ STRING(),
+ VARCHAR(2000)),
TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_QUERY)
.onFieldsWithData(jsonValue)
- .andDataTypes(DataTypes.STRING())
+ .andDataTypes(STRING())
// Wrapping Behavior
@@ -330,38 +342,38 @@
$("f0").jsonQuery("$.a1", WITHOUT_ARRAY),
"JSON_QUERY(f0, '$.a1' WITHOUT WRAPPER)",
"[]",
- DataTypes.STRING(),
- DataTypes.VARCHAR(2000))
+ STRING(),
+ VARCHAR(2000))
.testResult(
$("f0").jsonQuery("$.a1", CONDITIONAL_ARRAY),
"JSON_QUERY(f0, '$.a1' WITH CONDITIONAL WRAPPER)",
"[]",
- DataTypes.STRING(),
- DataTypes.VARCHAR(2000))
+ STRING(),
+ VARCHAR(2000))
.testResult(
$("f0").jsonQuery("$.o1", CONDITIONAL_ARRAY),
"JSON_QUERY(f0, '$.o1' WITH CONDITIONAL WRAPPER)",
"[{}]",
- DataTypes.STRING(),
- DataTypes.VARCHAR(2000))
+ STRING(),
+ VARCHAR(2000))
.testResult(
$("f0").jsonQuery("$.a1", UNCONDITIONAL_ARRAY),
"JSON_QUERY(f0, '$.a1' WITH UNCONDITIONAL WRAPPER)",
"[[]]",
- DataTypes.STRING(),
- DataTypes.VARCHAR(2000))
+ STRING(),
+ VARCHAR(2000))
.testResult(
$("f0").jsonQuery("$.n1", CONDITIONAL_ARRAY),
"JSON_QUERY(f0, '$.n1' WITH CONDITIONAL WRAPPER)",
"[1]",
- DataTypes.STRING(),
- DataTypes.VARCHAR(2000))
+ STRING(),
+ VARCHAR(2000))
.testResult(
$("f0").jsonQuery("$.s1", CONDITIONAL_ARRAY),
"JSON_QUERY(f0, '$.s1' WITH CONDITIONAL WRAPPER)",
"[\"Test\"]",
- DataTypes.STRING(),
- DataTypes.VARCHAR(2000))
+ STRING(),
+ VARCHAR(2000))
// Empty Behavior
@@ -369,20 +381,20 @@
$("f0").jsonQuery("lax $.err1", WITHOUT_ARRAY, NULL, NULL),
"JSON_QUERY(f0, 'lax $.err1' NULL ON EMPTY)",
null,
- DataTypes.STRING(),
- DataTypes.VARCHAR(2000))
+ STRING(),
+ VARCHAR(2000))
.testResult(
$("f0").jsonQuery("lax $.err2", WITHOUT_ARRAY, EMPTY_ARRAY, NULL),
"JSON_QUERY(f0, 'lax $.err2' EMPTY ARRAY ON EMPTY)",
"[]",
- DataTypes.STRING(),
- DataTypes.VARCHAR(2000))
+ STRING(),
+ VARCHAR(2000))
.testResult(
$("f0").jsonQuery("lax $.err3", WITHOUT_ARRAY, EMPTY_OBJECT, NULL),
"JSON_QUERY(f0, 'lax $.err3' EMPTY OBJECT ON EMPTY)",
"{}",
- DataTypes.STRING(),
- DataTypes.VARCHAR(2000))
+ STRING(),
+ VARCHAR(2000))
.testSqlRuntimeError(
"JSON_QUERY(f0, 'lax $.err4' ERROR ON EMPTY)",
"Empty result of JSON_QUERY function is not allowed")
@@ -396,22 +408,22 @@
$("f0").jsonQuery("strict $.err6", WITHOUT_ARRAY, NULL, NULL),
"JSON_QUERY(f0, 'strict $.err6' NULL ON ERROR)",
null,
- DataTypes.STRING(),
- DataTypes.VARCHAR(2000))
+ STRING(),
+ VARCHAR(2000))
.testResult(
$("f0").jsonQuery(
"strict $.err7", WITHOUT_ARRAY, NULL, EMPTY_ARRAY),
"JSON_QUERY(f0, 'strict $.err7' EMPTY ARRAY ON ERROR)",
"[]",
- DataTypes.STRING(),
- DataTypes.VARCHAR(2000))
+ STRING(),
+ VARCHAR(2000))
.testResult(
$("f0").jsonQuery(
"strict $.err8", WITHOUT_ARRAY, NULL, EMPTY_OBJECT),
"JSON_QUERY(f0, 'strict $.err8' EMPTY OBJECT ON ERROR)",
"{}",
- DataTypes.STRING(),
- DataTypes.VARCHAR(2000))
+ STRING(),
+ VARCHAR(2000))
.testSqlRuntimeError(
"JSON_QUERY(f0, 'strict $.err9' ERROR ON ERROR)",
"No results for path")
@@ -420,8 +432,152 @@
"No results for path"));
}
+ private static List<TestSpec> jsonObjectSpec() {
+ final Map<String, String> mapData = new HashMap<>();
+ mapData.put("M1", "V1");
+ mapData.put("M2", "V2");
+
+ final Map<String, Integer> multisetData = new HashMap<>();
+ multisetData.put("M1", 1);
+ multisetData.put("M2", 2);
+
+ return Arrays.asList(
+ TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_OBJECT)
+ .onFieldsWithData(0)
+ .testResult(
+ jsonObject(JsonOnNull.NULL),
+ "JSON_OBJECT()",
+ "{}",
+ STRING().notNull(),
+ VARCHAR(2000).notNull())
+ .testResult(
+ jsonObject(JsonOnNull.NULL, "K", nullOf(STRING())),
+ "JSON_OBJECT(KEY 'K' VALUE CAST(NULL AS STRING) NULL ON NULL)",
+ "{\"K\":null}",
+ STRING().notNull(),
+ VARCHAR(2000).notNull())
+ .testResult(
+ jsonObject(JsonOnNull.ABSENT, "K", nullOf(STRING())),
+ "JSON_OBJECT(KEY 'K' VALUE CAST(NULL AS STRING) ABSENT ON NULL)",
+ "{}",
+ STRING().notNull(),
+ VARCHAR(2000).notNull()),
+ TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_OBJECT)
+ .onFieldsWithData(
+ "V",
+ true,
+ 1,
+ 1.23d,
+ 1.23,
+ LocalDateTime.parse("1990-06-02T13:37:42.001"),
+ Instant.parse("1990-06-02T13:37:42.001Z"),
+ Arrays.asList("A1", "A2", "A3"),
+ Row.of("R1", Instant.parse("1990-06-02T13:37:42.001Z")),
+ mapData,
+ multisetData,
+ "Test".getBytes(StandardCharsets.UTF_8),
+ "Test".getBytes(StandardCharsets.UTF_8),
+ Row.of(Collections.singletonList(Row.of(1, 2))))
+ .andDataTypes(
+ STRING(),
+ BOOLEAN(),
+ INT(),
+ DOUBLE(),
+ DECIMAL(3, 2),
+ TIMESTAMP(3),
+ TIMESTAMP_WITH_LOCAL_TIME_ZONE(3),
+ ARRAY(STRING()),
+ ROW(STRING(), TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)),
+ MAP(STRING(), STRING()),
+ MAP(STRING(), INT()),
+ BINARY(4),
+ VARBINARY(4),
+ ROW(ARRAY(ROW(INT(), INT()))))
+ .withFunction(CreateMultiset.class)
+ .testResult(
+ jsonObject(
+ JsonOnNull.NULL,
+ "K0",
+ $("f0"),
+ "K1",
+ $("f1"),
+ "K2",
+ $("f2"),
+ "K3",
+ $("f3"),
+ "K4",
+ $("f4"),
+ "K5",
+ $("f5"),
+ "K6",
+ $("f6"),
+ "K7",
+ $("f7"),
+ "K8",
+ $("f8"),
+ "K9",
+ $("f9"),
+ "K10",
+ call("CreateMultiset", $("f10")),
+ "K11",
+ $("f11"),
+ "K12",
+ $("f12"),
+ "K13",
+ $("f13"),
+ "K14",
+ jsonObject(JsonOnNull.NULL, "A", "B")),
+ "JSON_OBJECT("
+ + "'K0' VALUE f0, "
+ + "'K1' VALUE f1, "
+ + "'K2' VALUE f2, "
+ + "'K3' VALUE f3, "
+ + "'K4' VALUE f4, "
+ + "'K5' VALUE f5, "
+ + "'K6' VALUE f6, "
+ + "'K7' VALUE f7, "
+ + "'K8' VALUE f8, "
+ + "'K9' VALUE f9, "
+ + "'K10' VALUE CreateMultiset(f10), "
+ + "'K11' VALUE f11, "
+ + "'K12' VALUE f12, "
+ + "'K13' VALUE f13, "
+ + "'K14' VALUE JSON_OBJECT(KEY 'A' VALUE 'B')"
+ + ")",
+ "{"
+ + "\"K0\":\"V\","
+ + "\"K1\":true,"
+ + "\"K2\":1,"
+ + "\"K3\":1.23,"
+ + "\"K4\":1.23,"
+ + "\"K5\":\"1990-06-02T13:37:42.001\","
+ + "\"K6\":\"1990-06-02T13:37:42.001Z\","
+ + "\"K7\":[\"A1\",\"A2\",\"A3\"],"
+ + "\"K8\":{\"f0\":\"R1\",\"f1\":\"1990-06-02T13:37:42.001Z\"},"
+ + "\"K9\":{\"M1\":\"V1\",\"M2\":\"V2\"},"
+ + "\"K10\":{\"M1\":1,\"M2\":2},"
+ + "\"K11\":\"VGVzdA==\","
+ + "\"K12\":\"VGVzdA==\","
+ + "\"K13\":{\"f0\":[{\"f0\":1,\"f1\":2}]},"
+ + "\"K14\":{\"A\":\"B\"}"
+ + "}",
+ STRING().notNull(),
+ VARCHAR(2000).notNull()));
+ }
+
// ---------------------------------------------------------------------------------------------
+ /**
+ * {@link BuiltInFunctionTestBase} uses a {@code VALUES} clause, but there is currently no way
+ * to create a {@code MULTISET} with it. We work around this with a custom function instead.
+ */
+ public static class CreateMultiset extends ScalarFunction {
+ public @DataTypeHint("MULTISET<STRING>") Map<String, Integer> eval(
+ Map<String, Integer> map) {
+ return map;
+ }
+ }
+
private static String getJsonFromResource(String fileName) throws Exception {
final InputStream jsonResource = JsonFunctionsITCase.class.getResourceAsStream(fileName);
if (jsonResource == null) {
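For orientation, the `jsonObjectSpec()` cases above exercise both entry points added by this patch. Below is a minimal sketch of how they surface in user code, assuming a standard `TableEnvironment` setup; the single-column source table and its field name `f0` are illustrative, not part of the patch:

```java
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.jsonObject;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.JsonOnNull;
import org.apache.flink.table.api.TableEnvironment;

public class JsonObjectExample {
    public static void main(String[] args) {
        TableEnvironment env =
                TableEnvironment.create(
                        EnvironmentSettings.newInstance().inStreamingMode().build());

        // SQL form: keys must be non-NULL string literals, values may be expressions.
        // The result column holds '{"K1":"V1"}' since ABSENT ON NULL drops the NULL entry.
        env.executeSql(
                        "SELECT JSON_OBJECT("
                                + "KEY 'K1' VALUE 'V1', "
                                + "KEY 'K2' VALUE CAST(NULL AS STRING) "
                                + "ABSENT ON NULL)")
                .print();

        // Table API form: the JsonOnNull behavior comes first, followed by
        // alternating keys and values. The result column holds '{"K1":"V1"}'.
        env.fromValues("V1")
                .as("f0")
                .select(jsonObject(JsonOnNull.NULL, "K1", $("f0")))
                .execute()
                .print();
    }
}
```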
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java
new file mode 100644
index 0000000..a7bbf4c
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+/**
+ * Utilities for JSON functions.
+ *
+ * <p>Note that these methods are called from generated code.
+ */
+@Internal
+public class SqlJsonUtils {
+
+ private static final JsonFactory JSON_FACTORY = new JsonFactory();
+ private static final ObjectMapper MAPPER = new ObjectMapper(JSON_FACTORY);
+
+ /** Returns a new {@link ObjectNode}. */
+ public static ObjectNode createObjectNode() {
+ return MAPPER.createObjectNode();
+ }
+
+ /** Serializes the given {@link JsonNode} to a JSON string. */
+ public static String serializeJson(JsonNode node) {
+ try {
+ return MAPPER.writeValueAsString(node);
+ } catch (JsonProcessingException e) {
+ throw new TableException("JSON object could not be serialized: " + node.toString(), e);
+ }
+ }
+
+ private SqlJsonUtils() {}
+}
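Since `SqlJsonUtils` is only invoked from generated code, here is a hand-written stand-in for the call pattern the planner's code generation is expected to emit; the class and method names below are purely illustrative:

```java
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.table.runtime.functions.SqlJsonUtils;

public class GeneratedJsonObjectSketch {

    // Roughly what JSON_OBJECT(KEY 'K1' VALUE 'V1',
    //                          KEY 'K2' VALUE CAST(NULL AS STRING) NULL ON NULL)
    // boils down to at runtime.
    public static String evalJsonObject() {
        final ObjectNode node = SqlJsonUtils.createObjectNode();
        node.put("K1", "V1");
        // NULL ON NULL: a NULL value is written as JSON null instead of being skipped.
        node.putNull("K2");
        return SqlJsonUtils.serializeJson(node); // '{"K1":"V1","K2":null}'
    }
}
```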