[issues-32] Implement of Apache RocketMQ Flink Catalog. (#49)

Co-authored-by: lixiaoshuang <644968328@qq.com>

Co-authored-by: gongzhongqiang <764629910@qq.com>
diff --git a/README.md b/README.md
index 9ecf41b..fa53dd7 100644
--- a/README.md
+++ b/README.md
@@ -145,7 +145,7 @@
 ) WITH (
   'connector' = 'rocketmq',
   'topic' = 'user_behavior',
-  'consumeGroup' = 'behavior_consume_group',
+  'consumerGroup' = 'behavior_consumer_group',
   'nameServerAddress' = '127.0.0.1:9876'
 );
 
@@ -183,7 +183,7 @@
 ) WITH (
   'connector' = 'rocketmq',
   'topic' = 'user_behavior',
-  'consumeGroup' = 'behavior_consume_group',
+  'consumerGroup' = 'behavior_consumer_group',
   'nameServerAddress' = '127.0.0.1:9876'
 );
 ```
diff --git a/pom.xml b/pom.xml
index 72268c9..1b725e1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,7 +37,8 @@
         <flink.version>1.15.0</flink.version>
         <commons-lang.version>2.6</commons-lang.version>
         <spotless.version>2.4.2</spotless.version>
-	    <jaxb-api.version>2.3.1</jaxb-api.version>
+        <jaxb-api.version>2.3.1</jaxb-api.version>
+        <rocketmq.schema.registry.version>0.0.4-SNAPSHOT</rocketmq.schema.registry.version>
     </properties>
 
     <dependencies>
@@ -84,6 +85,11 @@
             <version>${flink.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-avro</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-client</artifactId>
             <version>${rocketmq.version}</version>
@@ -106,10 +112,25 @@
         </dependency>
         <dependency>
             <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-namesrv</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-broker</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-test</artifactId>
             <version>${rocketmq.version}</version>
         </dependency>
-
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>schema-registry-client</artifactId>
+            <version>${rocketmq.schema.registry.version}</version>
+        </dependency>
+        
         <dependency>
             <groupId>commons-lang</groupId>
             <artifactId>commons-lang</artifactId>
@@ -135,18 +156,7 @@
             <version>1.5.5</version>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>org.apache.rocketmq</groupId>
-            <artifactId>rocketmq-namesrv</artifactId>
-            <version>${rocketmq.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.rocketmq</groupId>
-            <artifactId>rocketmq-broker</artifactId>
-            <version>${rocketmq.version}</version>
-            <scope>test</scope>
-        </dependency>
+
     </dependencies>
 
     <build>
diff --git a/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalog.java b/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalog.java
new file mode 100644
index 0000000..3636995
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalog.java
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.catalog;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.flink.common.constant.RocketMqCatalogConstant;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClientFactory;
+import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
+import org.apache.rocketmq.schema.registry.common.model.SchemaType;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionImpl;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.FieldsDataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
+
+/** A catalog implementation for RocketMQ. */
+public class RocketMQCatalog extends AbstractCatalog {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RocketMQCatalog.class);
+    public static final String DEFAULT_DB = "default";
+    public final String namesrvAddr;
+    private final String schemaRegistryUrl;
+    private DefaultMQAdminExt mqAdminExt;
+    private SchemaRegistryClient schemaRegistryClient;
+
+    public RocketMQCatalog(
+            String catalogName, String database, String namesrvAddr, String schemaRegistryUrl) {
+        super(catalogName, database);
+
+        checkArgument(!isNullOrWhitespaceOnly(namesrvAddr), "namesrvAddr cannot be null or empty");
+        checkArgument(
+                !isNullOrWhitespaceOnly(schemaRegistryUrl),
+                "schemaRegistryUrl cannot be null or empty");
+
+        this.namesrvAddr = namesrvAddr;
+        this.schemaRegistryUrl = schemaRegistryUrl;
+        LOG.info("Created RocketMQ Catalog {}", catalogName);
+    }
+
+    @Override
+    public Optional<Factory> getFactory() {
+        return Optional.of(new RocketMQCatalogFactory());
+    }
+
+    @Override
+    public void open() throws CatalogException {
+        if (mqAdminExt == null) {
+            try {
+                mqAdminExt = new DefaultMQAdminExt();
+                mqAdminExt.setNamesrvAddr(namesrvAddr);
+                mqAdminExt.setLanguage(LanguageCode.JAVA);
+                mqAdminExt.start();
+            } catch (MQClientException e) {
+                throw new CatalogException(
+                        "Failed to create RocketMQ admin using :" + namesrvAddr, e);
+            }
+        }
+        if (schemaRegistryClient == null) {
+            schemaRegistryClient = SchemaRegistryClientFactory.newClient(schemaRegistryUrl, null);
+        }
+    }
+
+    @Override
+    public void close() throws CatalogException {
+        if (Objects.nonNull(mqAdminExt)) {
+            mqAdminExt.shutdown();
+            mqAdminExt = null;
+        }
+        if (Objects.nonNull(schemaRegistryClient)) {
+            schemaRegistryClient = null;
+        }
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        return Collections.singletonList(getDefaultDatabase());
+    }
+
+    @Override
+    public CatalogDatabase getDatabase(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        if (StringUtils.isEmpty(databaseName)) {
+            throw new CatalogException("Database name can not be null or empty.");
+        }
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        } else {
+            return new CatalogDatabaseImpl(new HashMap<>(), null);
+        }
+    }
+
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException {
+        return getDefaultDatabase().equals(databaseName);
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        if (!getDefaultDatabase().equals(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+        try {
+            List<String> tables = schemaRegistryClient.getSubjectsByTenant("default", "default");
+            return tables;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "Failed to get topics of namespace %s from schema registry client.",
+                            databaseName),
+                    e);
+        }
+    }
+
+    @Override
+    public CatalogBaseTable getTable(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(getName(), tablePath);
+        }
+        String subject = tablePath.getObjectName();
+        try {
+            GetSchemaResponse getSchemaResponse = schemaRegistryClient.getSchemaBySubject(subject);
+            if (getSchemaResponse.getType() != SchemaType.AVRO) {
+                throw new CatalogException("Only support avro schema.");
+            }
+            return getCatalogTableForSchema(subject, getSchemaResponse);
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "Failed to get schema of table %s from schema registry client.",
+                            tablePath.getFullName()),
+                    e);
+        }
+    }
+
+    private CatalogTable getCatalogTableForSchema(
+            String topic, GetSchemaResponse getSchemaResponse) {
+        DataType dataType = AvroSchemaConverter.convertToDataType(getSchemaResponse.getIdl());
+        Schema.Builder builder = Schema.newBuilder();
+        if (dataType instanceof FieldsDataType) {
+            FieldsDataType fieldsDataType = (FieldsDataType) dataType;
+            RowType rowType = (RowType) fieldsDataType.getLogicalType();
+            for (RowType.RowField field : rowType.getFields()) {
+                DataType toDataType = TypeConversions.fromLogicalToDataType(field.getType());
+                builder.column(field.getName(), toDataType);
+            }
+        }
+        Schema schema = builder.build();
+        Map<String, String> options = new HashMap<>();
+        options.put(RocketMqCatalogConstant.CONNECTOR, RocketMqCatalogConstant.ROCKETMQ_CONNECTOR);
+        options.put(RocketMqCatalogConstant.TOPIC, topic);
+        options.put(RocketMqCatalogConstant.NAME_SERVER_ADDRESS, mqAdminExt.getNamesrvAddr());
+        return CatalogTable.of(schema, null, Collections.emptyList(), options);
+    }
+
+    @Override
+    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+        if (!getDefaultDatabase().equals(tablePath.getDatabaseName())) {
+            throw new CatalogException("Database name is not default.");
+        }
+        if (StringUtils.isEmpty(tablePath.getObjectName())) {
+            return false;
+        }
+        String subject = tablePath.getObjectName();
+        try {
+            GetSchemaResponse getSchemaResponse = schemaRegistryClient.getSchemaBySubject(subject);
+            if (Objects.nonNull(getSchemaResponse)) {
+                return true;
+            }
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "Failed to get schema of table %s from schema registry client.",
+                            tablePath.getFullName()),
+                    e);
+        }
+        return false;
+    }
+
+    @Override
+    public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
+            throws TableNotExistException, TableNotPartitionedException, CatalogException {
+        checkNotNull(tablePath, "Table path cannot be null");
+
+        try {
+            TopicStatsTable topicStatsTable =
+                    mqAdminExt.examineTopicStats(tablePath.getObjectName());
+            return topicStatsTable.getOffsetTable().keySet().stream()
+                    .map(
+                            topicOffset ->
+                                    new CatalogPartitionSpec(
+                                            new HashMap<String, String>(1) {
+                                                {
+                                                    String queueId =
+                                                            String.valueOf(
+                                                                    topicOffset.getQueueId());
+                                                    put("__queue_id__", queueId);
+                                                }
+                                            }))
+                    .collect(Collectors.toList());
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "Failed to list partitions of table %s by defaultMQAdminExt.",
+                            tablePath.getFullName()),
+                    e);
+        }
+    }
+
+    @Override
+    public List<CatalogPartitionSpec> listPartitions(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws TableNotExistException, TableNotPartitionedException, CatalogException {
+        return listPartitions(tablePath);
+    }
+
+    @Override
+    public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws PartitionNotExistException, CatalogException {
+        return new CatalogPartitionImpl(partitionSpec.getPartitionSpec(), null);
+    }
+
+    @Override
+    public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws CatalogException {
+        try {
+            List<CatalogPartitionSpec> catalogPartitionSpecs = listPartitions(tablePath);
+            return catalogPartitionSpecs.contains(partitionSpec);
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "Failed to judge partition %s of table %s exists by defaultMQAdminExt.",
+                            partitionSpec, tablePath.getFullName()),
+                    e);
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    // Unsupported catalog operations for RocketMQ
+    // There should not be such permission in the connector, it is very dangerous
+    // ------------------------------------------------------------------------
+
+    @Override
+    public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
+            throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
+            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<String> listFunctions(String dbName)
+            throws DatabaseNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CatalogFunction getFunction(ObjectPath functionPath)
+            throws FunctionNotExistException, CatalogException {
+        throw new FunctionNotExistException("Not support to find functions.", functionPath);
+    }
+
+    @Override
+    public boolean functionExists(ObjectPath functionPath) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void createFunction(
+            ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
+            throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterFunction(
+            ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
+            throws FunctionNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
+            throws FunctionNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
+            throws DatabaseNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<String> listViews(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterTable(
+            ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
+            throws TableNotExistException, TableAlreadyExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<CatalogPartitionSpec> listPartitionsByFilter(
+            ObjectPath tablePath, List<Expression> expressions)
+            throws TableNotExistException, TableNotPartitionedException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void createPartition(
+            ObjectPath tablePath,
+            CatalogPartitionSpec partitionSpec,
+            CatalogPartition partition,
+            boolean ignoreIfExists)
+            throws TableNotExistException, TableNotPartitionedException,
+                    PartitionSpecInvalidException, PartitionAlreadyExistsException,
+                    CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void dropPartition(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
+            throws PartitionNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterPartition(
+            ObjectPath tablePath,
+            CatalogPartitionSpec partitionSpec,
+            CatalogPartition newPartition,
+            boolean ignoreIfNotExists)
+            throws PartitionNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CatalogTableStatistics getTableStatistics(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        return CatalogTableStatistics.UNKNOWN;
+    }
+
+    @Override
+    public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        return CatalogColumnStatistics.UNKNOWN;
+    }
+
+    @Override
+    public CatalogTableStatistics getPartitionStatistics(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws PartitionNotExistException, CatalogException {
+        return CatalogTableStatistics.UNKNOWN;
+    }
+
+    @Override
+    public CatalogColumnStatistics getPartitionColumnStatistics(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws PartitionNotExistException, CatalogException {
+        return CatalogColumnStatistics.UNKNOWN;
+    }
+
+    @Override
+    public void alterTableStatistics(
+            ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterTableColumnStatistics(
+            ObjectPath tablePath,
+            CatalogColumnStatistics columnStatistics,
+            boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterPartitionStatistics(
+            ObjectPath tablePath,
+            CatalogPartitionSpec partitionSpec,
+            CatalogTableStatistics partitionStatistics,
+            boolean ignoreIfNotExists)
+            throws PartitionNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterPartitionColumnStatistics(
+            ObjectPath tablePath,
+            CatalogPartitionSpec partitionSpec,
+            CatalogColumnStatistics columnStatistics,
+            boolean ignoreIfNotExists)
+            throws PartitionNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactory.java b/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactory.java
new file mode 100644
index 0000000..37d0b3c
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.catalog;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.rocketmq.flink.catalog.RocketMQCatalogFactoryOptions.DEFAULT_DATABASE;
+import static org.apache.rocketmq.flink.catalog.RocketMQCatalogFactoryOptions.IDENTIFIER;
+import static org.apache.rocketmq.flink.catalog.RocketMQCatalogFactoryOptions.NAME_SERVER_ADDR;
+import static org.apache.rocketmq.flink.catalog.RocketMQCatalogFactoryOptions.SCHEMA_REGISTRY_BASE_URL;
+
+/** The {@CatalogFactory} implementation of RocketMQ. */
+public class RocketMQCatalogFactory implements CatalogFactory {
+
+    @Override
+    public Catalog createCatalog(Context context) {
+        final FactoryUtil.CatalogFactoryHelper helper =
+                FactoryUtil.createCatalogFactoryHelper(this, context);
+        helper.validate();
+        return new RocketMQCatalog(
+                context.getName(),
+                helper.getOptions().get(DEFAULT_DATABASE),
+                helper.getOptions().get(NAME_SERVER_ADDR),
+                helper.getOptions().get(SCHEMA_REGISTRY_BASE_URL));
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(DEFAULT_DATABASE);
+        return options;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryOptions.java b/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryOptions.java
new file mode 100644
index 0000000..25226b2
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryOptions.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.catalog;
+
+import org.apache.rocketmq.flink.common.constant.SchemaRegistryConstant;
+import org.apache.rocketmq.flink.legacy.RocketMQConfig;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+
+/** {@link ConfigOption}s for {@link RocketMQCatalog}. */
+@Internal
+public final class RocketMQCatalogFactoryOptions {
+
+    public static final String IDENTIFIER = "rocketmq-catalog";
+
+    public static final ConfigOption<String> DEFAULT_DATABASE =
+            ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY)
+                    .stringType()
+                    .defaultValue(RocketMQCatalog.DEFAULT_DB);
+
+    public static final ConfigOption<String> NAME_SERVER_ADDR =
+            ConfigOptions.key(RocketMQConfig.NAME_SERVER_ADDR)
+                    .stringType()
+                    .defaultValue("http://localhost:9876")
+                    .withDescription("Required rocketmq name server address");
+
+    public static final ConfigOption<String> SCHEMA_REGISTRY_BASE_URL =
+            ConfigOptions.key(SchemaRegistryConstant.SCHEMA_REGISTRY_BASE_URL_KEY)
+                    .stringType()
+                    .defaultValue(SchemaRegistryConstant.SCHEMA_REGISTRY_BASE_URL)
+                    .withDescription("Required schema registry server address");
+
+    private RocketMQCatalogFactoryOptions() {}
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/common/constant/RocketMqCatalogConstant.java b/src/main/java/org/apache/rocketmq/flink/common/constant/RocketMqCatalogConstant.java
new file mode 100644
index 0000000..be3d9da
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/common/constant/RocketMqCatalogConstant.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.common.constant;
+
+/** RocketMqCatalogConstant. */
+public class RocketMqCatalogConstant {
+    public static final String CONNECTOR = "connector";
+
+    public static final String TOPIC = "topic";
+    public static final String NAME_SERVER_ADDRESS = "nameServerAddress";
+    public static final String ROCKETMQ_CONNECTOR = "rocketmq";
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/common/constant/SchemaRegistryConstant.java b/src/main/java/org/apache/rocketmq/flink/common/constant/SchemaRegistryConstant.java
new file mode 100644
index 0000000..8584cb3
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/common/constant/SchemaRegistryConstant.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.common.constant;
+
+/** SchemaRegistryConstant. */
+public class SchemaRegistryConstant {
+
+    public static final String SCHEMA_REGISTRY_BASE_URL = "http://localhost:8080";
+
+    public static final String SCHEMA_REGISTRY_BASE_URL_KEY = "schema.registry.base.url";
+}
diff --git a/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index b164722..9e866fc 100644
--- a/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -14,4 +14,5 @@
 # limitations under the License.
 
 org.apache.rocketmq.flink.source.table.RocketMQDynamicTableSourceFactory
-org.apache.rocketmq.flink.sink.table.RocketMQDynamicTableSinkFactory
\ No newline at end of file
+org.apache.rocketmq.flink.sink.table.RocketMQDynamicTableSinkFactory
+org.apache.rocketmq.flink.catalog.RocketMQCatalogFactory
diff --git a/src/test/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryTest.java b/src/test/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryTest.java
new file mode 100644
index 0000000..ad595d7
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.catalog;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class RocketMQCatalogFactoryTest {
+
+    @Test
+    public void testCreateCatalog() {
+        RocketMQCatalogFactory factory = new RocketMQCatalogFactory();
+        FactoryUtil.DefaultCatalogContext context =
+                new FactoryUtil.DefaultCatalogContext(
+                        "rocketmq-catalog",
+                        new HashMap<>(),
+                        null,
+                        this.getClass().getClassLoader());
+        Catalog catalog = factory.createCatalog(context);
+        assertNotNull(catalog);
+    }
+
+    @Test
+    public void testFactoryIdentifier() {
+        RocketMQCatalogFactory factory = new RocketMQCatalogFactory();
+        assertEquals(factory.factoryIdentifier(), "rocketmq-catalog");
+    }
+
+    @Test
+    public void testRequiredOptions() {
+        RocketMQCatalogFactory factory = new RocketMQCatalogFactory();
+        Set<ConfigOption<?>> options = factory.requiredOptions();
+        assertNotNull(options);
+    }
+
+    @Test
+    public void testOptionalOptions() {
+        RocketMQCatalogFactory factory = new RocketMQCatalogFactory();
+        Set<ConfigOption<?>> options = factory.optionalOptions();
+        assertEquals(options.size(), 1);
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogTest.java b/src/test/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogTest.java
new file mode 100644
index 0000000..4a36c77
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogTest.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.catalog;
+
+import org.apache.rocketmq.common.admin.TopicOffset;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.flink.common.constant.SchemaRegistryConstant;
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
+import org.apache.rocketmq.schema.registry.common.model.SchemaType;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.factories.Factory;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(MockitoJUnitRunner.class)
+public class RocketMQCatalogTest {
+    @Mock private SchemaRegistryClient schemaRegistryClient;
+    @Mock private DefaultMQAdminExt mqAdminExt;
+    @Mock private GetSchemaResponse getSchemaResponse;
+    private RocketMQCatalog rocketMQCatalog;
+
+    @Before
+    public void setUp() throws Exception {
+        rocketMQCatalog =
+                new RocketMQCatalog(
+                        "rocketmq-catalog",
+                        "default",
+                        "http://localhost:9876",
+                        SchemaRegistryConstant.SCHEMA_REGISTRY_BASE_URL);
+
+        Field schemaRegistryClientField =
+                rocketMQCatalog.getClass().getDeclaredField("schemaRegistryClient");
+        schemaRegistryClientField.setAccessible(true);
+        schemaRegistryClientField.set(rocketMQCatalog, schemaRegistryClient);
+
+        Field mqAdminExtField = rocketMQCatalog.getClass().getDeclaredField("mqAdminExt");
+        mqAdminExtField.setAccessible(true);
+        mqAdminExtField.set(rocketMQCatalog, mqAdminExt);
+
+        List<String> list = new ArrayList();
+        list.add("test");
+        Mockito.when(schemaRegistryClient.getSubjectsByTenant("default", "default"))
+                .thenReturn(list);
+
+        Mockito.when(mqAdminExt.getNamesrvAddr()).thenReturn("localhost:9876");
+        Mockito.when(schemaRegistryClient.getSchemaBySubject("test")).thenReturn(getSchemaResponse);
+        Mockito.when(getSchemaResponse.getType()).thenReturn(SchemaType.AVRO);
+        Mockito.when(getSchemaResponse.getIdl())
+                .thenReturn(
+                        "{\"type\":\"record\",\"name\":\"Charge\","
+                                + "\"namespace\":\"org.apache.rocketmq.schema.registry.example.serde\",\"fields\":[{\"name\":\"item\","
+                                + "\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}");
+
+        TopicStatsTable topicStatsTable = new TopicStatsTable();
+        topicStatsTable.setOffsetTable(
+                new HashMap<MessageQueue, TopicOffset>(2) {
+                    {
+                        put(new MessageQueue("test", "default", 0), new TopicOffset());
+                        put(new MessageQueue("test", "default", 1), new TopicOffset());
+                    }
+                });
+
+        Mockito.when(mqAdminExt.examineTopicStats("test")).thenReturn(topicStatsTable);
+    }
+
+    @Test
+    public void testGetFactory() {
+        Optional<Factory> factory = rocketMQCatalog.getFactory();
+        assertNotNull(factory.get());
+    }
+
+    @Test
+    public void testOpen() throws NoSuchFieldException, IllegalAccessException {
+        rocketMQCatalog.open();
+
+        Class<? extends RocketMQCatalog> aClass = rocketMQCatalog.getClass();
+        Field mqAdminExtField = aClass.getDeclaredField("mqAdminExt");
+        mqAdminExtField.setAccessible(true);
+        Field schemaRegistryClientField = aClass.getDeclaredField("schemaRegistryClient");
+        schemaRegistryClientField.setAccessible(true);
+
+        Object mqAdminExt = mqAdminExtField.get(rocketMQCatalog);
+        Object schemaRegistryClient = schemaRegistryClientField.get(rocketMQCatalog);
+        assertNotNull(mqAdminExt);
+        assertNotNull(schemaRegistryClient);
+    }
+
+    @Test
+    public void testClose() throws NoSuchFieldException, IllegalAccessException {
+        rocketMQCatalog.close();
+
+        Class<? extends RocketMQCatalog> aClass = rocketMQCatalog.getClass();
+        Field mqAdminExtField = aClass.getDeclaredField("mqAdminExt");
+        mqAdminExtField.setAccessible(true);
+        Field schemaRegistryClientField = aClass.getDeclaredField("schemaRegistryClient");
+        schemaRegistryClientField.setAccessible(true);
+
+        Object mqAdminExt = mqAdminExtField.get(rocketMQCatalog);
+        Object schemaRegistryClient = schemaRegistryClientField.get(rocketMQCatalog);
+        assertNull(schemaRegistryClient);
+    }
+
+    @Test
+    public void testListDatabases() {
+        List<String> strings = rocketMQCatalog.listDatabases();
+        assertEquals(1, strings.size());
+        assertEquals("default", strings.get(0));
+    }
+
+    @Test
+    public void testGetDatabase() throws DatabaseNotExistException {
+        CatalogDatabase database = rocketMQCatalog.getDatabase("default");
+        assertNotNull(database);
+    }
+
+    @Test
+    public void testDatabaseExists() {
+        boolean exists = rocketMQCatalog.databaseExists("default");
+        assertTrue(exists);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testCreateDatabase() throws DatabaseAlreadyExistException {
+        rocketMQCatalog.createDatabase("test", null, false);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testDropDatabase() throws DatabaseNotEmptyException, DatabaseNotExistException {
+        rocketMQCatalog.dropDatabase("test", false, false);
+    }
+
+    @Test
+    public void testListTables() throws DatabaseNotExistException {
+        List<String> strings = rocketMQCatalog.listTables("default");
+        assertEquals(1, strings.size());
+        assertEquals("test", strings.get(0));
+    }
+
+    @Test
+    public void testGetTable() throws TableNotExistException {
+        ObjectPath objectPath = new ObjectPath("default", "test");
+        CatalogBaseTable catalogBaseTable = rocketMQCatalog.getTable(objectPath);
+        assertNotNull(catalogBaseTable);
+    }
+
+    @Test
+    public void testTableExists() {
+        ObjectPath objectPath = new ObjectPath("default", "test");
+        boolean exists = rocketMQCatalog.tableExists(objectPath);
+        assertTrue(exists);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testCreateTable() throws TableAlreadyExistException, DatabaseNotExistException {
+        rocketMQCatalog.createTable(null, null, false);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testDropTable() throws TableNotExistException {
+        rocketMQCatalog.dropTable(null, false);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testListFunctions() throws DatabaseNotExistException {
+        rocketMQCatalog.listFunctions("default");
+    }
+
+    @Test(expected = FunctionNotExistException.class)
+    public void testGetFunction() throws FunctionNotExistException {
+        ObjectPath objectPath = new ObjectPath("default", "test");
+        rocketMQCatalog.getFunction(objectPath);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testFunctionExists() {
+        boolean exists = rocketMQCatalog.functionExists(null);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testCreateFunction()
+            throws FunctionAlreadyExistException, DatabaseNotExistException {
+        rocketMQCatalog.createFunction(null, null, false);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testAlterFunction() throws FunctionNotExistException {
+        rocketMQCatalog.alterFunction(null, null, false);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testDropFunction() throws FunctionNotExistException {
+        rocketMQCatalog.dropFunction(null, false);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testAlterDatabase() throws DatabaseNotExistException {
+        rocketMQCatalog.alterDatabase(null, null, false);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testListViews() throws DatabaseNotExistException {
+        rocketMQCatalog.listViews("default");
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testAlterTable() throws TableNotExistException {
+        rocketMQCatalog.alterTable(null, null, false);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testRenameTable() throws TableAlreadyExistException, TableNotExistException {
+        rocketMQCatalog.renameTable(null, null, false);
+    }
+
+    @Test
+    public void testListPartitions() throws TableNotPartitionedException, TableNotExistException {
+        List<CatalogPartitionSpec> catalogPartitionSpecs =
+                rocketMQCatalog.listPartitions(new ObjectPath("default", "test"));
+        assertEquals(2, catalogPartitionSpecs.size());
+        assertEquals(
+                new ArrayList<CatalogPartitionSpec>() {
+                    {
+                        add(
+                                new CatalogPartitionSpec(
+                                        new HashMap<String, String>(1) {
+                                            {
+                                                put("__queue_id__", String.valueOf(0));
+                                            }
+                                        }));
+                        add(
+                                new CatalogPartitionSpec(
+                                        new HashMap<String, String>(1) {
+                                            {
+                                                put("__queue_id__", String.valueOf(1));
+                                            }
+                                        }));
+                    }
+                },
+                catalogPartitionSpecs);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testListPartitionsByFilter()
+            throws TableNotPartitionedException, TableNotExistException {
+        rocketMQCatalog.listPartitionsByFilter(null, null);
+    }
+
+    @Test
+    public void testGetPartition() throws PartitionNotExistException {
+        ObjectPath objectPath = new ObjectPath("default", "test");
+        CatalogPartition partition =
+                rocketMQCatalog.getPartition(
+                        objectPath,
+                        new CatalogPartitionSpec(
+                                new HashMap<String, String>(1) {
+                                    {
+                                        put("__queue_id__", String.valueOf(0));
+                                    }
+                                }));
+
+        assertEquals(
+                new HashMap<String, String>(1) {
+                    {
+                        put("__queue_id__", String.valueOf(0));
+                    }
+                },
+                partition.getProperties());
+    }
+
+    @Test
+    public void testPartitionExists() {
+        ObjectPath objectPath = new ObjectPath("default", "test");
+        boolean test =
+                rocketMQCatalog.partitionExists(
+                        objectPath,
+                        new CatalogPartitionSpec(
+                                new HashMap<String, String>(1) {
+                                    {
+                                        put("__queue_id__", String.valueOf(0));
+                                    }
+                                }));
+        assertNotNull(test);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testCreatePartition()
+            throws TableNotPartitionedException, TableNotExistException,
+                    PartitionSpecInvalidException, PartitionAlreadyExistsException {
+        rocketMQCatalog.createPartition(null, null, null, false);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testDropPartition() throws PartitionNotExistException {
+        rocketMQCatalog.dropPartition(null, null, false);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testAlterPartition() throws PartitionNotExistException {
+        rocketMQCatalog.alterPartition(null, null, null, false);
+    }
+
+    @Test
+    public void testGetTableStatistics() throws TableNotExistException {
+        CatalogTableStatistics statistics = rocketMQCatalog.getTableStatistics(null);
+        assertEquals(statistics, CatalogTableStatistics.UNKNOWN);
+    }
+
+    @Test
+    public void testGetTableColumnStatistics() throws TableNotExistException {
+        CatalogColumnStatistics statistics = rocketMQCatalog.getTableColumnStatistics(null);
+        assertEquals(statistics, CatalogColumnStatistics.UNKNOWN);
+    }
+
+    @Test
+    public void testGetPartitionStatistics() throws PartitionNotExistException {
+        CatalogTableStatistics statistics = rocketMQCatalog.getPartitionStatistics(null, null);
+        assertEquals(statistics, CatalogTableStatistics.UNKNOWN);
+    }
+
+    @Test
+    public void testGetPartitionColumnStatistics() throws PartitionNotExistException {
+        CatalogColumnStatistics statistics =
+                rocketMQCatalog.getPartitionColumnStatistics(null, null);
+        assertEquals(statistics, CatalogColumnStatistics.UNKNOWN);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testAlterTableStatistics() throws TableNotExistException {
+        rocketMQCatalog.alterTableStatistics(null, null, false);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testAlterTableColumnStatistics() throws TableNotExistException {
+        rocketMQCatalog.alterTableColumnStatistics(null, null, false);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testAlterPartitionStatistics() throws PartitionNotExistException {
+        rocketMQCatalog.alterPartitionStatistics(null, null, null, false);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testAlterPartitionColumnStatistics() throws PartitionNotExistException {
+        rocketMQCatalog.alterPartitionColumnStatistics(null, null, null, false);
+    }
+}