[FLINK-35195][table] Convert SqlCreateMaterializedTable node to CreateMaterializedTableOperation
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlConstraintValidator.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlConstraintValidator.java
index 8a9a772..f157e50 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlConstraintValidator.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlConstraintValidator.java
@@ -89,7 +89,7 @@
     }
 
     /** Check table constraint. */
-    private static void validate(SqlTableConstraint constraint) throws SqlValidateException {
+    public static void validate(SqlTableConstraint constraint) throws SqlValidateException {
         if (constraint.isUnique()) {
             throw new SqlValidateException(
                     constraint.getParserPosition(), "UNIQUE constraint is not supported yet");
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
index 1630a0f..eae6f1f 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
@@ -132,7 +132,6 @@
         return freshness;
     }
 
-    @Nullable
     public Optional<SqlLiteral> getRefreshMode() {
         return Optional.ofNullable(refreshMode);
     }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/CreateMaterializedTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/CreateMaterializedTableOperation.java
new file mode 100644
index 0000000..d4eff00
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/CreateMaterializedTableOperation.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.internal.TableResultImpl;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.OperationUtils;
+import org.apache.flink.table.operations.ddl.CreateOperation;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/** Operation to describe a CREATE MATERIALIZED TABLE statement. */
+@Internal
+public class CreateMaterializedTableOperation
+        implements CreateOperation, MaterializedTableOperation {
+
+    private final ObjectIdentifier tableIdentifier;
+    private final CatalogMaterializedTable materializedTable;
+
+    public CreateMaterializedTableOperation(
+            ObjectIdentifier tableIdentifier, ResolvedCatalogMaterializedTable materializedTable) {
+        this.tableIdentifier = tableIdentifier;
+        this.materializedTable = materializedTable;
+    }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        // create materialized table in catalog
+        ctx.getCatalogManager().createTable(materializedTable, tableIdentifier, false);
+        return TableResultImpl.TABLE_RESULT_OK;
+    }
+
+    public ObjectIdentifier getTableIdentifier() {
+        return tableIdentifier;
+    }
+
+    public CatalogMaterializedTable getCatalogMaterializedTable() {
+        return materializedTable;
+    }
+
+    @Override
+    public String asSummaryString() {
+        Map<String, Object> params = new LinkedHashMap<>();
+        params.put("materializedTable", materializedTable);
+        params.put("identifier", tableIdentifier);
+
+        return OperationUtils.formatWithChildren(
+                "CREATE MATERIALIZED TABLE",
+                params,
+                Collections.emptyList(),
+                Operation::asSummaryString);
+    }
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableOperation.java
new file mode 100644
index 0000000..72b83ad
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableOperation.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.operations.Operation;
+
+/** The marker interface for materialized table. */
+@Internal
+public interface MaterializedTableOperation extends Operation {}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
index 50d894b..2958333 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.operations;
 
+import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.planner.calcite.SqlToRexConverter;
@@ -43,6 +44,8 @@
 import java.util.List;
 import java.util.Objects;
 
+import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig;
+
 /** An implementation of {@link SqlNodeConverter.ConvertContext}. */
 public class SqlNodeConvertContext implements SqlNodeConverter.ConvertContext {
 
@@ -55,6 +58,11 @@
     }
 
     @Override
+    public TableConfig getTableConfig() {
+        return unwrapTableConfig(flinkPlanner.cluster());
+    }
+
+    @Override
     public SqlValidator getSqlValidator() {
         return flinkPlanner.getOrCreateSqlValidator();
     }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
new file mode 100644
index 0000000..03fc0cb
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.SqlConstraintValidator;
+import org.apache.flink.sql.parser.ddl.SqlCreateMaterializedTable;
+import org.apache.flink.sql.parser.ddl.SqlRefreshMode;
+import org.apache.flink.sql.parser.ddl.SqlTableOption;
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
+import org.apache.flink.table.planner.operations.PlannerQueryOperation;
+import org.apache.flink.table.planner.utils.MaterializedTableUtils;
+import org.apache.flink.table.planner.utils.OperationConverterUtils;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.api.config.MaterializedTableConfigOptions.MATERIALIZED_TABLE_FRESHNESS_THRESHOLD;
+
+/** A converter for {@link SqlCreateMaterializedTable}. */
+public class SqlCreateMaterializedTableConverter
+        implements SqlNodeConverter<SqlCreateMaterializedTable> {
+
+    @Override
+    public Operation convertSqlNode(
+            SqlCreateMaterializedTable sqlCreateMaterializedTable, ConvertContext context) {
+        UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(sqlCreateMaterializedTable.fullTableName());
+        ObjectIdentifier identifier =
+                context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
+
+        // get comment
+        String tableComment =
+                OperationConverterUtils.getTableComment(sqlCreateMaterializedTable.getComment());
+
+        // get options
+        Map<String, String> options = new HashMap<>();
+        sqlCreateMaterializedTable
+                .getPropertyList()
+                .getList()
+                .forEach(
+                        p ->
+                                options.put(
+                                        ((SqlTableOption) p).getKeyString(),
+                                        ((SqlTableOption) p).getValueString()));
+
+        // get freshness
+        Duration freshness =
+                MaterializedTableUtils.getMaterializedTableFreshness(
+                        sqlCreateMaterializedTable.getFreshness());
+
+        // get refresh mode
+        SqlRefreshMode sqlRefreshMode = null;
+        if (sqlCreateMaterializedTable.getRefreshMode().isPresent()) {
+            sqlRefreshMode =
+                    sqlCreateMaterializedTable
+                            .getRefreshMode()
+                            .get()
+                            .getValueAs(SqlRefreshMode.class);
+        }
+        CatalogMaterializedTable.LogicalRefreshMode logicalRefreshMode =
+                MaterializedTableUtils.deriveLogicalRefreshMode(sqlRefreshMode);
+        // only MATERIALIZED_TABLE_FRESHNESS_THRESHOLD configured in flink conf yaml work, so we get
+        // it from rootConfiguration instead of table config
+        CatalogMaterializedTable.RefreshMode refreshMode =
+                MaterializedTableUtils.deriveRefreshMode(
+                        context.getTableConfig()
+                                .getRootConfiguration()
+                                .get(MATERIALIZED_TABLE_FRESHNESS_THRESHOLD),
+                        freshness,
+                        logicalRefreshMode);
+
+        // get query schema and definition query
+        SqlNode validateQuery =
+                context.getSqlValidator().validate(sqlCreateMaterializedTable.getAsQuery());
+        PlannerQueryOperation queryOperation =
+                new PlannerQueryOperation(
+                        context.toRelRoot(validateQuery).project(),
+                        () -> context.toQuotedSqlString(validateQuery));
+        String definitionQuery =
+                context.expandSqlIdentifiers(queryOperation.asSerializableString());
+
+        // get schema
+        ResolvedSchema resolvedSchema = queryOperation.getResolvedSchema();
+        Schema.Builder builder = Schema.newBuilder().fromResolvedSchema(resolvedSchema);
+
+        // get and verify partition key
+        List<String> partitionKeys =
+                sqlCreateMaterializedTable.getPartitionKeyList().getList().stream()
+                        .map(p -> ((SqlIdentifier) p).getSimple())
+                        .collect(Collectors.toList());
+        verifyPartitioningColumnsExist(resolvedSchema, partitionKeys);
+
+        // verify and build primary key
+        sqlCreateMaterializedTable
+                .getTableConstraint()
+                .ifPresent(
+                        sqlTableConstraint ->
+                                verifyAndBuildPrimaryKey(
+                                        builder, resolvedSchema, sqlTableConstraint));
+
+        CatalogMaterializedTable materializedTable =
+                CatalogMaterializedTable.newBuilder()
+                        .schema(builder.build())
+                        .comment(tableComment)
+                        .partitionKeys(partitionKeys)
+                        .options(options)
+                        .definitionQuery(definitionQuery)
+                        .freshness(freshness)
+                        .logicalRefreshMode(logicalRefreshMode)
+                        .refreshMode(refreshMode)
+                        .refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
+                        .build();
+
+        return new CreateMaterializedTableOperation(
+                identifier,
+                context.getCatalogManager().resolveCatalogMaterializedTable(materializedTable));
+    }
+
+    private static void verifyPartitioningColumnsExist(
+            ResolvedSchema resolvedSchema, List<String> partitionKeys) {
+        for (String partitionKey : partitionKeys) {
+            if (!resolvedSchema.getColumn(partitionKey).isPresent()) {
+                throw new ValidationException(
+                        String.format(
+                                "Partition column '%s' not defined in the query schema. Available columns: [%s].",
+                                partitionKey,
+                                resolvedSchema.getColumnNames().stream()
+                                        .collect(Collectors.joining("', '", "'", "'"))));
+            }
+        }
+    }
+
+    private static void verifyAndBuildPrimaryKey(
+            Schema.Builder schemaBuilder,
+            ResolvedSchema resolvedSchema,
+            SqlTableConstraint sqlTableConstraint) {
+        // check constraint type
+        try {
+            SqlConstraintValidator.validate(sqlTableConstraint);
+        } catch (SqlValidateException e) {
+            throw new ValidationException(
+                    String.format("Primary key validation failed: %s.", e.getMessage()), e);
+        }
+
+        List<String> primaryKeyColumns = Arrays.asList(sqlTableConstraint.getColumnNames());
+        for (String columnName : primaryKeyColumns) {
+            Optional<Column> columnOptional = resolvedSchema.getColumn(columnName);
+            if (!columnOptional.isPresent()) {
+                throw new ValidationException(
+                        String.format(
+                                "Primary key column '%s' not defined in the query schema. Available columns: [%s].",
+                                columnName,
+                                resolvedSchema.getColumnNames().stream()
+                                        .collect(Collectors.joining("', '", "'", "'"))));
+            }
+
+            if (columnOptional.get().getDataType().getLogicalType().isNullable()) {
+                throw new ValidationException(
+                        String.format(
+                                "Could not create a PRIMARY KEY with nullable column '%s'.\n"
+                                        + "A PRIMARY KEY column must be declared on non-nullable physical columns.",
+                                columnName));
+            }
+        }
+
+        // build primary key
+        String constraintName =
+                sqlTableConstraint
+                        .getConstraintName()
+                        .orElseGet(
+                                () ->
+                                        primaryKeyColumns.stream()
+                                                .collect(Collectors.joining("_", "PK_", "")));
+        schemaBuilder.primaryKeyNamed(constraintName, primaryKeyColumns);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
index a2b4835..cdd9d86 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.planner.operations.converters;
 
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.planner.utils.Expander;
@@ -72,6 +74,9 @@
     /** Context of {@link SqlNodeConverter}. */
     interface ConvertContext {
 
+        /** Returns the {@link TableConfig} defined in {@link TableEnvironment}. */
+        TableConfig getTableConfig();
+
         /** Returns the {@link SqlValidator} in the convert context. */
         SqlValidator getSqlValidator();
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
index caaafc9..fc5e3bd 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
@@ -56,6 +56,7 @@
         register(new SqlShowCreateCatalogConverter());
         register(new SqlDescribeCatalogConverter());
         register(new SqlDescribeJobConverter());
+        register(new SqlCreateMaterializedTableConverter());
     }
 
     /**
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
new file mode 100644
index 0000000..33794ea
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.sql.parser.ddl.SqlRefreshMode;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+
+import org.apache.calcite.sql.SqlIntervalLiteral;
+import org.apache.calcite.sql.type.SqlTypeFamily;
+
+import java.time.Duration;
+
+/** The utils for materialized table. */
+@Internal
+public class MaterializedTableUtils {
+
+    public static Duration getMaterializedTableFreshness(SqlIntervalLiteral sqlIntervalLiteral) {
+        if (sqlIntervalLiteral.signum() < 0) {
+            throw new ValidationException(
+                    "Materialized table freshness doesn't support negative value.");
+        }
+        if (sqlIntervalLiteral.getTypeName().getFamily() != SqlTypeFamily.INTERVAL_DAY_TIME) {
+            throw new ValidationException(
+                    "Materialized table freshness only support SECOND, MINUTE, HOUR, DAY as the time unit.");
+        }
+
+        SqlIntervalLiteral.IntervalValue intervalValue =
+                sqlIntervalLiteral.getValueAs(SqlIntervalLiteral.IntervalValue.class);
+        long interval = Long.parseLong(intervalValue.getIntervalLiteral());
+        switch (intervalValue.getIntervalQualifier().typeName()) {
+            case INTERVAL_DAY:
+                return Duration.ofDays(interval);
+            case INTERVAL_HOUR:
+                return Duration.ofHours(interval);
+            case INTERVAL_MINUTE:
+                return Duration.ofMinutes(interval);
+            case INTERVAL_SECOND:
+                return Duration.ofSeconds(interval);
+            default:
+                throw new ValidationException(
+                        "Materialized table freshness only support SECOND, MINUTE, HOUR, DAY as the time unit.");
+        }
+    }
+
+    public static CatalogMaterializedTable.LogicalRefreshMode deriveLogicalRefreshMode(
+            SqlRefreshMode sqlRefreshMode) {
+        if (sqlRefreshMode == null) {
+            return CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC;
+        }
+
+        switch (sqlRefreshMode) {
+            case FULL:
+                return CatalogMaterializedTable.LogicalRefreshMode.FULL;
+            case CONTINUOUS:
+                return CatalogMaterializedTable.LogicalRefreshMode.CONTINUOUS;
+            default:
+                throw new ValidationException(
+                        String.format("Unsupported logical refresh mode: %s.", sqlRefreshMode));
+        }
+    }
+
+    public static CatalogMaterializedTable.RefreshMode deriveRefreshMode(
+            Duration threshold,
+            Duration definedFreshness,
+            CatalogMaterializedTable.LogicalRefreshMode definedRefreshMode) {
+        // If the refresh mode is specified manually, use it directly.
+        if (definedRefreshMode == CatalogMaterializedTable.LogicalRefreshMode.FULL) {
+            return CatalogMaterializedTable.RefreshMode.FULL;
+        } else if (definedRefreshMode == CatalogMaterializedTable.LogicalRefreshMode.CONTINUOUS) {
+            return CatalogMaterializedTable.RefreshMode.CONTINUOUS;
+        }
+
+        // derive the actual refresh mode via defined freshness
+        if (definedFreshness.compareTo(threshold) <= 0) {
+            return CatalogMaterializedTable.RefreshMode.CONTINUOUS;
+        } else {
+            return CatalogMaterializedTable.RefreshMode.FULL;
+        }
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
new file mode 100644
index 0000000..9514e12
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for the materialized table statements for {@link SqlNodeToOperationConversion}. */
+public class SqlMaterializedTableNodeToOperationConverterTest
+        extends SqlNodeToOperationConversionTestBase {
+
+    @Test
+    public void testCreateMaterializedTable() {
+        final String sql =
+                "CREATE MATERIALIZED TABLE mtbl1 (\n"
+                        + "   CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED"
+                        + ")\n"
+                        + "COMMENT 'materialized table comment'\n"
+                        + "PARTITIONED BY (a, d)\n"
+                        + "WITH (\n"
+                        + "  'connector' = 'filesystem', \n"
+                        + "  'format' = 'json'\n"
+                        + ")\n"
+                        + "FRESHNESS = INTERVAL '30' SECOND\n"
+                        + "REFRESH_MODE = FULL\n"
+                        + "AS SELECT * FROM t1";
+        Operation operation = parse(sql);
+        assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class);
+
+        CreateMaterializedTableOperation op = (CreateMaterializedTableOperation) operation;
+        CatalogMaterializedTable materializedTable = op.getCatalogMaterializedTable();
+        assertThat(materializedTable).isInstanceOf(ResolvedCatalogMaterializedTable.class);
+
+        Map<String, String> options = new HashMap<>();
+        options.put("connector", "filesystem");
+        options.put("format", "json");
+        CatalogMaterializedTable expected =
+                CatalogMaterializedTable.newBuilder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("a", DataTypes.BIGINT().notNull())
+                                        .column("b", DataTypes.VARCHAR(Integer.MAX_VALUE))
+                                        .column("c", DataTypes.INT())
+                                        .column("d", DataTypes.VARCHAR(Integer.MAX_VALUE))
+                                        .primaryKeyNamed("ct1", Collections.singletonList("a"))
+                                        .build())
+                        .comment("materialized table comment")
+                        .options(options)
+                        .partitionKeys(Arrays.asList("a", "d"))
+                        .freshness(Duration.ofSeconds(30))
+                        .logicalRefreshMode(CatalogMaterializedTable.LogicalRefreshMode.FULL)
+                        .refreshMode(CatalogMaterializedTable.RefreshMode.FULL)
+                        .refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
+                        .definitionQuery(
+                                "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`\n"
+                                        + "FROM `builtin`.`default`.`t1` AS `t1`")
+                        .build();
+
+        assertThat(((ResolvedCatalogMaterializedTable) materializedTable).getOrigin())
+                .isEqualTo(expected);
+    }
+
+    @Test
+    public void testContinuousRefreshMode() {
+        // test continuous mode derived by specify freshness automatically
+        final String sql =
+                "CREATE MATERIALIZED TABLE mtbl1\n"
+                        + "FRESHNESS = INTERVAL '30' SECOND\n"
+                        + "AS SELECT * FROM t1";
+        Operation operation = parse(sql);
+        assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class);
+
+        CreateMaterializedTableOperation op = (CreateMaterializedTableOperation) operation;
+        CatalogMaterializedTable materializedTable = op.getCatalogMaterializedTable();
+        assertThat(materializedTable).isInstanceOf(ResolvedCatalogMaterializedTable.class);
+
+        assertThat(materializedTable.getLogicalRefreshMode())
+                .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC);
+        assertThat(materializedTable.getRefreshMode())
+                .isEqualTo(CatalogMaterializedTable.RefreshMode.CONTINUOUS);
+
+        // test continuous mode by manual specify
+        final String sql2 =
+                "CREATE MATERIALIZED TABLE mtbl1\n"
+                        + "FRESHNESS = INTERVAL '30' DAY\n"
+                        + "REFRESH_MODE = CONTINUOUS\n"
+                        + "AS SELECT * FROM t1";
+        Operation operation2 = parse(sql2);
+        assertThat(operation2).isInstanceOf(CreateMaterializedTableOperation.class);
+
+        CreateMaterializedTableOperation op2 = (CreateMaterializedTableOperation) operation2;
+        CatalogMaterializedTable materializedTable2 = op2.getCatalogMaterializedTable();
+        assertThat(materializedTable2).isInstanceOf(ResolvedCatalogMaterializedTable.class);
+
+        assertThat(materializedTable2.getLogicalRefreshMode())
+                .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.CONTINUOUS);
+        assertThat(materializedTable2.getRefreshMode())
+                .isEqualTo(CatalogMaterializedTable.RefreshMode.CONTINUOUS);
+    }
+
+    @Test
+    public void testFullRefreshMode() {
+        // test full mode derived by specify freshness automatically
+        final String sql =
+                "CREATE MATERIALIZED TABLE mtbl1\n"
+                        + "FRESHNESS = INTERVAL '1' DAY\n"
+                        + "AS SELECT * FROM t1";
+        Operation operation = parse(sql);
+        assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class);
+
+        CreateMaterializedTableOperation op = (CreateMaterializedTableOperation) operation;
+        CatalogMaterializedTable materializedTable = op.getCatalogMaterializedTable();
+        assertThat(materializedTable).isInstanceOf(ResolvedCatalogMaterializedTable.class);
+
+        assertThat(materializedTable.getLogicalRefreshMode())
+                .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC);
+        assertThat(materializedTable.getRefreshMode())
+                .isEqualTo(CatalogMaterializedTable.RefreshMode.FULL);
+
+        // test full mode by manual specify
+        final String sql2 =
+                "CREATE MATERIALIZED TABLE mtbl1\n"
+                        + "FRESHNESS = INTERVAL '30' SECOND\n"
+                        + "REFRESH_MODE = FULL\n"
+                        + "AS SELECT * FROM t1";
+        Operation operation2 = parse(sql2);
+        assertThat(operation2).isInstanceOf(CreateMaterializedTableOperation.class);
+
+        CreateMaterializedTableOperation op2 = (CreateMaterializedTableOperation) operation2;
+        CatalogMaterializedTable materializedTable2 = op2.getCatalogMaterializedTable();
+        assertThat(materializedTable2).isInstanceOf(ResolvedCatalogMaterializedTable.class);
+
+        assertThat(materializedTable2.getLogicalRefreshMode())
+                .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.FULL);
+        assertThat(materializedTable2.getRefreshMode())
+                .isEqualTo(CatalogMaterializedTable.RefreshMode.FULL);
+    }
+
+    @Test
+    public void testCreateMaterializedTableWithInvalidPrimaryKey() {
+        // test unsupported constraint
+        final String sql =
+                "CREATE MATERIALIZED TABLE mtbl1 (\n"
+                        + "   CONSTRAINT ct1 UNIQUE(a) NOT ENFORCED"
+                        + ")\n"
+                        + "FRESHNESS = INTERVAL '30' SECOND\n"
+                        + "AS SELECT * FROM t1";
+
+        assertThatThrownBy(() -> parse(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Primary key validation failed: UNIQUE constraint is not supported yet.");
+
+        // test primary key not defined in source table
+        final String sql2 =
+                "CREATE MATERIALIZED TABLE mtbl1 (\n"
+                        + "   CONSTRAINT ct1 PRIMARY KEY(e) NOT ENFORCED"
+                        + ")\n"
+                        + "FRESHNESS = INTERVAL '30' SECOND\n"
+                        + "AS SELECT * FROM t1";
+
+        assertThatThrownBy(() -> parse(sql2))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Primary key column 'e' not defined in the query schema. Available columns: ['a', 'b', 'c', 'd'].");
+
+        // test primary key with nullable source column
+        final String sql3 =
+                "CREATE MATERIALIZED TABLE mtbl1 (\n"
+                        + "   CONSTRAINT ct1 PRIMARY KEY(d) NOT ENFORCED"
+                        + ")\n"
+                        + "FRESHNESS = INTERVAL '30' SECOND\n"
+                        + "AS SELECT * FROM t1";
+
+        assertThatThrownBy(() -> parse(sql3))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("Could not create a PRIMARY KEY with nullable column 'd'.");
+    }
+
+    @Test
+    public void testCreateMaterializedTableWithInvalidPartitionKey() {
+        final String sql =
+                "CREATE MATERIALIZED TABLE mtbl1\n"
+                        + "PARTITIONED BY (a, e)\n"
+                        + "FRESHNESS = INTERVAL '30' SECOND\n"
+                        + "REFRESH_MODE = FULL\n"
+                        + "AS SELECT * FROM t1";
+        assertThatThrownBy(() -> parse(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Partition column 'e' not defined in the query schema. Available columns: ['a', 'b', 'c', 'd'].");
+    }
+
+    @Test
+    public void testCreateMaterializedTableWithInvalidFreshnessType() {
+        // test negative freshness value
+        final String sql =
+                "CREATE MATERIALIZED TABLE mtbl1\n"
+                        + "FRESHNESS = INTERVAL -'30' SECOND\n"
+                        + "REFRESH_MODE = FULL\n"
+                        + "AS SELECT * FROM t1";
+        assertThatThrownBy(() -> parse(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Materialized table freshness doesn't support negative value.");
+
+        // test unsupported freshness type
+        final String sql2 =
+                "CREATE MATERIALIZED TABLE mtbl1\n"
+                        + "FRESHNESS = INTERVAL '30' YEAR\n"
+                        + "AS SELECT * FROM t1";
+        assertThatThrownBy(() -> parse(sql2))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Materialized table freshness only support SECOND, MINUTE, HOUR, DAY as the time unit.");
+
+        // test unsupported freshness type
+        final String sql3 =
+                "CREATE MATERIALIZED TABLE mtbl1\n"
+                        + "FRESHNESS = INTERVAL '30' DAY TO HOUR\n"
+                        + "AS SELECT * FROM t1";
+        assertThatThrownBy(() -> parse(sql3))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Materialized table freshness only support SECOND, MINUTE, HOUR, DAY as the time unit.");
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java
index 68efa48..2b87c89 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java
@@ -107,7 +107,7 @@
                 Schema.newBuilder()
                         .fromResolvedSchema(
                                 ResolvedSchema.of(
-                                        Column.physical("a", DataTypes.BIGINT()),
+                                        Column.physical("a", DataTypes.BIGINT().notNull()),
                                         Column.physical("b", DataTypes.VARCHAR(Integer.MAX_VALUE)),
                                         Column.physical("c", DataTypes.INT()),
                                         Column.physical("d", DataTypes.VARCHAR(Integer.MAX_VALUE))))
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
index 5c31f23..8a6fc80 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
@@ -126,7 +126,7 @@
                 .fromFields(
                         new String[] {"a", "b", "c", "d"},
                         new AbstractDataType[] {
-                            DataTypes.BIGINT(),
+                            DataTypes.BIGINT().notNull(),
                             DataTypes.STRING(),
                             DataTypes.INT(),
                             DataTypes.STRING()