[issues-32] Implement of Apache RocketMQ Flink Catalog. (#49)
Co-authored-by: lixiaoshuang <644968328@qq.com>
Co-authored-by: gongzhongqiang <764629910@qq.com>
diff --git a/README.md b/README.md
index 9ecf41b..fa53dd7 100644
--- a/README.md
+++ b/README.md
@@ -145,7 +145,7 @@
) WITH (
'connector' = 'rocketmq',
'topic' = 'user_behavior',
- 'consumeGroup' = 'behavior_consume_group',
+ 'consumerGroup' = 'behavior_consumer_group',
'nameServerAddress' = '127.0.0.1:9876'
);
@@ -183,7 +183,7 @@
) WITH (
'connector' = 'rocketmq',
'topic' = 'user_behavior',
- 'consumeGroup' = 'behavior_consume_group',
+ 'consumerGroup' = 'behavior_consumer_group',
'nameServerAddress' = '127.0.0.1:9876'
);
```
diff --git a/pom.xml b/pom.xml
index 72268c9..1b725e1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,7 +37,8 @@
<flink.version>1.15.0</flink.version>
<commons-lang.version>2.6</commons-lang.version>
<spotless.version>2.4.2</spotless.version>
- <jaxb-api.version>2.3.1</jaxb-api.version>
+ <jaxb-api.version>2.3.1</jaxb-api.version>
+ <rocketmq.schema.registry.version>0.0.4-SNAPSHOT</rocketmq.schema.registry.version>
</properties>
<dependencies>
@@ -84,6 +85,11 @@
<version>${flink.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-avro</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
@@ -106,10 +112,25 @@
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-namesrv</artifactId>
+ <version>${rocketmq.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-broker</artifactId>
+ <version>${rocketmq.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-test</artifactId>
<version>${rocketmq.version}</version>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>schema-registry-client</artifactId>
+ <version>${rocketmq.schema.registry.version}</version>
+ </dependency>
+
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
@@ -135,18 +156,7 @@
<version>1.5.5</version>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-namesrv</artifactId>
- <version>${rocketmq.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-broker</artifactId>
- <version>${rocketmq.version}</version>
- <scope>test</scope>
- </dependency>
+
</dependencies>
<build>
diff --git a/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalog.java b/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalog.java
new file mode 100644
index 0000000..3636995
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalog.java
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.catalog;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.flink.common.constant.RocketMqCatalogConstant;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClientFactory;
+import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
+import org.apache.rocketmq.schema.registry.common.model.SchemaType;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionImpl;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.FieldsDataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
+
+/** A catalog implementation for RocketMQ. */
+public class RocketMQCatalog extends AbstractCatalog {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RocketMQCatalog.class);
+ public static final String DEFAULT_DB = "default";
+ public final String namesrvAddr;
+ private final String schemaRegistryUrl;
+ private DefaultMQAdminExt mqAdminExt;
+ private SchemaRegistryClient schemaRegistryClient;
+
+ public RocketMQCatalog(
+ String catalogName, String database, String namesrvAddr, String schemaRegistryUrl) {
+ super(catalogName, database);
+
+ checkArgument(!isNullOrWhitespaceOnly(namesrvAddr), "namesrvAddr cannot be null or empty");
+ checkArgument(
+ !isNullOrWhitespaceOnly(schemaRegistryUrl),
+ "schemaRegistryUrl cannot be null or empty");
+
+ this.namesrvAddr = namesrvAddr;
+ this.schemaRegistryUrl = schemaRegistryUrl;
+ LOG.info("Created RocketMQ Catalog {}", catalogName);
+ }
+
+ @Override
+ public Optional<Factory> getFactory() {
+ return Optional.of(new RocketMQCatalogFactory());
+ }
+
+ @Override
+ public void open() throws CatalogException {
+ if (mqAdminExt == null) {
+ try {
+ mqAdminExt = new DefaultMQAdminExt();
+ mqAdminExt.setNamesrvAddr(namesrvAddr);
+ mqAdminExt.setLanguage(LanguageCode.JAVA);
+ mqAdminExt.start();
+ } catch (MQClientException e) {
+ throw new CatalogException(
+ "Failed to create RocketMQ admin using :" + namesrvAddr, e);
+ }
+ }
+ if (schemaRegistryClient == null) {
+ schemaRegistryClient = SchemaRegistryClientFactory.newClient(schemaRegistryUrl, null);
+ }
+ }
+
+ @Override
+ public void close() throws CatalogException {
+ if (Objects.nonNull(mqAdminExt)) {
+ mqAdminExt.shutdown();
+ mqAdminExt = null;
+ }
+ if (Objects.nonNull(schemaRegistryClient)) {
+ schemaRegistryClient = null;
+ }
+ }
+
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+ return Collections.singletonList(getDefaultDatabase());
+ }
+
+ @Override
+ public CatalogDatabase getDatabase(String databaseName)
+ throws DatabaseNotExistException, CatalogException {
+ if (StringUtils.isEmpty(databaseName)) {
+ throw new CatalogException("Database name can not be null or empty.");
+ }
+ if (!databaseExists(databaseName)) {
+ throw new DatabaseNotExistException(getName(), databaseName);
+ } else {
+ return new CatalogDatabaseImpl(new HashMap<>(), null);
+ }
+ }
+
+ @Override
+ public boolean databaseExists(String databaseName) throws CatalogException {
+ return getDefaultDatabase().equals(databaseName);
+ }
+
+ @Override
+ public List<String> listTables(String databaseName)
+ throws DatabaseNotExistException, CatalogException {
+ if (!getDefaultDatabase().equals(databaseName)) {
+ throw new DatabaseNotExistException(getName(), databaseName);
+ }
+ try {
+ List<String> tables = schemaRegistryClient.getSubjectsByTenant("default", "default");
+ return tables;
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format(
+ "Failed to get topics of namespace %s from schema registry client.",
+ databaseName),
+ e);
+ }
+ }
+
+ @Override
+ public CatalogBaseTable getTable(ObjectPath tablePath)
+ throws TableNotExistException, CatalogException {
+ if (!tableExists(tablePath)) {
+ throw new TableNotExistException(getName(), tablePath);
+ }
+ String subject = tablePath.getObjectName();
+ try {
+ GetSchemaResponse getSchemaResponse = schemaRegistryClient.getSchemaBySubject(subject);
+ if (getSchemaResponse.getType() != SchemaType.AVRO) {
+ throw new CatalogException("Only support avro schema.");
+ }
+ return getCatalogTableForSchema(subject, getSchemaResponse);
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format(
+ "Failed to get schema of table %s from schema registry client.",
+ tablePath.getFullName()),
+ e);
+ }
+ }
+
+ private CatalogTable getCatalogTableForSchema(
+ String topic, GetSchemaResponse getSchemaResponse) {
+ DataType dataType = AvroSchemaConverter.convertToDataType(getSchemaResponse.getIdl());
+ Schema.Builder builder = Schema.newBuilder();
+ if (dataType instanceof FieldsDataType) {
+ FieldsDataType fieldsDataType = (FieldsDataType) dataType;
+ RowType rowType = (RowType) fieldsDataType.getLogicalType();
+ for (RowType.RowField field : rowType.getFields()) {
+ DataType toDataType = TypeConversions.fromLogicalToDataType(field.getType());
+ builder.column(field.getName(), toDataType);
+ }
+ }
+ Schema schema = builder.build();
+ Map<String, String> options = new HashMap<>();
+ options.put(RocketMqCatalogConstant.CONNECTOR, RocketMqCatalogConstant.ROCKETMQ_CONNECTOR);
+ options.put(RocketMqCatalogConstant.TOPIC, topic);
+ options.put(RocketMqCatalogConstant.NAME_SERVER_ADDRESS, mqAdminExt.getNamesrvAddr());
+ return CatalogTable.of(schema, null, Collections.emptyList(), options);
+ }
+
+ @Override
+ public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+ if (!getDefaultDatabase().equals(tablePath.getDatabaseName())) {
+ throw new CatalogException("Database name is not default.");
+ }
+ if (StringUtils.isEmpty(tablePath.getObjectName())) {
+ return false;
+ }
+ String subject = tablePath.getObjectName();
+ try {
+ GetSchemaResponse getSchemaResponse = schemaRegistryClient.getSchemaBySubject(subject);
+ if (Objects.nonNull(getSchemaResponse)) {
+ return true;
+ }
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format(
+ "Failed to get schema of table %s from schema registry client.",
+ tablePath.getFullName()),
+ e);
+ }
+ return false;
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
+ throws TableNotExistException, TableNotPartitionedException, CatalogException {
+ checkNotNull(tablePath, "Table path cannot be null");
+
+ try {
+ TopicStatsTable topicStatsTable =
+ mqAdminExt.examineTopicStats(tablePath.getObjectName());
+ return topicStatsTable.getOffsetTable().keySet().stream()
+ .map(
+ topicOffset ->
+ new CatalogPartitionSpec(
+ new HashMap<String, String>(1) {
+ {
+ String queueId =
+ String.valueOf(
+ topicOffset.getQueueId());
+ put("__queue_id__", queueId);
+ }
+ }))
+ .collect(Collectors.toList());
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format(
+ "Failed to list partitions of table %s by defaultMQAdminExt.",
+ tablePath.getFullName()),
+ e);
+ }
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitions(
+ ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws TableNotExistException, TableNotPartitionedException, CatalogException {
+ return listPartitions(tablePath);
+ }
+
+ @Override
+ public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws PartitionNotExistException, CatalogException {
+ return new CatalogPartitionImpl(partitionSpec.getPartitionSpec(), null);
+ }
+
+ @Override
+ public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws CatalogException {
+ try {
+ List<CatalogPartitionSpec> catalogPartitionSpecs = listPartitions(tablePath);
+ return catalogPartitionSpecs.contains(partitionSpec);
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format(
+ "Failed to judge partition %s of table %s exists by defaultMQAdminExt.",
+ partitionSpec, tablePath.getFullName()),
+ e);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Unsupported catalog operations for RocketMQ
+ // There should not be such permission in the connector, it is very dangerous
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
+ throws DatabaseAlreadyExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
+ throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
+ throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<String> listFunctions(String dbName)
+ throws DatabaseNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CatalogFunction getFunction(ObjectPath functionPath)
+ throws FunctionNotExistException, CatalogException {
+ throw new FunctionNotExistException("Not support to find functions.", functionPath);
+ }
+
+ @Override
+ public boolean functionExists(ObjectPath functionPath) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void createFunction(
+ ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
+ throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterFunction(
+ ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
+ throws FunctionNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
+ throws FunctionNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
+ throws DatabaseNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<String> listViews(String databaseName)
+ throws DatabaseNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterTable(
+ ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
+ throws TableNotExistException, TableAlreadyExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitionsByFilter(
+ ObjectPath tablePath, List<Expression> expressions)
+ throws TableNotExistException, TableNotPartitionedException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void createPartition(
+ ObjectPath tablePath,
+ CatalogPartitionSpec partitionSpec,
+ CatalogPartition partition,
+ boolean ignoreIfExists)
+ throws TableNotExistException, TableNotPartitionedException,
+ PartitionSpecInvalidException, PartitionAlreadyExistsException,
+ CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void dropPartition(
+ ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
+ throws PartitionNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterPartition(
+ ObjectPath tablePath,
+ CatalogPartitionSpec partitionSpec,
+ CatalogPartition newPartition,
+ boolean ignoreIfNotExists)
+ throws PartitionNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CatalogTableStatistics getTableStatistics(ObjectPath tablePath)
+ throws TableNotExistException, CatalogException {
+ return CatalogTableStatistics.UNKNOWN;
+ }
+
+ @Override
+ public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath)
+ throws TableNotExistException, CatalogException {
+ return CatalogColumnStatistics.UNKNOWN;
+ }
+
+ @Override
+ public CatalogTableStatistics getPartitionStatistics(
+ ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws PartitionNotExistException, CatalogException {
+ return CatalogTableStatistics.UNKNOWN;
+ }
+
+ @Override
+ public CatalogColumnStatistics getPartitionColumnStatistics(
+ ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws PartitionNotExistException, CatalogException {
+ return CatalogColumnStatistics.UNKNOWN;
+ }
+
+ @Override
+ public void alterTableStatistics(
+ ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterTableColumnStatistics(
+ ObjectPath tablePath,
+ CatalogColumnStatistics columnStatistics,
+ boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterPartitionStatistics(
+ ObjectPath tablePath,
+ CatalogPartitionSpec partitionSpec,
+ CatalogTableStatistics partitionStatistics,
+ boolean ignoreIfNotExists)
+ throws PartitionNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterPartitionColumnStatistics(
+ ObjectPath tablePath,
+ CatalogPartitionSpec partitionSpec,
+ CatalogColumnStatistics columnStatistics,
+ boolean ignoreIfNotExists)
+ throws PartitionNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactory.java b/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactory.java
new file mode 100644
index 0000000..37d0b3c
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.catalog;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.rocketmq.flink.catalog.RocketMQCatalogFactoryOptions.DEFAULT_DATABASE;
+import static org.apache.rocketmq.flink.catalog.RocketMQCatalogFactoryOptions.IDENTIFIER;
+import static org.apache.rocketmq.flink.catalog.RocketMQCatalogFactoryOptions.NAME_SERVER_ADDR;
+import static org.apache.rocketmq.flink.catalog.RocketMQCatalogFactoryOptions.SCHEMA_REGISTRY_BASE_URL;
+
+/** The {@CatalogFactory} implementation of RocketMQ. */
+public class RocketMQCatalogFactory implements CatalogFactory {
+
+ @Override
+ public Catalog createCatalog(Context context) {
+ final FactoryUtil.CatalogFactoryHelper helper =
+ FactoryUtil.createCatalogFactoryHelper(this, context);
+ helper.validate();
+ return new RocketMQCatalog(
+ context.getName(),
+ helper.getOptions().get(DEFAULT_DATABASE),
+ helper.getOptions().get(NAME_SERVER_ADDR),
+ helper.getOptions().get(SCHEMA_REGISTRY_BASE_URL));
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(DEFAULT_DATABASE);
+ return options;
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryOptions.java b/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryOptions.java
new file mode 100644
index 0000000..25226b2
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryOptions.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.catalog;
+
+import org.apache.rocketmq.flink.common.constant.SchemaRegistryConstant;
+import org.apache.rocketmq.flink.legacy.RocketMQConfig;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+
+/** {@link ConfigOption}s for {@link RocketMQCatalog}. */
+@Internal
+public final class RocketMQCatalogFactoryOptions {
+
+ public static final String IDENTIFIER = "rocketmq-catalog";
+
+ public static final ConfigOption<String> DEFAULT_DATABASE =
+ ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY)
+ .stringType()
+ .defaultValue(RocketMQCatalog.DEFAULT_DB);
+
+ public static final ConfigOption<String> NAME_SERVER_ADDR =
+ ConfigOptions.key(RocketMQConfig.NAME_SERVER_ADDR)
+ .stringType()
+ .defaultValue("http://localhost:9876")
+ .withDescription("Required rocketmq name server address");
+
+ public static final ConfigOption<String> SCHEMA_REGISTRY_BASE_URL =
+ ConfigOptions.key(SchemaRegistryConstant.SCHEMA_REGISTRY_BASE_URL_KEY)
+ .stringType()
+ .defaultValue(SchemaRegistryConstant.SCHEMA_REGISTRY_BASE_URL)
+ .withDescription("Required schema registry server address");
+
+ private RocketMQCatalogFactoryOptions() {}
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/common/constant/RocketMqCatalogConstant.java b/src/main/java/org/apache/rocketmq/flink/common/constant/RocketMqCatalogConstant.java
new file mode 100644
index 0000000..be3d9da
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/common/constant/RocketMqCatalogConstant.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.common.constant;
+
+/** RocketMqCatalogConstant. */
+public class RocketMqCatalogConstant {
+ public static final String CONNECTOR = "connector";
+
+ public static final String TOPIC = "topic";
+ public static final String NAME_SERVER_ADDRESS = "nameServerAddress";
+ public static final String ROCKETMQ_CONNECTOR = "rocketmq";
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/common/constant/SchemaRegistryConstant.java b/src/main/java/org/apache/rocketmq/flink/common/constant/SchemaRegistryConstant.java
new file mode 100644
index 0000000..8584cb3
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/common/constant/SchemaRegistryConstant.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.common.constant;
+
+/** SchemaRegistryConstant. */
+public class SchemaRegistryConstant {
+
+ public static final String SCHEMA_REGISTRY_BASE_URL = "http://localhost:8080";
+
+ public static final String SCHEMA_REGISTRY_BASE_URL_KEY = "schema.registry.base.url";
+}
diff --git a/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index b164722..9e866fc 100644
--- a/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -14,4 +14,5 @@
# limitations under the License.
org.apache.rocketmq.flink.source.table.RocketMQDynamicTableSourceFactory
-org.apache.rocketmq.flink.sink.table.RocketMQDynamicTableSinkFactory
\ No newline at end of file
+org.apache.rocketmq.flink.sink.table.RocketMQDynamicTableSinkFactory
+org.apache.rocketmq.flink.catalog.RocketMQCatalogFactory
diff --git a/src/test/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryTest.java b/src/test/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryTest.java
new file mode 100644
index 0000000..ad595d7
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.catalog;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class RocketMQCatalogFactoryTest {
+
+ @Test
+ public void testCreateCatalog() {
+ RocketMQCatalogFactory factory = new RocketMQCatalogFactory();
+ FactoryUtil.DefaultCatalogContext context =
+ new FactoryUtil.DefaultCatalogContext(
+ "rocketmq-catalog",
+ new HashMap<>(),
+ null,
+ this.getClass().getClassLoader());
+ Catalog catalog = factory.createCatalog(context);
+ assertNotNull(catalog);
+ }
+
+ @Test
+ public void testFactoryIdentifier() {
+ RocketMQCatalogFactory factory = new RocketMQCatalogFactory();
+ assertEquals(factory.factoryIdentifier(), "rocketmq-catalog");
+ }
+
+ @Test
+ public void testRequiredOptions() {
+ RocketMQCatalogFactory factory = new RocketMQCatalogFactory();
+ Set<ConfigOption<?>> options = factory.requiredOptions();
+ assertNotNull(options);
+ }
+
+ @Test
+ public void testOptionalOptions() {
+ RocketMQCatalogFactory factory = new RocketMQCatalogFactory();
+ Set<ConfigOption<?>> options = factory.optionalOptions();
+ assertEquals(options.size(), 1);
+ }
+}
diff --git a/src/test/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogTest.java b/src/test/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogTest.java
new file mode 100644
index 0000000..4a36c77
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogTest.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.catalog;
+
+import org.apache.rocketmq.common.admin.TopicOffset;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.flink.common.constant.SchemaRegistryConstant;
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
+import org.apache.rocketmq.schema.registry.common.model.SchemaType;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.factories.Factory;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(MockitoJUnitRunner.class)
+public class RocketMQCatalogTest {
+ @Mock private SchemaRegistryClient schemaRegistryClient;
+ @Mock private DefaultMQAdminExt mqAdminExt;
+ @Mock private GetSchemaResponse getSchemaResponse;
+ private RocketMQCatalog rocketMQCatalog;
+
+ @Before
+ public void setUp() throws Exception {
+ rocketMQCatalog =
+ new RocketMQCatalog(
+ "rocketmq-catalog",
+ "default",
+ "http://localhost:9876",
+ SchemaRegistryConstant.SCHEMA_REGISTRY_BASE_URL);
+
+ Field schemaRegistryClientField =
+ rocketMQCatalog.getClass().getDeclaredField("schemaRegistryClient");
+ schemaRegistryClientField.setAccessible(true);
+ schemaRegistryClientField.set(rocketMQCatalog, schemaRegistryClient);
+
+ Field mqAdminExtField = rocketMQCatalog.getClass().getDeclaredField("mqAdminExt");
+ mqAdminExtField.setAccessible(true);
+ mqAdminExtField.set(rocketMQCatalog, mqAdminExt);
+
+ List<String> list = new ArrayList();
+ list.add("test");
+ Mockito.when(schemaRegistryClient.getSubjectsByTenant("default", "default"))
+ .thenReturn(list);
+
+ Mockito.when(mqAdminExt.getNamesrvAddr()).thenReturn("localhost:9876");
+ Mockito.when(schemaRegistryClient.getSchemaBySubject("test")).thenReturn(getSchemaResponse);
+ Mockito.when(getSchemaResponse.getType()).thenReturn(SchemaType.AVRO);
+ Mockito.when(getSchemaResponse.getIdl())
+ .thenReturn(
+ "{\"type\":\"record\",\"name\":\"Charge\","
+ + "\"namespace\":\"org.apache.rocketmq.schema.registry.example.serde\",\"fields\":[{\"name\":\"item\","
+ + "\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}");
+
+ TopicStatsTable topicStatsTable = new TopicStatsTable();
+ topicStatsTable.setOffsetTable(
+ new HashMap<MessageQueue, TopicOffset>(2) {
+ {
+ put(new MessageQueue("test", "default", 0), new TopicOffset());
+ put(new MessageQueue("test", "default", 1), new TopicOffset());
+ }
+ });
+
+ Mockito.when(mqAdminExt.examineTopicStats("test")).thenReturn(topicStatsTable);
+ }
+
+ @Test
+ public void testGetFactory() {
+ Optional<Factory> factory = rocketMQCatalog.getFactory();
+ assertNotNull(factory.get());
+ }
+
+ @Test
+ public void testOpen() throws NoSuchFieldException, IllegalAccessException {
+ rocketMQCatalog.open();
+
+ Class<? extends RocketMQCatalog> aClass = rocketMQCatalog.getClass();
+ Field mqAdminExtField = aClass.getDeclaredField("mqAdminExt");
+ mqAdminExtField.setAccessible(true);
+ Field schemaRegistryClientField = aClass.getDeclaredField("schemaRegistryClient");
+ schemaRegistryClientField.setAccessible(true);
+
+ Object mqAdminExt = mqAdminExtField.get(rocketMQCatalog);
+ Object schemaRegistryClient = schemaRegistryClientField.get(rocketMQCatalog);
+ assertNotNull(mqAdminExt);
+ assertNotNull(schemaRegistryClient);
+ }
+
+ @Test
+ public void testClose() throws NoSuchFieldException, IllegalAccessException {
+ rocketMQCatalog.close();
+
+ Class<? extends RocketMQCatalog> aClass = rocketMQCatalog.getClass();
+ Field mqAdminExtField = aClass.getDeclaredField("mqAdminExt");
+ mqAdminExtField.setAccessible(true);
+ Field schemaRegistryClientField = aClass.getDeclaredField("schemaRegistryClient");
+ schemaRegistryClientField.setAccessible(true);
+
+ Object mqAdminExt = mqAdminExtField.get(rocketMQCatalog);
+ Object schemaRegistryClient = schemaRegistryClientField.get(rocketMQCatalog);
+ assertNull(schemaRegistryClient);
+ }
+
+ @Test
+ public void testListDatabases() {
+ List<String> strings = rocketMQCatalog.listDatabases();
+ assertEquals(1, strings.size());
+ assertEquals("default", strings.get(0));
+ }
+
+ @Test
+ public void testGetDatabase() throws DatabaseNotExistException {
+ CatalogDatabase database = rocketMQCatalog.getDatabase("default");
+ assertNotNull(database);
+ }
+
+ @Test
+ public void testDatabaseExists() {
+ boolean exists = rocketMQCatalog.databaseExists("default");
+ assertTrue(exists);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testCreateDatabase() throws DatabaseAlreadyExistException {
+ rocketMQCatalog.createDatabase("test", null, false);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testDropDatabase() throws DatabaseNotEmptyException, DatabaseNotExistException {
+ rocketMQCatalog.dropDatabase("test", false, false);
+ }
+
+ @Test
+ public void testListTables() throws DatabaseNotExistException {
+ List<String> strings = rocketMQCatalog.listTables("default");
+ assertEquals(1, strings.size());
+ assertEquals("test", strings.get(0));
+ }
+
+ @Test
+ public void testGetTable() throws TableNotExistException {
+ ObjectPath objectPath = new ObjectPath("default", "test");
+ CatalogBaseTable catalogBaseTable = rocketMQCatalog.getTable(objectPath);
+ assertNotNull(catalogBaseTable);
+ }
+
+ @Test
+ public void testTableExists() {
+ ObjectPath objectPath = new ObjectPath("default", "test");
+ boolean exists = rocketMQCatalog.tableExists(objectPath);
+ assertTrue(exists);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testCreateTable() throws TableAlreadyExistException, DatabaseNotExistException {
+ rocketMQCatalog.createTable(null, null, false);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testDropTable() throws TableNotExistException {
+ rocketMQCatalog.dropTable(null, false);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testListFunctions() throws DatabaseNotExistException {
+ rocketMQCatalog.listFunctions("default");
+ }
+
+ @Test(expected = FunctionNotExistException.class)
+ public void testGetFunction() throws FunctionNotExistException {
+ ObjectPath objectPath = new ObjectPath("default", "test");
+ rocketMQCatalog.getFunction(objectPath);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testFunctionExists() {
+ boolean exists = rocketMQCatalog.functionExists(null);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testCreateFunction()
+ throws FunctionAlreadyExistException, DatabaseNotExistException {
+ rocketMQCatalog.createFunction(null, null, false);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testAlterFunction() throws FunctionNotExistException {
+ rocketMQCatalog.alterFunction(null, null, false);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testDropFunction() throws FunctionNotExistException {
+ rocketMQCatalog.dropFunction(null, false);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testAlterDatabase() throws DatabaseNotExistException {
+ rocketMQCatalog.alterDatabase(null, null, false);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testListViews() throws DatabaseNotExistException {
+ rocketMQCatalog.listViews("default");
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testAlterTable() throws TableNotExistException {
+ rocketMQCatalog.alterTable(null, null, false);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testRenameTable() throws TableAlreadyExistException, TableNotExistException {
+ rocketMQCatalog.renameTable(null, null, false);
+ }
+
+ @Test
+ public void testListPartitions() throws TableNotPartitionedException, TableNotExistException {
+ List<CatalogPartitionSpec> catalogPartitionSpecs =
+ rocketMQCatalog.listPartitions(new ObjectPath("default", "test"));
+ assertEquals(2, catalogPartitionSpecs.size());
+ assertEquals(
+ new ArrayList<CatalogPartitionSpec>() {
+ {
+ add(
+ new CatalogPartitionSpec(
+ new HashMap<String, String>(1) {
+ {
+ put("__queue_id__", String.valueOf(0));
+ }
+ }));
+ add(
+ new CatalogPartitionSpec(
+ new HashMap<String, String>(1) {
+ {
+ put("__queue_id__", String.valueOf(1));
+ }
+ }));
+ }
+ },
+ catalogPartitionSpecs);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testListPartitionsByFilter()
+ throws TableNotPartitionedException, TableNotExistException {
+ rocketMQCatalog.listPartitionsByFilter(null, null);
+ }
+
+ @Test
+ public void testGetPartition() throws PartitionNotExistException {
+ ObjectPath objectPath = new ObjectPath("default", "test");
+ CatalogPartition partition =
+ rocketMQCatalog.getPartition(
+ objectPath,
+ new CatalogPartitionSpec(
+ new HashMap<String, String>(1) {
+ {
+ put("__queue_id__", String.valueOf(0));
+ }
+ }));
+
+ assertEquals(
+ new HashMap<String, String>(1) {
+ {
+ put("__queue_id__", String.valueOf(0));
+ }
+ },
+ partition.getProperties());
+ }
+
+ @Test
+ public void testPartitionExists() {
+ ObjectPath objectPath = new ObjectPath("default", "test");
+ boolean test =
+ rocketMQCatalog.partitionExists(
+ objectPath,
+ new CatalogPartitionSpec(
+ new HashMap<String, String>(1) {
+ {
+ put("__queue_id__", String.valueOf(0));
+ }
+ }));
+ assertNotNull(test);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testCreatePartition()
+ throws TableNotPartitionedException, TableNotExistException,
+ PartitionSpecInvalidException, PartitionAlreadyExistsException {
+ rocketMQCatalog.createPartition(null, null, null, false);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testDropPartition() throws PartitionNotExistException {
+ rocketMQCatalog.dropPartition(null, null, false);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testAlterPartition() throws PartitionNotExistException {
+ rocketMQCatalog.alterPartition(null, null, null, false);
+ }
+
+ @Test
+ public void testGetTableStatistics() throws TableNotExistException {
+ CatalogTableStatistics statistics = rocketMQCatalog.getTableStatistics(null);
+ assertEquals(statistics, CatalogTableStatistics.UNKNOWN);
+ }
+
+ @Test
+ public void testGetTableColumnStatistics() throws TableNotExistException {
+ CatalogColumnStatistics statistics = rocketMQCatalog.getTableColumnStatistics(null);
+ assertEquals(statistics, CatalogColumnStatistics.UNKNOWN);
+ }
+
+ @Test
+ public void testGetPartitionStatistics() throws PartitionNotExistException {
+ CatalogTableStatistics statistics = rocketMQCatalog.getPartitionStatistics(null, null);
+ assertEquals(statistics, CatalogTableStatistics.UNKNOWN);
+ }
+
+ @Test
+ public void testGetPartitionColumnStatistics() throws PartitionNotExistException {
+ CatalogColumnStatistics statistics =
+ rocketMQCatalog.getPartitionColumnStatistics(null, null);
+ assertEquals(statistics, CatalogColumnStatistics.UNKNOWN);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testAlterTableStatistics() throws TableNotExistException {
+ rocketMQCatalog.alterTableStatistics(null, null, false);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testAlterTableColumnStatistics() throws TableNotExistException {
+ rocketMQCatalog.alterTableColumnStatistics(null, null, false);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testAlterPartitionStatistics() throws PartitionNotExistException {
+ rocketMQCatalog.alterPartitionStatistics(null, null, null, false);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testAlterPartitionColumnStatistics() throws PartitionNotExistException {
+ rocketMQCatalog.alterPartitionColumnStatistics(null, null, null, false);
+ }
+}