[FLINK-35479][e2e] Add end-to-end test for materialized table
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml b/flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml
index 8219669..35f7e14 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml
@@ -127,7 +127,13 @@
</exclusion>
</exclusions>
</dependency>
- </dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-sql-gateway</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
<build>
<plugins>
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
index 92c9986..404b1ed 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
@@ -23,6 +23,9 @@
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.table.api.ResultKind;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
import org.apache.flink.test.util.FileUtils;
import org.apache.flink.test.util.JobSubmission;
import org.apache.flink.test.util.SQLJobClientMode;
@@ -65,8 +68,11 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -463,7 +469,7 @@
}
/** This rest client is used to submit SQL strings to Rest Endpoint of Sql Gateway. */
- private static class TestSqlGatewayRestClient {
+ public static class TestSqlGatewayRestClient {
private final String host;
private final int port;
@@ -475,12 +481,31 @@
this.host = host;
this.port = port;
this.version = version;
- this.sessionHandle = openSession();
+ this.sessionHandle = openSession(Collections.emptyMap());
}
- private String openSession() throws Exception {
- FormBody.Builder builder = new FormBody.Builder();
- FormBody requestBody = builder.build();
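+ /**
+ * Opens a session configured with the given session properties, e.g. a catalog store
+ * or the workflow scheduler type.
+ */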
+ public TestSqlGatewayRestClient(
+ String host, int port, String version, Map<String, String> properties)
+ throws Exception {
+ this.host = host;
+ this.port = port;
+ this.version = version;
+ this.sessionHandle = openSession(properties);
+ }
+
+ private String openSession(Map<String, String> properties) throws Exception {
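+ // an empty form body opens the session with default properties; otherwise the
+ // properties map is serialized into a JSON request body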
+ RequestBody requestBody;
+ if (properties == null || properties.isEmpty()) {
+ requestBody = new FormBody.Builder().build();
+ } else {
+ Map<String, Object> requestMap = new HashMap<>();
+ requestMap.put("properties", properties);
+ requestBody =
+ RequestBody.create(
+ MediaType.parse("application/json; charset=utf-8"),
+ OBJECT_MAPPER.writeValueAsString(requestMap));
+ }
+
final Request request =
new Request.Builder()
.post(requestBody)
@@ -529,6 +554,45 @@
} while (!Objects.equals(status, "FINISHED") && !Objects.equals(status, "ERROR"));
}
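+ /**
+ * Fetches all result rows of the given operation, following the gateway's nextResultUri
+ * pagination until the result stream is exhausted.
+ */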
+ public List<RowData> getOperationResult(String operationHandle) throws Exception {
+ List<RowData> result = new ArrayList<>();
+ String resultUri =
+ String.format(
+ "/%s/sessions/%s/operations/%s/result/0",
+ version, sessionHandle, operationHandle);
+ while (resultUri != null) {
+ final Request request =
+ new Request.Builder()
+ .get()
+ .url(String.format("http://%s:%s%s", host, port, resultUri))
+ .build();
+
+ String response = sendRequest(request);
+
+ FetchResultsResponseBody fetchResultsResponseBody =
+ OBJECT_MAPPER.readValue(response, FetchResultsResponseBody.class);
+ ResultKind resultKind = fetchResultsResponseBody.getResultKind();
+
+ if (Objects.equals(resultKind, ResultKind.SUCCESS_WITH_CONTENT)) {
+ result.addAll(fetchResultsResponseBody.getResults().getData());
+ }
+ resultUri = fetchResultsResponseBody.getNextResultUri();
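+ // results are produced incrementally; back off briefly before fetching the next batch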
+ Thread.sleep(1000);
+ }
+
+ return result;
+ }
+
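+ /** Executes the given SQL, waits for the operation to terminate, and returns all rows. */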
+ public List<RowData> executeStatementWithResult(String sql) {
+ try {
+ String operationHandle = executeStatement(sql);
+ waitUntilOperationTerminate(operationHandle);
+ return getOperationResult(operationHandle);
+ } catch (Exception e) {
+ throw new RuntimeException("Execute statement failed", e);
+ }
+ }
+
private String sendRequest(Request request) throws Exception {
String responseString;
try (Response response = client.newCall(request).execute()) {
diff --git a/flink-end-to-end-tests/flink-sql-gateway-test/pom.xml b/flink-end-to-end-tests/flink-sql-gateway-test/pom.xml
index ef15b99..26c47a7 100644
--- a/flink-end-to-end-tests/flink-sql-gateway-test/pom.xml
+++ b/flink-end-to-end-tests/flink-sql-gateway-test/pom.xml
@@ -176,6 +176,14 @@
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
</artifactItem>
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-filesystem-test-utils</artifactId>
+ <version>${project.version}</version>
+ <type>jar</type>
+ <outputDirectory>${project.build.directory}/dependencies
+ </outputDirectory>
+ </artifactItem>
</artifactItems>
</configuration>
</execution>
diff --git a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java
index dc4921b..d65ed2f 100644
--- a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java
+++ b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java
@@ -20,7 +20,9 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.table.endpoint.hive.HiveServer2Endpoint;
import org.apache.flink.table.gateway.containers.HiveContainer;
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
@@ -28,6 +30,7 @@
import org.apache.flink.test.util.SQLJobClientMode;
import org.apache.flink.test.util.SQLJobSubmission;
import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkDistribution;
import org.apache.flink.tests.util.flink.FlinkResource;
import org.apache.flink.tests.util.flink.FlinkResourceSetup;
import org.apache.flink.tests.util.flink.GatewayController;
@@ -63,12 +66,15 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.TimeZone;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.CATALOG_HIVE_CONF_DIR;
import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.THRIFT_PORT;
import static org.apache.flink.table.gateway.rest.util.SqlGatewayRestOptions.ADDRESS;
import static org.apache.flink.table.gateway.rest.util.SqlGatewayRestOptions.PORT;
+import static org.apache.flink.table.utils.DateTimeUtils.formatTimestampMillis;
import static org.apache.flink.tests.util.TestUtils.readCsvResultFiles;
import static org.junit.Assert.assertEquals;
@@ -77,6 +83,9 @@
private static final Path HIVE_SQL_CONNECTOR_JAR =
ResourceTestUtils.getResource(".*dependencies/flink-sql-connector-hive-.*.jar");
+ private static final Path TEST_FILESYSTEM_CONNECTOR_JAR =
+ ResourceTestUtils.getResource(
+ ".*dependencies/flink-table-filesystem-test-utils-.*.jar");
private static final Path HADOOP_CLASS_PATH =
ResourceTestUtils.getResource(".*hadoop.classpath");
private static final String GATEWAY_E2E_SQL = "gateway_e2e.sql";
@@ -90,6 +99,12 @@
private static NetUtils.Port hiveserver2Port;
private static NetUtils.Port restPort;
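+ // each test initializes its own test-filesystem catalog so that materialized tables
+ // from one test cannot leak into another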
+ private static final String CATALOG_NAME_PREFIX = "filesystem_catalog_";
+
+ private static final String FILESYSTEM_DEFAULT_DATABASE = "test_db";
+ private static final AtomicInteger CATALOG_COUNTER = new AtomicInteger(0);
+ private static String filesystemCatalogName;
+
@BeforeClass
public static void beforeClass() {
ENDPOINT_CONFIG.setString(
@@ -126,6 +141,299 @@
InetAddress.getByName("localhost").getHostAddress(), restPort.getPort()));
}
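+ /**
+ * Tests the lifecycle of a materialized table in CONTINUOUS refresh mode: create and
+ * query it, suspend it with a savepoint, resume it, and finally drop it.
+ */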
+ @Test
+ public void testMaterializedTableInContinuousMode() throws Exception {
+ Duration continuousWaitTime = Duration.ofMinutes(5);
+ Duration continuousWaitPause = Duration.ofSeconds(10);
+
+ try (GatewayController gateway = flinkResource.startSqlGateway();
+ ClusterController ignore = flinkResource.startCluster(2)) {
+
+ FlinkDistribution.TestSqlGatewayRestClient gatewayRestClient =
+ initSessionWithCatalogStore(Collections.emptyMap());
+
+ gatewayRestClient.executeStatementWithResult(
+ "CREATE TABLE streaming_source (\n"
+ + " `timestamp` TIMESTAMP(3),\n"
+ + " `user` VARCHAR,\n"
+ + " `type` VARCHAR\n"
+ + " ) with ("
+ + " 'format' = 'json',"
+ + " 'source.monitor-interval' = '10s'"
+ + ")");
+
+ gatewayRestClient.executeStatementWithResult(
+ "insert into streaming_source select TO_TIMESTAMP('2024-06-20 00:00:00'), 'Alice', 'INFO'");
+ gatewayRestClient.executeStatementWithResult(
+ "insert into streaming_source select TO_TIMESTAMP('2024-06-20 00:00:00'), 'Bob', 'ERROR'");
+
+ gatewayRestClient.executeStatementWithResult(
+ " CREATE MATERIALIZED TABLE my_materialized_table_in_continuous_mode\n"
+ + " PARTITIONED BY (ds)\n"
+ + " with (\n"
+ + " 'format' = 'json',\n"
+ + " 'sink.rolling-policy.rollover-interval' = '10s',\n"
+ + " 'sink.rolling-policy.check-interval' = '10s'\n"
+ + " )\n"
+ + " FRESHNESS = INTERVAL '10' SECOND\n"
+ + " REFRESH_MODE = CONTINUOUS\n"
+ + " AS SELECT\n"
+ + " DATE_FORMAT(`timestamp`, 'yyyy-MM-dd') AS ds,\n"
+ + " user,\n"
+ + " type\n"
+ + " FROM streaming_source");
+
+ // set the session to batch mode to verify the materialized table
+ gatewayRestClient.executeStatementWithResult("SET 'execution.runtime-mode' = 'batch'");
+
+ // verify the result
+ CommonTestUtils.waitUtil(
+ () -> {
+ List<RowData> result =
+ gatewayRestClient.executeStatementWithResult(
+ "select * from my_materialized_table_in_continuous_mode order by ds, user");
+ return result.toString()
+ .equals("[+I(2024-06-20,Alice,INFO), +I(2024-06-20,Bob,ERROR)]");
+ },
+ continuousWaitTime,
+ continuousWaitPause,
+ "Failed to wait for the result");
+
+ File savepointFolder = FOLDER.newFolder("savepoint");
+ // configure savepoint path
+ gatewayRestClient.executeStatementWithResult(
+ String.format(
+ "set 'execution.checkpointing.savepoint-dir'='file://%s'",
+ savepointFolder.getAbsolutePath()));
+
+ // suspend the materialized table
+ gatewayRestClient.executeStatementWithResult(
+ "ALTER MATERIALIZED TABLE my_materialized_table_in_continuous_mode SUSPEND");
+
+ // send more data to the source
+ gatewayRestClient.executeStatementWithResult(
+ "insert into streaming_source select TO_TIMESTAMP('2024-06-20 00:00:00'), 'Charlie', 'WARN'");
+
+ // resume the materialized table
+ gatewayRestClient.executeStatementWithResult(
+ "ALTER MATERIALIZED TABLE my_materialized_table_in_continuous_mode RESUME");
+
+ // verify the result
+ CommonTestUtils.waitUtil(
+ () -> {
+ List<RowData> result =
+ gatewayRestClient.executeStatementWithResult(
+ "select * from my_materialized_table_in_continuous_mode order by ds, user");
+ return result.toString()
+ .equals(
+ "[+I(2024-06-20,Alice,INFO), +I(2024-06-20,Bob,ERROR), +I(2024-06-20,Charlie,WARN)]");
+ },
+ continuousWaitTime,
+ continuousWaitPause,
+ "Failed to wait for the result");
+
+ // drop the materialized table
+ gatewayRestClient.executeStatementWithResult(
+ "DROP MATERIALIZED TABLE my_materialized_table_in_continuous_mode");
+ }
+ }
+
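+ /**
+ * Tests the lifecycle of a materialized table in FULL refresh mode: the embedded workflow
+ * scheduler periodically refreshes only the current-date partition, while the remaining
+ * partitions are backfilled via ALTER MATERIALIZED TABLE ... REFRESH PARTITION.
+ */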
+ @Test
+ public void testMaterializedTableInFullMode() throws Exception {
+ Duration fullModeWaitTime = Duration.ofMinutes(5);
+ Duration fullModeWaitPause = Duration.ofSeconds(10);
+
+ // init session
+ try (GatewayController gateway = flinkResource.startSqlGateway();
+ ClusterController ignore = flinkResource.startCluster(2)) {
+
+ Map<String, String> sessionProperties = new HashMap<>();
+ sessionProperties.put("workflow-scheduler.type", "embedded");
+ FlinkDistribution.TestSqlGatewayRestClient gatewayRestClient =
+ initSessionWithCatalogStore(sessionProperties);
+
+ gatewayRestClient.executeStatementWithResult(
+ "CREATE TABLE batch_source (\n"
+ + " `timestamp` TIMESTAMP(3),\n"
+ + " `user` VARCHAR,\n"
+ + " `type` VARCHAR\n"
+ + " ) with ("
+ + " 'format' = 'json'"
+ + ")");
+
+ gatewayRestClient.executeStatementWithResult(
+ " CREATE MATERIALIZED TABLE my_materialized_table_in_full_mode\n"
+ + " PARTITIONED BY (ds)\n"
+ + " WITH (\n"
+ + " 'partition.fields.ds.date-formatter' = 'yyyy-MM-dd',\n"
+ + " 'format' = 'json'\n"
+ + " )\n"
+ + " FRESHNESS = INTERVAL '1' MINUTE\n"
+ + " REFRESH_MODE = FULL\n"
+ + " AS SELECT\n"
+ + " ds,\n"
+ + " count(*) as cnt\n"
+ + " FROM (\n"
+ + " SELECT\n"
+ + " DATE_FORMAT(`timestamp`, 'yyyy-MM-dd') AS ds,\n"
+ + " user,\n"
+ + " type\n"
+ + " FROM batch_source\n"
+ + " ) GROUP BY ds");
+
+ long systemTime = System.currentTimeMillis();
+ String todayTimestamp =
+ formatTimestampMillis(systemTime, "yyyy-MM-dd HH:mm:ss", TimeZone.getDefault());
+ String yesterdayTimestamp =
+ formatTimestampMillis(
+ systemTime - 24 * 60 * 60 * 1000,
+ "yyyy-MM-dd HH:mm:ss",
+ TimeZone.getDefault());
+ String tomorrowTimestamp =
+ formatTimestampMillis(
+ systemTime + 24 * 60 * 60 * 1000,
+ "yyyy-MM-dd HH:mm:ss",
+ TimeZone.getDefault());
+ String todayDateStr = todayTimestamp.substring(0, 10);
+ String yesterdayDateStr = yesterdayTimestamp.substring(0, 10);
+ String tomorrowDateStr = tomorrowTimestamp.substring(0, 10);
+
+ // insert one row for each of yesterday, today, and tomorrow
+ gatewayRestClient.executeStatementWithResult(
+ String.format(
+ "INSERT INTO batch_source VALUES "
+ + "(TO_TIMESTAMP('%s'), 'Alice', 'INFO'), "
+ + "(TO_TIMESTAMP('%s'), 'Alice', 'INFO'), "
+ + "(TO_TIMESTAMP('%s'), 'Alice', 'INFO')",
+ yesterdayTimestamp, todayTimestamp, tomorrowTimestamp));
+
+ // set the session to batch mode to verify the materialized table
+ gatewayRestClient.executeStatementWithResult("SET 'execution.runtime-mode' = 'batch'");
+
+ // verify that the auto refresh updates only the today or tomorrow partition,
+ // never the yesterday partition
+ CommonTestUtils.waitUtil(
+ () -> {
+ List<RowData> result =
+ gatewayRestClient.executeStatementWithResult(
+ "select * from my_materialized_table_in_full_mode order by ds");
+ String resultStr = result.toString();
+ return (resultStr.contains(String.format("+I(%s,1)", todayDateStr))
+ || resultStr.contains(
+ String.format("+I(%s,1)", tomorrowDateStr)))
+ && (!resultStr.contains(
+ String.format("+I(%s,1)", yesterdayDateStr)));
+ },
+ fullModeWaitTime,
+ fullModeWaitPause,
+ "Failed to wait for the materialized table result");
+
+ // suspend the materialized table
+ gatewayRestClient.executeStatementWithResult(
+ "ALTER MATERIALIZED TABLE my_materialized_table_in_full_mode SUSPEND");
+
+ // insert more data to the batch_source table
+ gatewayRestClient.executeStatementWithResult(
+ String.format(
+ "INSERT INTO batch_source VALUES "
+ + "(TO_TIMESTAMP('%s'), 'Bob', 'INFO'), "
+ + "(TO_TIMESTAMP('%s'), 'Bob', 'INFO'), "
+ + "(TO_TIMESTAMP('%s'), 'Bob', 'INFO')",
+ yesterdayTimestamp, todayTimestamp, tomorrowTimestamp));
+
+ // resume the materialized table
+ gatewayRestClient.executeStatementWithResult(
+ "ALTER MATERIALIZED TABLE my_materialized_table_in_full_mode RESUME");
+
+ // wait until the materialized table is refreshed and verify that only the today or
+ // tomorrow partition picks up the new data
+ CommonTestUtils.waitUtil(
+ () -> {
+ List<RowData> result =
+ gatewayRestClient.executeStatementWithResult(
+ "select * from my_materialized_table_in_full_mode order by ds");
+ String resultStr = result.toString();
+ return (resultStr.contains(String.format("+I(%s,2)", todayDateStr))
+ || resultStr.contains(
+ String.format("+I(%s,2)", tomorrowDateStr)))
+ && (!resultStr.contains(
+ String.format("+I(%s,2)", yesterdayDateStr)));
+ },
+ fullModeWaitTime,
+ fullModeWaitPause,
+ "Failed to wait for the result");
+
+ // manually refresh all partitions
+ gatewayRestClient.executeStatementWithResult(
+ "ALTER MATERIALIZED TABLE my_materialized_table_in_full_mode REFRESH PARTITION (ds='"
+ + todayDateStr
+ + "')");
+ gatewayRestClient.executeStatementWithResult(
+ "ALTER MATERIALIZED TABLE my_materialized_table_in_full_mode REFRESH PARTITION (ds='"
+ + yesterdayDateStr
+ + "')");
+ gatewayRestClient.executeStatementWithResult(
+ "ALTER MATERIALIZED TABLE my_materialized_table_in_full_mode REFRESH PARTITION (ds='"
+ + tomorrowDateStr
+ + "')");
+
+ // verify that all partitions of the materialized table are updated
+ CommonTestUtils.waitUtil(
+ () -> {
+ List<RowData> result =
+ gatewayRestClient.executeStatementWithResult(
+ "select * from my_materialized_table_in_full_mode order by ds");
+ return result.toString()
+ .equals(
+ String.format(
+ "[+I(%s,2), +I(%s,2), +I(%s,2)]",
+ yesterdayDateStr, todayDateStr, tomorrowDateStr));
+ },
+ fullModeWaitTime,
+ fullModeWaitPause,
+ "Failed to wait for the result");
+
+ // drop the materialized table
+ gatewayRestClient.executeStatementWithResult(
+ "DROP MATERIALIZED TABLE my_materialized_table_in_full_mode");
+ }
+ }
+
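+ /**
+ * Opens a gateway session backed by a file catalog store, creates a dedicated
+ * test-filesystem catalog with a default database, and makes it the session's current
+ * catalog.
+ */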
+ private FlinkDistribution.TestSqlGatewayRestClient initSessionWithCatalogStore(
+ Map<String, String> extraProperties) throws Exception {
+ File catalogStoreFolder = FOLDER.newFolder();
+ Map<String, String> sessionProperties = new HashMap<>();
+ sessionProperties.put("table.catalog-store.kind", "file");
+ sessionProperties.put(
+ "table.catalog-store.file.path", catalogStoreFolder.getAbsolutePath());
+ sessionProperties.putAll(extraProperties);
+
+ FlinkDistribution.TestSqlGatewayRestClient gatewayRestClient =
+ new FlinkDistribution.TestSqlGatewayRestClient(
+ InetAddress.getByName("localhost").getHostAddress(),
+ restPort.getPort(),
+ SqlGatewayRestAPIVersion.getDefaultVersion().toString().toLowerCase(),
+ sessionProperties);
+
+ filesystemCatalogName = CATALOG_NAME_PREFIX + CATALOG_COUNTER.getAndAdd(1);
+ File catalogFolder = FOLDER.newFolder(filesystemCatalogName);
+ FOLDER.newFolder(
+ String.format("%s/%s", filesystemCatalogName, FILESYSTEM_DEFAULT_DATABASE));
+ String createCatalogDDL =
+ String.format(
+ "CREATE CATALOG %s WITH (\n"
+ + " 'type' = 'test-filesystem',\n"
+ + " 'default-database' = '%s',\n"
+ + " 'path' = '%s'\n"
+ + ")",
+ filesystemCatalogName,
+ FILESYSTEM_DEFAULT_DATABASE,
+ catalogFolder.getAbsolutePath());
+ gatewayRestClient.executeStatementWithResult(createCatalogDDL);
+ gatewayRestClient.executeStatementWithResult(
+ String.format("USE CATALOG %s", filesystemCatalogName));
+
+ return gatewayRestClient;
+ }
+
private void executeStatement(SQLJobClientMode mode) throws Exception {
File result = FOLDER.newFolder(mode.getClass().getName() + ".csv");
try (GatewayController gateway = flinkResource.startSqlGateway();
@@ -191,6 +499,7 @@
FlinkResourceSetup.FlinkResourceSetupBuilder builder =
FlinkResourceSetup.builder()
.addJar(HIVE_SQL_CONNECTOR_JAR, JarLocation.LIB)
+ .addJar(TEST_FILESYSTEM_CONNECTOR_JAR, JarLocation.LIB)
.moveJar("flink-table-planner", JarLocation.OPT, JarLocation.LIB)
.moveJar("flink-table-planner-loader", JarLocation.LIB, JarLocation.OPT);
// add hadoop jars