[FLINK-35479][e2e] Add end-to-end test for materialized table
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml b/flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml
index 8219669..35f7e14 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml
@@ -127,7 +127,14 @@
 				</exclusion>
 			</exclusions>
 		</dependency>
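+		<!-- REST message classes (e.g. FetchResultsResponseBody) used by TestSqlGatewayRestClient -->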
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-sql-gateway</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
 	</dependencies>
 
 	<build>
 		<plugins>
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
index 92c9986..404b1ed 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
@@ -23,6 +23,9 @@
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.table.api.ResultKind;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
 import org.apache.flink.test.util.FileUtils;
 import org.apache.flink.test.util.JobSubmission;
 import org.apache.flink.test.util.SQLJobClientMode;
@@ -65,8 +68,11 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -463,7 +469,7 @@
     }
 
     /** This rest client is used to submit SQL strings to Rest Endpoint of Sql Gateway. */
-    private static class TestSqlGatewayRestClient {
+    public static class TestSqlGatewayRestClient {
 
         private final String host;
         private final int port;
@@ -475,12 +481,36 @@
             this.host = host;
             this.port = port;
             this.version = version;
-            this.sessionHandle = openSession();
+            this.sessionHandle = openSession(Collections.emptyMap());
         }
 
-        private String openSession() throws Exception {
-            FormBody.Builder builder = new FormBody.Builder();
-            FormBody requestBody = builder.build();
+        public TestSqlGatewayRestClient(
+                String host, int port, String version, Map<String, String> properties)
+                throws Exception {
+            this.host = host;
+            this.port = port;
+            this.version = version;
+            this.sessionHandle = openSession(properties);
+        }
+
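+        /**
+         * Opens a session on the SQL Gateway REST endpoint. When properties are provided they
+         * are sent as a JSON payload under the "properties" key; otherwise an empty form body
+         * is posted, preserving the previous behavior.
+         */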
+        private String openSession(Map<String, String> properties) throws Exception {
+            RequestBody requestBody;
+            if (properties == null || properties.isEmpty()) {
+                requestBody = new FormBody.Builder().build();
+            } else {
+                Map<String, Object> requestMap = new HashMap<>();
+                requestMap.put("properties", properties);
+                requestBody =
+                        RequestBody.create(
+                                MediaType.parse("application/json; charset=utf-8"),
+                                OBJECT_MAPPER.writeValueAsString(requestMap));
+            }
+
             final Request request =
                     new Request.Builder()
                             .post(requestBody)
@@ -529,6 +559,58 @@
             } while (!Objects.equals(status, "FINISHED") && !Objects.equals(status, "ERROR"));
         }
 
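+        /**
+         * Fetches all rows produced by the given operation, following the nextResultUri links
+         * returned by the gateway until the result set is exhausted. Waits briefly between
+         * fetches to avoid busy-polling the endpoint.
+         */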
+        public List<RowData> getOperationResult(String operationHandle) throws Exception {
+            List<RowData> result = new ArrayList<>();
+            String resultUri =
+                    String.format(
+                            "/%s/sessions/%s/operations/%s/result/0",
+                            version, sessionHandle, operationHandle);
+            while (resultUri != null) {
+                final Request request =
+                        new Request.Builder()
+                                .get()
+                                .url(String.format("http://%s:%s%s", host, port, resultUri))
+                                .build();
+
+                String response = sendRequest(request);
+
+                FetchResultsResponseBody fetchResultsResponseBody =
+                        OBJECT_MAPPER.readValue(response, FetchResultsResponseBody.class);
+                ResultKind resultKind = fetchResultsResponseBody.getResultKind();
+
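+                // only SUCCESS_WITH_CONTENT responses carry rows; other result kinds are skipped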
+                if (Objects.equals(resultKind, ResultKind.SUCCESS_WITH_CONTENT)) {
+                    result.addAll(fetchResultsResponseBody.getResults().getData());
+                }
+                resultUri = fetchResultsResponseBody.getNextResultUri();
+                if (resultUri != null) {
+                    // back off briefly before fetching the next batch
+                    Thread.sleep(1000);
+                }
+            }
+
+            return result;
+        }
+
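+        /**
+         * Convenience wrapper that executes a statement, waits for the operation to reach a
+         * terminal state, and returns all collected rows.
+         */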
+        public List<RowData> executeStatementWithResult(String sql) {
+            try {
+                String operationHandle = executeStatement(sql);
+                waitUntilOperationTerminate(operationHandle);
+                return getOperationResult(operationHandle);
+            } catch (Exception e) {
+                throw new RuntimeException("Execute statement failed", e);
+            }
+        }
+
         private String sendRequest(Request request) throws Exception {
             String responseString;
             try (Response response = client.newCall(request).execute()) {
diff --git a/flink-end-to-end-tests/flink-sql-gateway-test/pom.xml b/flink-end-to-end-tests/flink-sql-gateway-test/pom.xml
index ef15b99..26c47a7 100644
--- a/flink-end-to-end-tests/flink-sql-gateway-test/pom.xml
+++ b/flink-end-to-end-tests/flink-sql-gateway-test/pom.xml
@@ -176,6 +176,15 @@
                                     <outputDirectory>${project.build.directory}/dependencies
                                     </outputDirectory>
                                 </artifactItem>
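+                                <!-- test-filesystem connector used by the materialized table tests -->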
+                                <artifactItem>
+                                    <groupId>org.apache.flink</groupId>
+                                    <artifactId>flink-table-filesystem-test-utils</artifactId>
+                                    <version>${project.version}</version>
+                                    <type>jar</type>
+                                    <outputDirectory>${project.build.directory}/dependencies
+                                    </outputDirectory>
+                                </artifactItem>
                             </artifactItems>
                         </configuration>
                     </execution>
diff --git a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java
index dc4921b..d65ed2f 100644
--- a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java
+++ b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java
@@ -20,7 +20,9 @@
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.endpoint.hive.HiveServer2Endpoint;
 import org.apache.flink.table.gateway.containers.HiveContainer;
 import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
@@ -28,6 +30,7 @@
 import org.apache.flink.test.util.SQLJobClientMode;
 import org.apache.flink.test.util.SQLJobSubmission;
 import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkDistribution;
 import org.apache.flink.tests.util.flink.FlinkResource;
 import org.apache.flink.tests.util.flink.FlinkResourceSetup;
 import org.apache.flink.tests.util.flink.GatewayController;
@@ -63,12 +66,15 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.TimeZone;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.CATALOG_HIVE_CONF_DIR;
 import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.THRIFT_PORT;
 import static org.apache.flink.table.gateway.rest.util.SqlGatewayRestOptions.ADDRESS;
 import static org.apache.flink.table.gateway.rest.util.SqlGatewayRestOptions.PORT;
+import static org.apache.flink.table.utils.DateTimeUtils.formatTimestampMillis;
 import static org.apache.flink.tests.util.TestUtils.readCsvResultFiles;
 import static org.junit.Assert.assertEquals;
 
@@ -77,6 +83,9 @@
 
     private static final Path HIVE_SQL_CONNECTOR_JAR =
             ResourceTestUtils.getResource(".*dependencies/flink-sql-connector-hive-.*.jar");
+    private static final Path TEST_FILESYSTEM_CONNECTOR_JAR =
+            ResourceTestUtils.getResource(
+                    ".*dependencies/flink-table-filesystem-test-utils-.*.jar");
     private static final Path HADOOP_CLASS_PATH =
             ResourceTestUtils.getResource(".*hadoop.classpath");
     private static final String GATEWAY_E2E_SQL = "gateway_e2e.sql";
@@ -90,6 +99,12 @@
     private static NetUtils.Port hiveserver2Port;
     private static NetUtils.Port restPort;
 
+    private static final String CATALOG_NAME_PREFIX = "filesystem_catalog_";
+
+    private static final String FILESYSTEM_DEFAULT_DATABASE = "test_db";
+    private static final AtomicInteger CATALOG_COUNTER = new AtomicInteger(0);
+    private static String filesystemCatalogName;
+
     @BeforeClass
     public static void beforeClass() {
         ENDPOINT_CONFIG.setString(
@@ -126,6 +141,315 @@
                         InetAddress.getByName("localhost").getHostAddress(), restPort.getPort()));
     }
 
+    @Test
+    public void testMaterializedTableInContinuousMode() throws Exception {
+        Duration continuousWaitTime = Duration.ofMinutes(5);
+        Duration continuousWaitPause = Duration.ofSeconds(10);
+
+        try (GatewayController gateway = flinkResource.startSqlGateway();
+                ClusterController ignore = flinkResource.startCluster(2)) {
+
+            FlinkDistribution.TestSqlGatewayRestClient gatewayRestClient =
+                    initSessionWithCatalogStore(Collections.emptyMap());
+
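+            // the source is a table in the test-filesystem catalog; 'source.monitor-interval'
+            // makes it an unbounded streaming source that continuously picks up new files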
+            gatewayRestClient.executeStatementWithResult(
+                    "CREATE TABLE streaming_source (\n"
+                            + "    `timestamp` TIMESTAMP(3),\n"
+                            + "    `user` VARCHAR,\n"
+                            + "    `type` VARCHAR\n"
+                            + " ) with ("
+                            + "   'format' = 'json',"
+                            + "   'source.monitor-interval' = '10s'"
+                            + ")");
+
+            gatewayRestClient.executeStatementWithResult(
+                    "insert into streaming_source select TO_TIMESTAMP('2024-06-20 00:00:00'), 'Alice', 'INFO'");
+            gatewayRestClient.executeStatementWithResult(
+                    "insert into streaming_source select TO_TIMESTAMP('2024-06-20 00:00:00'), 'Bob', 'ERROR'");
+
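+            // FRESHNESS together with REFRESH_MODE = CONTINUOUS starts a long-running
+            // streaming job that keeps the materialized table in sync with the source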
+            gatewayRestClient.executeStatementWithResult(
+                    " CREATE MATERIALIZED TABLE my_materialized_table_in_continuous_mode\n"
+                            + " PARTITIONED BY (ds)\n"
+                            + " with (\n"
+                            + "   'format' = 'json',\n"
+                            + "   'sink.rolling-policy.rollover-interval' = '10s',\n"
+                            + "   'sink.rolling-policy.check-interval' = '10s'\n"
+                            + "  )\n"
+                            + " FRESHNESS = INTERVAL '10' SECOND\n"
+                            + " REFRESH_MODE = CONTINUOUS\n"
+                            + " AS SELECT\n"
+                            + " DATE_FORMAT(`timestamp`, 'yyyy-MM-dd') AS ds,\n"
+                            + " user,\n"
+                            + " type\n"
+                            + " FROM streaming_source");
+
+            // switch the session to batch execution mode to verify the materialized table
+            gatewayRestClient.executeStatementWithResult("SET 'execution.runtime-mode' = 'batch'");
+
+            // verify the result
+            CommonTestUtils.waitUtil(
+                    () -> {
+                        List<RowData> result =
+                                gatewayRestClient.executeStatementWithResult(
+                                        "select * from my_materialized_table_in_continuous_mode order by ds, user");
+                        return result.toString()
+                                .equals("[+I(2024-06-20,Alice,INFO), +I(2024-06-20,Bob,ERROR)]");
+                    },
+                    continuousWaitTime,
+                    continuousWaitPause,
+                    "Failed to wait for the result");
+
+            // configure the savepoint directory used when suspending the materialized table
+            File savepointFolder = FOLDER.newFolder("savepoint");
+            gatewayRestClient.executeStatementWithResult(
+                    String.format(
+                            "set 'execution.checkpointing.savepoint-dir'='file://%s'",
+                            savepointFolder.getAbsolutePath()));
+
+            // suspend the materialized table
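+            // (SUSPEND stops the refresh job with a savepoint in the directory configured
+            // above; a later RESUME restarts the job from that savepoint)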
+            gatewayRestClient.executeStatementWithResult(
+                    "ALTER MATERIALIZED TABLE my_materialized_table_in_continuous_mode SUSPEND");
+
+            // send more data to the source
+            gatewayRestClient.executeStatementWithResult(
+                    "insert into streaming_source select TO_TIMESTAMP('2024-06-20 00:00:00'), 'Charlie', 'WARN'");
+
+            // resume the materialized table
+            gatewayRestClient.executeStatementWithResult(
+                    "ALTER MATERIALIZED TABLE my_materialized_table_in_continuous_mode RESUME");
+
+            // verify the result
+            CommonTestUtils.waitUtil(
+                    () -> {
+                        List<RowData> result =
+                                gatewayRestClient.executeStatementWithResult(
+                                        "select * from my_materialized_table_in_continuous_mode order by ds, user");
+                        return result.toString()
+                                .equals(
+                                        "[+I(2024-06-20,Alice,INFO), +I(2024-06-20,Bob,ERROR), +I(2024-06-20,Charlie,WARN)]");
+                    },
+                    continuousWaitTime,
+                    continuousWaitPause,
+                    "Failed to wait for the result");
+
+            // drop the materialized table
+            gatewayRestClient.executeStatementWithResult(
+                    "DROP MATERIALIZED TABLE my_materialized_table_in_continuous_mode");
+        }
+    }
+
+    @Test
+    public void testMaterializedTableInFullMode() throws Exception {
+        Duration fullModeWaitTime = Duration.ofMinutes(5);
+        Duration fullModeWaitPause = Duration.ofSeconds(10);
+
+        // init session
+        try (GatewayController gateway = flinkResource.startSqlGateway();
+                ClusterController ignore = flinkResource.startCluster(2)) {
+
+            Map<String, String> sessionProperties = new HashMap<>();
+            sessionProperties.put("workflow-scheduler.type", "embedded");
+            FlinkDistribution.TestSqlGatewayRestClient gatewayRestClient =
+                    initSessionWithCatalogStore(sessionProperties);
+
+            gatewayRestClient.executeStatementWithResult(
+                    "CREATE TABLE batch_source (\n"
+                            + "    `timestamp` TIMESTAMP(3),\n"
+                            + "    `user` VARCHAR,\n"
+                            + "    `type` VARCHAR\n"
+                            + " ) with ("
+                            + "   'format' = 'json'"
+                            + ")");
+
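+            // REFRESH_MODE = FULL periodically re-runs the defining query as a batch job via
+            // the embedded workflow scheduler configured above; only the partition matching
+            // the current time is refreshed automatically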
+            gatewayRestClient.executeStatementWithResult(
+                    " CREATE MATERIALIZED TABLE my_materialized_table_in_full_mode\n"
+                            + " PARTITIONED BY (ds)\n"
+                            + " WITH (\n"
+                            + "   'partition.fields.ds.date-formatter' = 'yyyy-MM-dd',\n"
+                            + "   'format' = 'json'\n"
+                            + " )\n"
+                            + " FRESHNESS = INTERVAL '1' MINUTE\n"
+                            + " REFRESH_MODE = FULL\n"
+                            + " AS SELECT\n"
+                            + " ds,\n"
+                            + " count(*) as cnt\n"
+                            + " FROM (\n"
+                            + "   SELECT\n"
+                            + "   DATE_FORMAT(`timestamp`, 'yyyy-MM-dd') AS ds,\n"
+                            + "   user,\n"
+                            + " type\n"
+                            + " FROM batch_source\n"
+                            + " ) GROUP BY ds");
+
+            long systemTime = System.currentTimeMillis();
+            String todayTimestamp =
+                    formatTimestampMillis(systemTime, "yyyy-MM-dd HH:mm:ss", TimeZone.getDefault());
+            String yesterdayTimestamp =
+                    formatTimestampMillis(
+                            systemTime - 24 * 60 * 60 * 1000,
+                            "yyyy-MM-dd HH:mm:ss",
+                            TimeZone.getDefault());
+            String tomorrowTimestamp =
+                    formatTimestampMillis(
+                            systemTime + 24 * 60 * 60 * 1000,
+                            "yyyy-MM-dd HH:mm:ss",
+                            TimeZone.getDefault());
+            String todayDateStr = todayTimestamp.substring(0, 10);
+            String yesterdayDateStr = yesterdayTimestamp.substring(0, 10);
+            String tomorrowDateStr = tomorrowTimestamp.substring(0, 10);
+
+            // insert one row each for yesterday, today, and tomorrow
+            gatewayRestClient.executeStatementWithResult(
+                    String.format(
+                            "INSERT INTO batch_source VALUES "
+                                    + "(TO_TIMESTAMP('%s'), 'Alice', 'INFO'), "
+                                    + "(TO_TIMESTAMP('%s'), 'Alice', 'INFO'), "
+                                    + "(TO_TIMESTAMP('%s'), 'Alice', 'INFO')",
+                            yesterdayTimestamp, todayTimestamp, tomorrowTimestamp));
+
+            // switch the session to batch execution mode to verify the materialized table
+            gatewayRestClient.executeStatementWithResult("SET 'execution.runtime-mode' = 'batch'");
+
+            // verify that the materialized table automatically refreshes the today (or
+            // tomorrow) partition, but not the yesterday partition
+            CommonTestUtils.waitUtil(
+                    () -> {
+                        List<RowData> result =
+                                gatewayRestClient.executeStatementWithResult(
+                                        "select * from my_materialized_table_in_full_mode order by ds");
+                        String resultStr = result.toString();
+                        return (resultStr.contains(String.format("+I(%s,1)", todayDateStr))
+                                        || resultStr.contains(
+                                                String.format("+I(%s,1)", tomorrowDateStr)))
+                                && (!resultStr.contains(
+                                        String.format("+I(%s,1)", yesterdayDateStr)));
+                    },
+                    fullModeWaitTime,
+                    fullModeWaitPause,
+                    "Failed to wait for the materialized table result");
+
+            // suspend the materialized table
+            gatewayRestClient.executeStatementWithResult(
+                    "ALTER MATERIALIZED TABLE my_materialized_table_in_full_mode SUSPEND");
+
+            // insert more data to the batch_source table
+            gatewayRestClient.executeStatementWithResult(
+                    String.format(
+                            "INSERT INTO batch_source VALUES "
+                                    + "(TO_TIMESTAMP('%s'), 'Bob', 'INFO'), "
+                                    + "(TO_TIMESTAMP('%s'), 'Bob', 'INFO'), "
+                                    + "(TO_TIMESTAMP('%s'), 'Bob', 'INFO')",
+                            yesterdayTimestamp, todayTimestamp, tomorrowTimestamp));
+
+            // resume the materialized table
+            gatewayRestClient.executeStatementWithResult(
+                    "ALTER MATERIALIZED TABLE my_materialized_table_in_full_mode RESUME");
+
+            // wait until the materialized table is refreshed and verify that only the today
+            // (or tomorrow) partition was updated
+            CommonTestUtils.waitUtil(
+                    () -> {
+                        List<RowData> result =
+                                gatewayRestClient.executeStatementWithResult(
+                                        "select * from my_materialized_table_in_full_mode order by ds");
+                        String resultStr = result.toString();
+                        return (resultStr.contains(String.format("+I(%s,2)", todayDateStr))
+                                        || resultStr.contains(
+                                                String.format("+I(%s,2)", tomorrowDateStr)))
+                                && (!resultStr.contains(
+                                        String.format("+I(%s,2)", yesterdayDateStr)));
+                    },
+                    fullModeWaitTime,
+                    fullModeWaitPause,
+                    "Failed to wait for the result");
+
+            // manually refresh all partitions
+            gatewayRestClient.executeStatementWithResult(
+                    "ALTER MATERIALIZED TABLE my_materialized_table_in_full_mode REFRESH PARTITION (ds='"
+                            + todayDateStr
+                            + "')");
+            gatewayRestClient.executeStatementWithResult(
+                    "ALTER MATERIALIZED TABLE my_materialized_table_in_full_mode REFRESH PARTITION (ds='"
+                            + yesterdayDateStr
+                            + "')");
+            gatewayRestClient.executeStatementWithResult(
+                    "ALTER MATERIALIZED TABLE my_materialized_table_in_full_mode REFRESH PARTITION (ds='"
+                            + tomorrowDateStr
+                            + "')");
+
+            // verify that all partitions of the materialized table are updated
+            CommonTestUtils.waitUtil(
+                    () -> {
+                        List<RowData> result =
+                                gatewayRestClient.executeStatementWithResult(
+                                        "select * from my_materialized_table_in_full_mode order by ds");
+                        return result.toString()
+                                .equals(
+                                        String.format(
+                                                "[+I(%s,2), +I(%s,2), +I(%s,2)]",
+                                                yesterdayDateStr, todayDateStr, tomorrowDateStr));
+                    },
+                    fullModeWaitTime,
+                    fullModeWaitPause,
+                    "Failed to wait for the result");
+
+            // drop the materialized table
+            gatewayRestClient.executeStatementWithResult(
+                    "DROP MATERIALIZED TABLE my_materialized_table_in_full_mode");
+        }
+    }
+
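+    /**
+     * Opens a gateway session configured with a file catalog store, creates a fresh
+     * test-filesystem catalog with a unique name, and switches the session to it so that each
+     * test case runs against an isolated catalog.
+     */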
+    private FlinkDistribution.TestSqlGatewayRestClient initSessionWithCatalogStore(
+            Map<String, String> extraProperties) throws Exception {
+        File catalogStoreFolder = FOLDER.newFolder();
+        Map<String, String> sessionProperties = new HashMap<>();
+        sessionProperties.put("table.catalog-store.kind", "file");
+        sessionProperties.put(
+                "table.catalog-store.file.path", catalogStoreFolder.getAbsolutePath());
+        sessionProperties.putAll(extraProperties);
+
+        FlinkDistribution.TestSqlGatewayRestClient gatewayRestClient =
+                new FlinkDistribution.TestSqlGatewayRestClient(
+                        InetAddress.getByName("localhost").getHostAddress(),
+                        restPort.getPort(),
+                        SqlGatewayRestAPIVersion.getDefaultVersion().toString().toLowerCase(),
+                        sessionProperties);
+
+        filesystemCatalogName = CATALOG_NAME_PREFIX + CATALOG_COUNTER.getAndIncrement();
+        File catalogFolder = FOLDER.newFolder(filesystemCatalogName);
+        FOLDER.newFolder(
+                String.format("%s/%s", filesystemCatalogName, FILESYSTEM_DEFAULT_DATABASE));
+        String createCatalogDDL =
+                String.format(
+                        "CREATE CATALOG %s WITH (\n"
+                                + "  'type' = 'test-filesystem',\n"
+                                + "  'default-database' = '%s',\n"
+                                + "  'path' = '%s'\n"
+                                + ")",
+                        filesystemCatalogName,
+                        FILESYSTEM_DEFAULT_DATABASE,
+                        catalogFolder.getAbsolutePath());
+        gatewayRestClient.executeStatementWithResult(createCatalogDDL);
+        gatewayRestClient.executeStatementWithResult(
+                String.format("USE CATALOG %s", filesystemCatalogName));
+
+        return gatewayRestClient;
+    }
+
     private void executeStatement(SQLJobClientMode mode) throws Exception {
         File result = FOLDER.newFolder(mode.getClass().getName() + ".csv");
         try (GatewayController gateway = flinkResource.startSqlGateway();
@@ -191,6 +515,7 @@
         FlinkResourceSetup.FlinkResourceSetupBuilder builder =
                 FlinkResourceSetup.builder()
                         .addJar(HIVE_SQL_CONNECTOR_JAR, JarLocation.LIB)
+                        .addJar(TEST_FILESYSTEM_CONNECTOR_JAR, JarLocation.LIB)
                         .moveJar("flink-table-planner", JarLocation.OPT, JarLocation.LIB)
                         .moveJar("flink-table-planner-loader", JarLocation.LIB, JarLocation.OPT);
         // add hadoop jars