[FLINK-35195][table] Convert SqlCreateMaterializedTable node to CreateMaterializedTableOperation
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlConstraintValidator.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlConstraintValidator.java
index 8a9a772..f157e50 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlConstraintValidator.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlConstraintValidator.java
@@ -89,7 +89,7 @@
}
/** Check table constraint. */
- private static void validate(SqlTableConstraint constraint) throws SqlValidateException {
+ public static void validate(SqlTableConstraint constraint) throws SqlValidateException {
if (constraint.isUnique()) {
throw new SqlValidateException(
constraint.getParserPosition(), "UNIQUE constraint is not supported yet");
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
index 1630a0f..eae6f1f 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
@@ -132,7 +132,6 @@
return freshness;
}
- @Nullable
public Optional<SqlLiteral> getRefreshMode() {
return Optional.ofNullable(refreshMode);
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/CreateMaterializedTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/CreateMaterializedTableOperation.java
new file mode 100644
index 0000000..d4eff00
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/CreateMaterializedTableOperation.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.internal.TableResultImpl;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.OperationUtils;
+import org.apache.flink.table.operations.ddl.CreateOperation;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/** Operation to describe a CREATE MATERIALIZED TABLE statement. */
+@Internal
+public class CreateMaterializedTableOperation
+ implements CreateOperation, MaterializedTableOperation {
+
+ private final ObjectIdentifier tableIdentifier;
+ private final CatalogMaterializedTable materializedTable;
+
+ public CreateMaterializedTableOperation(
+ ObjectIdentifier tableIdentifier, ResolvedCatalogMaterializedTable materializedTable) {
+ this.tableIdentifier = tableIdentifier;
+ this.materializedTable = materializedTable;
+ }
+
+ @Override
+ public TableResultInternal execute(Context ctx) {
+ // create materialized table in catalog
+ ctx.getCatalogManager().createTable(materializedTable, tableIdentifier, false);
+ return TableResultImpl.TABLE_RESULT_OK;
+ }
+
+ public ObjectIdentifier getTableIdentifier() {
+ return tableIdentifier;
+ }
+
+ public CatalogMaterializedTable getCatalogMaterializedTable() {
+ return materializedTable;
+ }
+
+ @Override
+ public String asSummaryString() {
+ Map<String, Object> params = new LinkedHashMap<>();
+ params.put("materializedTable", materializedTable);
+ params.put("identifier", tableIdentifier);
+
+ return OperationUtils.formatWithChildren(
+ "CREATE MATERIALIZED TABLE",
+ params,
+ Collections.emptyList(),
+ Operation::asSummaryString);
+ }
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableOperation.java
new file mode 100644
index 0000000..72b83ad
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableOperation.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.operations.Operation;
+
+/** The marker interface for materialized table. */
+@Internal
+public interface MaterializedTableOperation extends Operation {}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
index 50d894b..2958333 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.operations;
+import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.calcite.SqlToRexConverter;
@@ -43,6 +44,8 @@
import java.util.List;
import java.util.Objects;
+import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig;
+
/** An implementation of {@link SqlNodeConverter.ConvertContext}. */
public class SqlNodeConvertContext implements SqlNodeConverter.ConvertContext {
@@ -55,6 +58,11 @@
}
@Override
+ public TableConfig getTableConfig() {
+ return unwrapTableConfig(flinkPlanner.cluster());
+ }
+
+ @Override
public SqlValidator getSqlValidator() {
return flinkPlanner.getOrCreateSqlValidator();
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
new file mode 100644
index 0000000..03fc0cb
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.SqlConstraintValidator;
+import org.apache.flink.sql.parser.ddl.SqlCreateMaterializedTable;
+import org.apache.flink.sql.parser.ddl.SqlRefreshMode;
+import org.apache.flink.sql.parser.ddl.SqlTableOption;
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
+import org.apache.flink.table.planner.operations.PlannerQueryOperation;
+import org.apache.flink.table.planner.utils.MaterializedTableUtils;
+import org.apache.flink.table.planner.utils.OperationConverterUtils;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.api.config.MaterializedTableConfigOptions.MATERIALIZED_TABLE_FRESHNESS_THRESHOLD;
+
+/** A converter for {@link SqlCreateMaterializedTable}. */
+public class SqlCreateMaterializedTableConverter
+ implements SqlNodeConverter<SqlCreateMaterializedTable> {
+
+ @Override
+ public Operation convertSqlNode(
+ SqlCreateMaterializedTable sqlCreateMaterializedTable, ConvertContext context) {
+ UnresolvedIdentifier unresolvedIdentifier =
+ UnresolvedIdentifier.of(sqlCreateMaterializedTable.fullTableName());
+ ObjectIdentifier identifier =
+ context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
+
+ // get comment
+ String tableComment =
+ OperationConverterUtils.getTableComment(sqlCreateMaterializedTable.getComment());
+
+ // get options
+ Map<String, String> options = new HashMap<>();
+ sqlCreateMaterializedTable
+ .getPropertyList()
+ .getList()
+ .forEach(
+ p ->
+ options.put(
+ ((SqlTableOption) p).getKeyString(),
+ ((SqlTableOption) p).getValueString()));
+
+ // get freshness
+ Duration freshness =
+ MaterializedTableUtils.getMaterializedTableFreshness(
+ sqlCreateMaterializedTable.getFreshness());
+
+ // get refresh mode
+ SqlRefreshMode sqlRefreshMode = null;
+ if (sqlCreateMaterializedTable.getRefreshMode().isPresent()) {
+ sqlRefreshMode =
+ sqlCreateMaterializedTable
+ .getRefreshMode()
+ .get()
+ .getValueAs(SqlRefreshMode.class);
+ }
+ CatalogMaterializedTable.LogicalRefreshMode logicalRefreshMode =
+ MaterializedTableUtils.deriveLogicalRefreshMode(sqlRefreshMode);
+ // only MATERIALIZED_TABLE_FRESHNESS_THRESHOLD configured in flink conf yaml work, so we get
+ // it from rootConfiguration instead of table config
+ CatalogMaterializedTable.RefreshMode refreshMode =
+ MaterializedTableUtils.deriveRefreshMode(
+ context.getTableConfig()
+ .getRootConfiguration()
+ .get(MATERIALIZED_TABLE_FRESHNESS_THRESHOLD),
+ freshness,
+ logicalRefreshMode);
+
+ // get query schema and definition query
+ SqlNode validateQuery =
+ context.getSqlValidator().validate(sqlCreateMaterializedTable.getAsQuery());
+ PlannerQueryOperation queryOperation =
+ new PlannerQueryOperation(
+ context.toRelRoot(validateQuery).project(),
+ () -> context.toQuotedSqlString(validateQuery));
+ String definitionQuery =
+ context.expandSqlIdentifiers(queryOperation.asSerializableString());
+
+ // get schema
+ ResolvedSchema resolvedSchema = queryOperation.getResolvedSchema();
+ Schema.Builder builder = Schema.newBuilder().fromResolvedSchema(resolvedSchema);
+
+ // get and verify partition key
+ List<String> partitionKeys =
+ sqlCreateMaterializedTable.getPartitionKeyList().getList().stream()
+ .map(p -> ((SqlIdentifier) p).getSimple())
+ .collect(Collectors.toList());
+ verifyPartitioningColumnsExist(resolvedSchema, partitionKeys);
+
+ // verify and build primary key
+ sqlCreateMaterializedTable
+ .getTableConstraint()
+ .ifPresent(
+ sqlTableConstraint ->
+ verifyAndBuildPrimaryKey(
+ builder, resolvedSchema, sqlTableConstraint));
+
+ CatalogMaterializedTable materializedTable =
+ CatalogMaterializedTable.newBuilder()
+ .schema(builder.build())
+ .comment(tableComment)
+ .partitionKeys(partitionKeys)
+ .options(options)
+ .definitionQuery(definitionQuery)
+ .freshness(freshness)
+ .logicalRefreshMode(logicalRefreshMode)
+ .refreshMode(refreshMode)
+ .refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
+ .build();
+
+ return new CreateMaterializedTableOperation(
+ identifier,
+ context.getCatalogManager().resolveCatalogMaterializedTable(materializedTable));
+ }
+
+ private static void verifyPartitioningColumnsExist(
+ ResolvedSchema resolvedSchema, List<String> partitionKeys) {
+ for (String partitionKey : partitionKeys) {
+ if (!resolvedSchema.getColumn(partitionKey).isPresent()) {
+ throw new ValidationException(
+ String.format(
+ "Partition column '%s' not defined in the query schema. Available columns: [%s].",
+ partitionKey,
+ resolvedSchema.getColumnNames().stream()
+ .collect(Collectors.joining("', '", "'", "'"))));
+ }
+ }
+ }
+
+ private static void verifyAndBuildPrimaryKey(
+ Schema.Builder schemaBuilder,
+ ResolvedSchema resolvedSchema,
+ SqlTableConstraint sqlTableConstraint) {
+ // check constraint type
+ try {
+ SqlConstraintValidator.validate(sqlTableConstraint);
+ } catch (SqlValidateException e) {
+ throw new ValidationException(
+ String.format("Primary key validation failed: %s.", e.getMessage()), e);
+ }
+
+ List<String> primaryKeyColumns = Arrays.asList(sqlTableConstraint.getColumnNames());
+ for (String columnName : primaryKeyColumns) {
+ Optional<Column> columnOptional = resolvedSchema.getColumn(columnName);
+ if (!columnOptional.isPresent()) {
+ throw new ValidationException(
+ String.format(
+ "Primary key column '%s' not defined in the query schema. Available columns: [%s].",
+ columnName,
+ resolvedSchema.getColumnNames().stream()
+ .collect(Collectors.joining("', '", "'", "'"))));
+ }
+
+ if (columnOptional.get().getDataType().getLogicalType().isNullable()) {
+ throw new ValidationException(
+ String.format(
+ "Could not create a PRIMARY KEY with nullable column '%s'.\n"
+ + "A PRIMARY KEY column must be declared on non-nullable physical columns.",
+ columnName));
+ }
+ }
+
+ // build primary key
+ String constraintName =
+ sqlTableConstraint
+ .getConstraintName()
+ .orElseGet(
+ () ->
+ primaryKeyColumns.stream()
+ .collect(Collectors.joining("_", "PK_", "")));
+ schemaBuilder.primaryKeyNamed(constraintName, primaryKeyColumns);
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
index a2b4835..cdd9d86 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
@@ -18,6 +18,8 @@
package org.apache.flink.table.planner.operations.converters;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.planner.utils.Expander;
@@ -72,6 +74,9 @@
/** Context of {@link SqlNodeConverter}. */
interface ConvertContext {
+ /** Returns the {@link TableConfig} defined in {@link TableEnvironment}. */
+ TableConfig getTableConfig();
+
/** Returns the {@link SqlValidator} in the convert context. */
SqlValidator getSqlValidator();
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
index caaafc9..fc5e3bd 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
@@ -56,6 +56,7 @@
register(new SqlShowCreateCatalogConverter());
register(new SqlDescribeCatalogConverter());
register(new SqlDescribeJobConverter());
+ register(new SqlCreateMaterializedTableConverter());
}
/**
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
new file mode 100644
index 0000000..33794ea
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.sql.parser.ddl.SqlRefreshMode;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+
+import org.apache.calcite.sql.SqlIntervalLiteral;
+import org.apache.calcite.sql.type.SqlTypeFamily;
+
+import java.time.Duration;
+
+/** The utils for materialized table. */
+@Internal
+public class MaterializedTableUtils {
+
+ public static Duration getMaterializedTableFreshness(SqlIntervalLiteral sqlIntervalLiteral) {
+ if (sqlIntervalLiteral.signum() < 0) {
+ throw new ValidationException(
+ "Materialized table freshness doesn't support negative value.");
+ }
+ if (sqlIntervalLiteral.getTypeName().getFamily() != SqlTypeFamily.INTERVAL_DAY_TIME) {
+ throw new ValidationException(
+ "Materialized table freshness only support SECOND, MINUTE, HOUR, DAY as the time unit.");
+ }
+
+ SqlIntervalLiteral.IntervalValue intervalValue =
+ sqlIntervalLiteral.getValueAs(SqlIntervalLiteral.IntervalValue.class);
+ long interval = Long.parseLong(intervalValue.getIntervalLiteral());
+ switch (intervalValue.getIntervalQualifier().typeName()) {
+ case INTERVAL_DAY:
+ return Duration.ofDays(interval);
+ case INTERVAL_HOUR:
+ return Duration.ofHours(interval);
+ case INTERVAL_MINUTE:
+ return Duration.ofMinutes(interval);
+ case INTERVAL_SECOND:
+ return Duration.ofSeconds(interval);
+ default:
+ throw new ValidationException(
+ "Materialized table freshness only support SECOND, MINUTE, HOUR, DAY as the time unit.");
+ }
+ }
+
+ public static CatalogMaterializedTable.LogicalRefreshMode deriveLogicalRefreshMode(
+ SqlRefreshMode sqlRefreshMode) {
+ if (sqlRefreshMode == null) {
+ return CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC;
+ }
+
+ switch (sqlRefreshMode) {
+ case FULL:
+ return CatalogMaterializedTable.LogicalRefreshMode.FULL;
+ case CONTINUOUS:
+ return CatalogMaterializedTable.LogicalRefreshMode.CONTINUOUS;
+ default:
+ throw new ValidationException(
+ String.format("Unsupported logical refresh mode: %s.", sqlRefreshMode));
+ }
+ }
+
+ public static CatalogMaterializedTable.RefreshMode deriveRefreshMode(
+ Duration threshold,
+ Duration definedFreshness,
+ CatalogMaterializedTable.LogicalRefreshMode definedRefreshMode) {
+ // If the refresh mode is specified manually, use it directly.
+ if (definedRefreshMode == CatalogMaterializedTable.LogicalRefreshMode.FULL) {
+ return CatalogMaterializedTable.RefreshMode.FULL;
+ } else if (definedRefreshMode == CatalogMaterializedTable.LogicalRefreshMode.CONTINUOUS) {
+ return CatalogMaterializedTable.RefreshMode.CONTINUOUS;
+ }
+
+ // derive the actual refresh mode via defined freshness
+ if (definedFreshness.compareTo(threshold) <= 0) {
+ return CatalogMaterializedTable.RefreshMode.CONTINUOUS;
+ } else {
+ return CatalogMaterializedTable.RefreshMode.FULL;
+ }
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
new file mode 100644
index 0000000..9514e12
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for the materialized table statements for {@link SqlNodeToOperationConversion}. */
+public class SqlMaterializedTableNodeToOperationConverterTest
+ extends SqlNodeToOperationConversionTestBase {
+
+ @Test
+ public void testCreateMaterializedTable() {
+ final String sql =
+ "CREATE MATERIALIZED TABLE mtbl1 (\n"
+ + " CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED"
+ + ")\n"
+ + "COMMENT 'materialized table comment'\n"
+ + "PARTITIONED BY (a, d)\n"
+ + "WITH (\n"
+ + " 'connector' = 'filesystem', \n"
+ + " 'format' = 'json'\n"
+ + ")\n"
+ + "FRESHNESS = INTERVAL '30' SECOND\n"
+ + "REFRESH_MODE = FULL\n"
+ + "AS SELECT * FROM t1";
+ Operation operation = parse(sql);
+ assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class);
+
+ CreateMaterializedTableOperation op = (CreateMaterializedTableOperation) operation;
+ CatalogMaterializedTable materializedTable = op.getCatalogMaterializedTable();
+ assertThat(materializedTable).isInstanceOf(ResolvedCatalogMaterializedTable.class);
+
+ Map<String, String> options = new HashMap<>();
+ options.put("connector", "filesystem");
+ options.put("format", "json");
+ CatalogMaterializedTable expected =
+ CatalogMaterializedTable.newBuilder()
+ .schema(
+ Schema.newBuilder()
+ .column("a", DataTypes.BIGINT().notNull())
+ .column("b", DataTypes.VARCHAR(Integer.MAX_VALUE))
+ .column("c", DataTypes.INT())
+ .column("d", DataTypes.VARCHAR(Integer.MAX_VALUE))
+ .primaryKeyNamed("ct1", Collections.singletonList("a"))
+ .build())
+ .comment("materialized table comment")
+ .options(options)
+ .partitionKeys(Arrays.asList("a", "d"))
+ .freshness(Duration.ofSeconds(30))
+ .logicalRefreshMode(CatalogMaterializedTable.LogicalRefreshMode.FULL)
+ .refreshMode(CatalogMaterializedTable.RefreshMode.FULL)
+ .refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
+ .definitionQuery(
+ "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`\n"
+ + "FROM `builtin`.`default`.`t1` AS `t1`")
+ .build();
+
+ assertThat(((ResolvedCatalogMaterializedTable) materializedTable).getOrigin())
+ .isEqualTo(expected);
+ }
+
+ @Test
+ public void testContinuousRefreshMode() {
+ // test continuous mode derived by specify freshness automatically
+ final String sql =
+ "CREATE MATERIALIZED TABLE mtbl1\n"
+ + "FRESHNESS = INTERVAL '30' SECOND\n"
+ + "AS SELECT * FROM t1";
+ Operation operation = parse(sql);
+ assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class);
+
+ CreateMaterializedTableOperation op = (CreateMaterializedTableOperation) operation;
+ CatalogMaterializedTable materializedTable = op.getCatalogMaterializedTable();
+ assertThat(materializedTable).isInstanceOf(ResolvedCatalogMaterializedTable.class);
+
+ assertThat(materializedTable.getLogicalRefreshMode())
+ .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC);
+ assertThat(materializedTable.getRefreshMode())
+ .isEqualTo(CatalogMaterializedTable.RefreshMode.CONTINUOUS);
+
+ // test continuous mode by manual specify
+ final String sql2 =
+ "CREATE MATERIALIZED TABLE mtbl1\n"
+ + "FRESHNESS = INTERVAL '30' DAY\n"
+ + "REFRESH_MODE = CONTINUOUS\n"
+ + "AS SELECT * FROM t1";
+ Operation operation2 = parse(sql2);
+ assertThat(operation2).isInstanceOf(CreateMaterializedTableOperation.class);
+
+ CreateMaterializedTableOperation op2 = (CreateMaterializedTableOperation) operation2;
+ CatalogMaterializedTable materializedTable2 = op2.getCatalogMaterializedTable();
+ assertThat(materializedTable2).isInstanceOf(ResolvedCatalogMaterializedTable.class);
+
+ assertThat(materializedTable2.getLogicalRefreshMode())
+ .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.CONTINUOUS);
+ assertThat(materializedTable2.getRefreshMode())
+ .isEqualTo(CatalogMaterializedTable.RefreshMode.CONTINUOUS);
+ }
+
+ @Test
+ public void testFullRefreshMode() {
+ // test full mode derived by specify freshness automatically
+ final String sql =
+ "CREATE MATERIALIZED TABLE mtbl1\n"
+ + "FRESHNESS = INTERVAL '1' DAY\n"
+ + "AS SELECT * FROM t1";
+ Operation operation = parse(sql);
+ assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class);
+
+ CreateMaterializedTableOperation op = (CreateMaterializedTableOperation) operation;
+ CatalogMaterializedTable materializedTable = op.getCatalogMaterializedTable();
+ assertThat(materializedTable).isInstanceOf(ResolvedCatalogMaterializedTable.class);
+
+ assertThat(materializedTable.getLogicalRefreshMode())
+ .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC);
+ assertThat(materializedTable.getRefreshMode())
+ .isEqualTo(CatalogMaterializedTable.RefreshMode.FULL);
+
+ // test full mode by manual specify
+ final String sql2 =
+ "CREATE MATERIALIZED TABLE mtbl1\n"
+ + "FRESHNESS = INTERVAL '30' SECOND\n"
+ + "REFRESH_MODE = FULL\n"
+ + "AS SELECT * FROM t1";
+ Operation operation2 = parse(sql2);
+ assertThat(operation2).isInstanceOf(CreateMaterializedTableOperation.class);
+
+ CreateMaterializedTableOperation op2 = (CreateMaterializedTableOperation) operation2;
+ CatalogMaterializedTable materializedTable2 = op2.getCatalogMaterializedTable();
+ assertThat(materializedTable2).isInstanceOf(ResolvedCatalogMaterializedTable.class);
+
+ assertThat(materializedTable2.getLogicalRefreshMode())
+ .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.FULL);
+ assertThat(materializedTable2.getRefreshMode())
+ .isEqualTo(CatalogMaterializedTable.RefreshMode.FULL);
+ }
+
+ @Test
+ public void testCreateMaterializedTableWithInvalidPrimaryKey() {
+ // test unsupported constraint
+ final String sql =
+ "CREATE MATERIALIZED TABLE mtbl1 (\n"
+ + " CONSTRAINT ct1 UNIQUE(a) NOT ENFORCED"
+ + ")\n"
+ + "FRESHNESS = INTERVAL '30' SECOND\n"
+ + "AS SELECT * FROM t1";
+
+ assertThatThrownBy(() -> parse(sql))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Primary key validation failed: UNIQUE constraint is not supported yet.");
+
+ // test primary key not defined in source table
+ final String sql2 =
+ "CREATE MATERIALIZED TABLE mtbl1 (\n"
+ + " CONSTRAINT ct1 PRIMARY KEY(e) NOT ENFORCED"
+ + ")\n"
+ + "FRESHNESS = INTERVAL '30' SECOND\n"
+ + "AS SELECT * FROM t1";
+
+ assertThatThrownBy(() -> parse(sql2))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Primary key column 'e' not defined in the query schema. Available columns: ['a', 'b', 'c', 'd'].");
+
+ // test primary key with nullable source column
+ final String sql3 =
+ "CREATE MATERIALIZED TABLE mtbl1 (\n"
+ + " CONSTRAINT ct1 PRIMARY KEY(d) NOT ENFORCED"
+ + ")\n"
+ + "FRESHNESS = INTERVAL '30' SECOND\n"
+ + "AS SELECT * FROM t1";
+
+ assertThatThrownBy(() -> parse(sql3))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining("Could not create a PRIMARY KEY with nullable column 'd'.");
+ }
+
+ @Test
+ public void testCreateMaterializedTableWithInvalidPartitionKey() {
+ final String sql =
+ "CREATE MATERIALIZED TABLE mtbl1\n"
+ + "PARTITIONED BY (a, e)\n"
+ + "FRESHNESS = INTERVAL '30' SECOND\n"
+ + "REFRESH_MODE = FULL\n"
+ + "AS SELECT * FROM t1";
+ assertThatThrownBy(() -> parse(sql))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Partition column 'e' not defined in the query schema. Available columns: ['a', 'b', 'c', 'd'].");
+ }
+
+ @Test
+ public void testCreateMaterializedTableWithInvalidFreshnessType() {
+ // test negative freshness value
+ final String sql =
+ "CREATE MATERIALIZED TABLE mtbl1\n"
+ + "FRESHNESS = INTERVAL -'30' SECOND\n"
+ + "REFRESH_MODE = FULL\n"
+ + "AS SELECT * FROM t1";
+ assertThatThrownBy(() -> parse(sql))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Materialized table freshness doesn't support negative value.");
+
+ // test unsupported freshness type
+ final String sql2 =
+ "CREATE MATERIALIZED TABLE mtbl1\n"
+ + "FRESHNESS = INTERVAL '30' YEAR\n"
+ + "AS SELECT * FROM t1";
+ assertThatThrownBy(() -> parse(sql2))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Materialized table freshness only support SECOND, MINUTE, HOUR, DAY as the time unit.");
+
+ // test unsupported freshness type
+ final String sql3 =
+ "CREATE MATERIALIZED TABLE mtbl1\n"
+ + "FRESHNESS = INTERVAL '30' DAY TO HOUR\n"
+ + "AS SELECT * FROM t1";
+ assertThatThrownBy(() -> parse(sql3))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Materialized table freshness only support SECOND, MINUTE, HOUR, DAY as the time unit.");
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java
index 68efa48..2b87c89 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java
@@ -107,7 +107,7 @@
Schema.newBuilder()
.fromResolvedSchema(
ResolvedSchema.of(
- Column.physical("a", DataTypes.BIGINT()),
+ Column.physical("a", DataTypes.BIGINT().notNull()),
Column.physical("b", DataTypes.VARCHAR(Integer.MAX_VALUE)),
Column.physical("c", DataTypes.INT()),
Column.physical("d", DataTypes.VARCHAR(Integer.MAX_VALUE))))
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
index 5c31f23..8a6fc80 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
@@ -126,7 +126,7 @@
.fromFields(
new String[] {"a", "b", "c", "d"},
new AbstractDataType[] {
- DataTypes.BIGINT(),
+ DataTypes.BIGINT().notNull(),
DataTypes.STRING(),
DataTypes.INT(),
DataTypes.STRING()