[FLINK-28916][e2e] Add e2e test for create function using jar syntax (#20545)
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCase.java
index beac0f8..a574d1f 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCase.java
@@ -43,7 +43,7 @@
import java.util.List;
import java.util.Map;
-/** ITCase for adding remote jar. */
+/** End to End tests for using remote jar. */
public class UsingRemoteJarITCase extends SqlITCaseBase {
private static final Path HADOOP_CLASSPATH = TestUtils.getResource(".*hadoop.classpath");
@@ -110,6 +110,45 @@
"{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
}
+ @Test
+ public void testCreateTemporarySystemFunctionUsingRemoteJar() throws Exception {
+ Map<String, String> replaceVars = generateReplaceVars();
+ replaceVars.put("$TEMPORARY", "TEMPORARY SYSTEM");
+ runAndCheckSQL(
+ "create_function_using_remote_jar_e2e.sql",
+ replaceVars,
+ 2,
+ Arrays.asList(
+ "{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
+ "{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+ }
+
+ @Test
+ public void testCreateCatalogFunctionUsingRemoteJar() throws Exception {
+ Map<String, String> replaceVars = generateReplaceVars();
+ replaceVars.put("$TEMPORARY", "");
+ runAndCheckSQL(
+ "create_function_using_remote_jar_e2e.sql",
+ replaceVars,
+ 2,
+ Arrays.asList(
+ "{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
+ "{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+ }
+
+ @Test
+ public void testCreateTemporaryCatalogFunctionUsingRemoteJar() throws Exception {
+ Map<String, String> replaceVars = generateReplaceVars();
+ replaceVars.put("$TEMPORARY", "TEMPORARY");
+ runAndCheckSQL(
+ "create_function_using_remote_jar_e2e.sql",
+ replaceVars,
+ 2,
+ Arrays.asList(
+ "{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
+ "{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+ }
+
@Override
protected Map<String, String> generateReplaceVars() {
String remoteJarPath =
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/resources/create_function_using_remote_jar_e2e.sql b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/resources/create_function_using_remote_jar_e2e.sql
new file mode 100644
index 0000000..30def72
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/resources/create_function_using_remote_jar_e2e.sql
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE JsonTable (
+ user_name STRING,
+ order_cnt BIGINT
+) WITH (
+ 'connector' = 'filesystem',
+ 'path' = '$RESULT',
+ 'sink.rolling-policy.rollover-interval' = '2s',
+ 'sink.rolling-policy.check-interval' = '2s',
+ 'format' = 'debezium-json'
+);
+
+CREATE $TEMPORARY FUNCTION count_agg AS 'org.apache.flink.table.toolbox.CountAggFunction'
+ LANGUAGE JAVA USING JAR '$JAR_PATH';
+
+SET execution.runtime-mode = $MODE;
+SET table.exec.mini-batch.enabled = true;
+SET table.exec.mini-batch.size = 5;
+SET table.exec.mini-batch.allow-latency = 2s;
+
+INSERT INTO JsonTable
+SELECT user_name, count_agg(order_id)
+FROM (VALUES (1, 'Bob'), (2, 'Bob'), (1, 'Alice')) T(order_id, user_name)
+GROUP BY user_name;