[FLINK-35195][table] Support execute CreateMaterializedTableOperation for continuous refresh mode in SqlGateway
diff --git a/flink-table/flink-sql-gateway/pom.xml b/flink-table/flink-sql-gateway/pom.xml
index 1a50d66..61f1e75 100644
--- a/flink-table/flink-sql-gateway/pom.xml
+++ b/flink-table/flink-sql-gateway/pom.xml
@@ -127,6 +127,12 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-filesystem-test-utils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
new file mode 100644
index 0000000..fed6063
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.service.operation.OperationExecutor;
+import org.apache.flink.table.gateway.service.result.ResultFetcher;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableChangeOperation;
+import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
+import org.apache.flink.table.operations.materializedtable.DropMaterializedTableOperation;
+import org.apache.flink.table.operations.materializedtable.MaterializedTableOperation;
+import org.apache.flink.table.refresh.ContinuousRefreshHandler;
+import org.apache.flink.table.refresh.ContinuousRefreshHandlerSerializer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.api.common.RuntimeExecutionMode.STREAMING;
+import static org.apache.flink.configuration.DeploymentOptions.TARGET;
+import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static org.apache.flink.configuration.PipelineOptions.NAME;
+import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
+import static org.apache.flink.table.api.internal.TableResultInternal.TABLE_RESULT_OK;
+
+/** Manager is responsible for execute the {@link MaterializedTableOperation}. */
+@Internal
+public class MaterializedTableManager {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MaterializedTableManager.class);
+
+ public static ResultFetcher callMaterializedTableOperation(
+ OperationExecutor operationExecutor,
+ OperationHandle handle,
+ MaterializedTableOperation op,
+ String statement) {
+ if (op instanceof CreateMaterializedTableOperation) {
+ return callCreateMaterializedTableOperation(
+ operationExecutor, handle, (CreateMaterializedTableOperation) op);
+ }
+ throw new SqlExecutionException(
+ String.format(
+ "Unsupported Operation %s for materialized table.", op.asSummaryString()));
+ }
+
+ private static ResultFetcher callCreateMaterializedTableOperation(
+ OperationExecutor operationExecutor,
+ OperationHandle handle,
+ CreateMaterializedTableOperation createMaterializedTableOperation) {
+ CatalogMaterializedTable materializedTable =
+ createMaterializedTableOperation.getCatalogMaterializedTable();
+ if (CatalogMaterializedTable.RefreshMode.CONTINUOUS == materializedTable.getRefreshMode()) {
+ createMaterializedInContinuousMode(
+ operationExecutor, handle, createMaterializedTableOperation);
+ } else {
+ throw new SqlExecutionException(
+ "Only support create materialized table in continuous refresh mode currently.");
+ }
+ // Just return ok for unify different refresh job info of continuous and full mode, user
+ // should get the refresh job info via desc table.
+ return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
+ }
+
+ private static void createMaterializedInContinuousMode(
+ OperationExecutor operationExecutor,
+ OperationHandle handle,
+ CreateMaterializedTableOperation createMaterializedTableOperation) {
+ // create materialized table first
+ operationExecutor.callExecutableOperation(handle, createMaterializedTableOperation);
+
+ ObjectIdentifier materializedTableIdentifier =
+ createMaterializedTableOperation.getTableIdentifier();
+ CatalogMaterializedTable catalogMaterializedTable =
+ createMaterializedTableOperation.getCatalogMaterializedTable();
+
+ // Set job name, runtime mode, checkpoint interval
+ // TODO: Set minibatch related optimization options.
+ Configuration customConfig = new Configuration();
+ String jobName =
+ String.format(
+ "Materialized_table_%s_continuous_refresh_job",
+ materializedTableIdentifier.asSerializableString());
+ customConfig.set(NAME, jobName);
+ customConfig.set(RUNTIME_MODE, STREAMING);
+ customConfig.set(CHECKPOINTING_INTERVAL, catalogMaterializedTable.getFreshness());
+
+ String insertStatement =
+ String.format(
+ "INSERT INTO %s %s",
+ materializedTableIdentifier, catalogMaterializedTable.getDefinitionQuery());
+ try {
+ // submit flink streaming job
+ ResultFetcher resultFetcher =
+ operationExecutor.executeStatement(handle, insertStatement);
+
+ // get execution.target and jobId, currently doesn't support yarn and k8s, so doesn't
+ // get clusterId
+ List<RowData> results = fetchAllResults(resultFetcher);
+ String jobId = results.get(0).getString(0).toString();
+ String executeTarget =
+ operationExecutor.getSessionContext().getSessionConf().get(TARGET);
+ ContinuousRefreshHandler continuousRefreshHandler =
+ new ContinuousRefreshHandler(executeTarget, jobId);
+ byte[] serializedBytes =
+ ContinuousRefreshHandlerSerializer.INSTANCE.serialize(continuousRefreshHandler);
+
+ // update RefreshHandler to Catalog
+ CatalogMaterializedTable updatedMaterializedTable =
+ catalogMaterializedTable.copy(
+ CatalogMaterializedTable.RefreshStatus.ACTIVATED,
+ continuousRefreshHandler.asSummaryString(),
+ serializedBytes);
+ List<TableChange> tableChanges = new ArrayList<>();
+ tableChanges.add(
+ TableChange.modifyRefreshStatus(
+ CatalogMaterializedTable.RefreshStatus.ACTIVATED));
+ tableChanges.add(
+ TableChange.modifyRefreshHandler(
+ continuousRefreshHandler.asSummaryString(), serializedBytes));
+
+ AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation =
+ new AlterMaterializedTableChangeOperation(
+ materializedTableIdentifier, tableChanges, updatedMaterializedTable);
+ operationExecutor.callExecutableOperation(
+ handle, alterMaterializedTableChangeOperation);
+ } catch (Exception e) {
+ // drop materialized table while submit flink streaming job occur exception. Thus, weak
+ // atomicity is guaranteed
+ operationExecutor.callExecutableOperation(
+ handle,
+ new DropMaterializedTableOperation(materializedTableIdentifier, true, false));
+ // log and throw exception
+ LOG.error(
+ "Submit continuous refresh job for materialized table {} occur exception.",
+ materializedTableIdentifier,
+ e);
+ throw new TableException(
+ String.format(
+ "Submit continuous refresh job for materialized table %s occur exception.",
+ materializedTableIdentifier),
+ e);
+ }
+ }
+
+ private static List<RowData> fetchAllResults(ResultFetcher resultFetcher) {
+ Long token = 0L;
+ List<RowData> results = new ArrayList<>();
+ while (token != null) {
+ ResultSet result = resultFetcher.fetchResults(token, Integer.MAX_VALUE);
+ results.addAll(result.getData());
+ token = result.getNextToken();
+ }
+ return results;
+ }
+}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
index c50ba8c..ddd0f93 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
@@ -65,6 +65,7 @@
import org.apache.flink.table.gateway.api.results.TableInfo;
import org.apache.flink.table.gateway.environment.SqlGatewayStreamExecutionEnvironment;
import org.apache.flink.table.gateway.service.context.SessionContext;
+import org.apache.flink.table.gateway.service.materializedtable.MaterializedTableManager;
import org.apache.flink.table.gateway.service.result.ResultFetcher;
import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
import org.apache.flink.table.module.ModuleManager;
@@ -96,6 +97,7 @@
import org.apache.flink.table.operations.ddl.CreateOperation;
import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
import org.apache.flink.table.operations.ddl.DropOperation;
+import org.apache.flink.table.operations.materializedtable.MaterializedTableOperation;
import org.apache.flink.table.resource.ResourceManager;
import org.apache.flink.table.utils.DateTimeUtils;
import org.apache.flink.util.CollectionUtil;
@@ -193,9 +195,14 @@
}
public ResultFetcher executeStatement(OperationHandle handle, String statement) {
+ return executeStatement(handle, new Configuration(), statement);
+ }
+
+ public ResultFetcher executeStatement(
+ OperationHandle handle, Configuration customConfig, String statement) {
// Instantiate the TableEnvironment lazily
ResourceManager resourceManager = sessionContext.getSessionState().resourceManager.copy();
- TableEnvironmentInternal tableEnv = getTableEnvironment(resourceManager);
+ TableEnvironmentInternal tableEnv = getTableEnvironment(resourceManager, customConfig);
PlanCacheManager planCacheManager = sessionContext.getPlanCacheManager();
CachedPlan cachedPlan = null;
Operation op = null;
@@ -344,13 +351,16 @@
// --------------------------------------------------------------------------------------------
public TableEnvironmentInternal getTableEnvironment() {
- return getTableEnvironment(sessionContext.getSessionState().resourceManager);
+ return getTableEnvironment(
+ sessionContext.getSessionState().resourceManager, new Configuration());
}
- public TableEnvironmentInternal getTableEnvironment(ResourceManager resourceManager) {
+ public TableEnvironmentInternal getTableEnvironment(
+ ResourceManager resourceManager, Configuration customConfig) {
// checks the value of RUNTIME_MODE
Configuration operationConfig = sessionContext.getSessionConf().clone();
operationConfig.addAll(executionConfig);
+ operationConfig.addAll(customConfig);
final EnvironmentSettings settings =
EnvironmentSettings.newInstance().withConfiguration(operationConfig).build();
@@ -492,12 +502,15 @@
|| op instanceof CreateCatalogFunctionOperation
|| op instanceof ShowFunctionsOperation) {
return callExecutableOperation(handle, (ExecutableOperation) op);
+ } else if (op instanceof MaterializedTableOperation) {
+ return MaterializedTableManager.callMaterializedTableOperation(
+ this, handle, (MaterializedTableOperation) op, statement);
} else {
return callOperation(tableEnv, handle, op);
}
}
- private ResultFetcher callExecutableOperation(OperationHandle handle, ExecutableOperation op) {
+ public ResultFetcher callExecutableOperation(OperationHandle handle, ExecutableOperation op) {
TableResultInternal result =
op.execute(
new ExecutableOperationContextImpl(
@@ -521,6 +534,10 @@
return tableConfig;
}
+ public SessionContext getSessionContext() {
+ return sessionContext;
+ }
+
private ResultFetcher callSetOperation(
TableEnvironmentInternal tableEnv, OperationHandle handle, SetOperation setOp) {
if (setOp.getKey().isPresent() && setOp.getValue().isPresent()) {
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
new file mode 100644
index 0000000..29ab697
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.session.SessionEnvironment;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;
+import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.flink.table.catalog.CommonCatalogOptions.TABLE_CATALOG_STORE_KIND;
+import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.awaitOperationTermination;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * ITCase for materialized table related statement via {@link SqlGatewayServiceImpl}. Use a separate
+ * test class rather than adding test cases to {@link SqlGatewayServiceITCase}, both because the
+ * syntax related to Materialized table is relatively independent, and to try to avoid conflicts
+ * with the code in {@link SqlGatewayServiceITCase}.
+ */
+public class MaterializedTableStatementITCase {
+
+ private static final String FILE_CATALOG_STORE = "file_store";
+ private static final String TEST_CATALOG_PREFIX = "test_catalog";
+ private static final String TEST_DEFAULT_DATABASE = "test_db";
+
+ private static final AtomicLong COUNTER = new AtomicLong(0);
+
+ @RegisterExtension
+ @Order(1)
+ static final MiniClusterExtension MINI_CLUSTER =
+ new MiniClusterExtension(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(2)
+ .build());
+
+ @RegisterExtension
+ @Order(2)
+ static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION =
+ new SqlGatewayServiceExtension(MINI_CLUSTER::getClientConfiguration);
+
+ @RegisterExtension
+ @Order(3)
+ static final TestExecutorExtension<ExecutorService> EXECUTOR_EXTENSION =
+ new TestExecutorExtension<>(
+ () ->
+ Executors.newCachedThreadPool(
+ new ExecutorThreadFactory(
+ "SqlGatewayService Test Pool",
+ IgnoreExceptionHandler.INSTANCE)));
+
+ private static SqlGatewayServiceImpl service;
+ private static SessionEnvironment defaultSessionEnvironment;
+ private static Path baseCatalogPath;
+
+ private String fileSystemCatalogPath;
+ private String fileSystemCatalogName;
+
+ @BeforeAll
+ static void setUp(@TempDir Path temporaryFolder) throws Exception {
+ service = (SqlGatewayServiceImpl) SQL_GATEWAY_SERVICE_EXTENSION.getService();
+
+ // initialize file catalog store path
+ Path fileCatalogStore = temporaryFolder.resolve(FILE_CATALOG_STORE);
+ Files.createDirectory(fileCatalogStore);
+ Map<String, String> catalogStoreOptions = new HashMap<>();
+ catalogStoreOptions.put(TABLE_CATALOG_STORE_KIND.key(), "file");
+ catalogStoreOptions.put("table.catalog-store.file.path", fileCatalogStore.toString());
+
+ // initialize test-filesystem catalog base path
+ baseCatalogPath = temporaryFolder.resolve(TEST_CATALOG_PREFIX);
+ Files.createDirectory(baseCatalogPath);
+
+ defaultSessionEnvironment =
+ SessionEnvironment.newBuilder()
+ .addSessionConfig(catalogStoreOptions)
+ .setSessionEndpointVersion(MockedEndpointVersion.V1)
+ .build();
+ }
+
+ @BeforeEach
+ void before() throws Exception {
+ String randomStr = String.valueOf(COUNTER.incrementAndGet());
+ // initialize test-filesystem catalog path with random uuid
+ Path fileCatalogPath = baseCatalogPath.resolve(randomStr);
+ Files.createDirectory(fileCatalogPath);
+ Path dbPath = fileCatalogPath.resolve(TEST_DEFAULT_DATABASE);
+ Files.createDirectory(dbPath);
+
+ fileSystemCatalogPath = fileCatalogPath.toString();
+ fileSystemCatalogName = TEST_CATALOG_PREFIX + randomStr;
+ }
+
+ @Test
+ void testCreateMaterializedTableInContinuousMode() throws Exception {
+ // initialize session handle, create test-filesystem catalog and register it to catalog
+ // store
+ SessionHandle sessionHandle = initializeSession();
+
+ String materializedTableDDL =
+ "CREATE MATERIALIZED TABLE users_shops"
+ + " PARTITIONED BY (ds)\n"
+ + " WITH(\n"
+ + " 'format' = 'debezium-json'\n"
+ + " )\n"
+ + " FRESHNESS = INTERVAL '30' SECOND\n"
+ + " AS SELECT \n"
+ + " user_id,\n"
+ + " shop_id,\n"
+ + " ds,\n"
+ + " SUM (payment_amount_cents) AS payed_buy_fee_sum,\n"
+ + " SUM (1) AS pv\n"
+ + " FROM (\n"
+ + " SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource"
+ + " ) AS tmp\n"
+ + " GROUP BY (user_id, shop_id, ds)";
+ OperationHandle materializedTableHandle =
+ service.executeStatement(
+ sessionHandle, materializedTableDDL, -1, new Configuration());
+ awaitOperationTermination(service, sessionHandle, materializedTableHandle);
+
+ // validate materialized table: schema, refresh mode, refresh status, refresh handler,
+ // doesn't check the data because it generates randomly.
+ ResolvedCatalogMaterializedTable actualMaterializedTable =
+ (ResolvedCatalogMaterializedTable)
+ service.getTable(
+ sessionHandle,
+ ObjectIdentifier.of(
+ fileSystemCatalogName,
+ TEST_DEFAULT_DATABASE,
+ "users_shops"));
+
+ // Expected schema
+ ResolvedSchema expectedSchema =
+ ResolvedSchema.of(
+ Arrays.asList(
+ Column.physical("user_id", DataTypes.BIGINT()),
+ Column.physical("shop_id", DataTypes.BIGINT()),
+ Column.physical("ds", DataTypes.STRING()),
+ Column.physical("payed_buy_fee_sum", DataTypes.BIGINT()),
+ Column.physical("pv", DataTypes.INT().notNull())));
+
+ assertThat(actualMaterializedTable.getResolvedSchema()).isEqualTo(expectedSchema);
+ assertThat(actualMaterializedTable.getFreshness()).isEqualTo(Duration.ofSeconds(30));
+ assertThat(actualMaterializedTable.getLogicalRefreshMode())
+ .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC);
+ assertThat(actualMaterializedTable.getRefreshMode())
+ .isEqualTo(CatalogMaterializedTable.RefreshMode.CONTINUOUS);
+ assertThat(actualMaterializedTable.getRefreshStatus())
+ .isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED);
+ assertThat(actualMaterializedTable.getRefreshHandlerDescription()).isNotEmpty();
+ assertThat(actualMaterializedTable.getSerializedRefreshHandler()).isNotEmpty();
+ }
+
+ @Test
+ void testCreateMaterializedTableInFullMode() {
+ // initialize session handle, create test-filesystem catalog and register it to catalog
+ // store
+ SessionHandle sessionHandle = initializeSession();
+
+ String materializedTableDDL =
+ "CREATE MATERIALIZED TABLE users_shops"
+ + " PARTITIONED BY (ds)\n"
+ + " WITH(\n"
+ + " 'format' = 'debezium-json'\n"
+ + " )\n"
+ + " FRESHNESS = INTERVAL '1' DAY\n"
+ + " AS SELECT \n"
+ + " user_id,\n"
+ + " shop_id,\n"
+ + " ds,\n"
+ + " SUM (payment_amount_cents) AS payed_buy_fee_sum,\n"
+ + " SUM (1) AS pv\n"
+ + " FROM (\n"
+ + " SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource"
+ + " ) AS tmp\n"
+ + " GROUP BY (user_id, shop_id, ds)";
+ OperationHandle materializedTableHandle =
+ service.executeStatement(
+ sessionHandle, materializedTableDDL, -1, new Configuration());
+
+ assertThatThrownBy(
+ () ->
+ awaitOperationTermination(
+ service, sessionHandle, materializedTableHandle))
+ .rootCause()
+ .isInstanceOf(SqlExecutionException.class)
+ .hasMessage(
+ "Only support create materialized table in continuous refresh mode currently.");
+ }
+
+ private SessionHandle initializeSession() {
+ SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
+ String catalogDDL =
+ String.format(
+ "CREATE CATALOG %s\n"
+ + "WITH (\n"
+ + " 'type' = 'test-filesystem',\n"
+ + " 'path' = '%s',\n"
+ + " 'default-database' = '%s'\n"
+ + " )",
+ fileSystemCatalogName, fileSystemCatalogPath, TEST_DEFAULT_DATABASE);
+ service.configureSession(sessionHandle, catalogDDL, -1);
+ service.configureSession(
+ sessionHandle, String.format("USE CATALOG %s", fileSystemCatalogName), -1);
+
+ // create source table
+ String dataGenSource =
+ "CREATE TABLE datagenSource (\n"
+ + " order_id BIGINT,\n"
+ + " order_number VARCHAR(20),\n"
+ + " user_id BIGINT,\n"
+ + " shop_id BIGINT,\n"
+ + " product_id BIGINT,\n"
+ + " status BIGINT,\n"
+ + " order_type BIGINT,\n"
+ + " order_created_at TIMESTAMP,\n"
+ + " payment_amount_cents BIGINT\n"
+ + ")\n"
+ + "WITH (\n"
+ + " 'connector' = 'datagen',\n"
+ + " 'rows-per-second' = '10'\n"
+ + ")";
+ service.configureSession(sessionHandle, dataGenSource, -1);
+ return sessionHandle;
+ }
+}