[FLINK-28916][e2e] Add e2e test for create function using jar syntax (#20545)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCase.java
index beac0f8..a574d1f 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCase.java
@@ -43,7 +43,7 @@
 import java.util.List;
 import java.util.Map;
 
-/** ITCase for adding remote jar. */
+/** End to End tests for using remote jar. */
 public class UsingRemoteJarITCase extends SqlITCaseBase {
     private static final Path HADOOP_CLASSPATH = TestUtils.getResource(".*hadoop.classpath");
 
@@ -110,6 +110,45 @@
                         "{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
     }
 
+    @Test
+    public void testCreateTemporarySystemFunctionUsingRemoteJar() throws Exception {
+        Map<String, String> replaceVars = generateReplaceVars();
+        replaceVars.put("$TEMPORARY", "TEMPORARY SYSTEM");
+        runAndCheckSQL(
+                "create_function_using_remote_jar_e2e.sql",
+                replaceVars,
+                2,
+                Arrays.asList(
+                        "{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+    }
+
+    @Test
+    public void testCreateCatalogFunctionUsingRemoteJar() throws Exception {
+        Map<String, String> replaceVars = generateReplaceVars();
+        replaceVars.put("$TEMPORARY", "");
+        runAndCheckSQL(
+                "create_function_using_remote_jar_e2e.sql",
+                replaceVars,
+                2,
+                Arrays.asList(
+                        "{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+    }
+
+    @Test
+    public void testCreateTemporaryCatalogFunctionUsingRemoteJar() throws Exception {
+        Map<String, String> replaceVars = generateReplaceVars();
+        replaceVars.put("$TEMPORARY", "TEMPORARY");
+        runAndCheckSQL(
+                "create_function_using_remote_jar_e2e.sql",
+                replaceVars,
+                2,
+                Arrays.asList(
+                        "{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
+                        "{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+    }
+
     @Override
     protected Map<String, String> generateReplaceVars() {
         String remoteJarPath =
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/resources/create_function_using_remote_jar_e2e.sql b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/resources/create_function_using_remote_jar_e2e.sql
new file mode 100644
index 0000000..30def72
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/resources/create_function_using_remote_jar_e2e.sql
@@ -0,0 +1,41 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+CREATE TABLE JsonTable (
+   user_name STRING,
+   order_cnt BIGINT
+) WITH (
+    'connector' = 'filesystem',
+    'path' = '$RESULT',
+    'sink.rolling-policy.rollover-interval' = '2s',
+    'sink.rolling-policy.check-interval' = '2s',
+    'format' = 'debezium-json'
+);
+
+CREATE $TEMPORARY FUNCTION count_agg AS 'org.apache.flink.table.toolbox.CountAggFunction'
+    LANGUAGE JAVA USING JAR '$JAR_PATH';
+
+SET execution.runtime-mode = $MODE;
+SET table.exec.mini-batch.enabled = true;
+SET table.exec.mini-batch.size = 5;
+SET table.exec.mini-batch.allow-latency = 2s;
+
+INSERT INTO JsonTable
+SELECT user_name, count_agg(order_id)
+FROM (VALUES (1, 'Bob'), (2, 'Bob'), (1, 'Alice')) T(order_id, user_name)
+GROUP BY user_name;