blob: 6801ae8a706df35569ef007fa53cb36b0f8894ff [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.flink.catalog;
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.flink.common.constant.SchemaRegistryConstant;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
import org.apache.rocketmq.schema.registry.common.model.SchemaType;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.factories.Factory;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@RunWith(MockitoJUnitRunner.class)
public class RocketMQCatalogTest {
@Mock private SchemaRegistryClient schemaRegistryClient;
@Mock private DefaultMQAdminExt mqAdminExt;
@Mock private GetSchemaResponse getSchemaResponse;
private RocketMQCatalog rocketMQCatalog;
@Before
public void setUp() throws Exception {
rocketMQCatalog =
new RocketMQCatalog(
"rocketmq_catalog",
"default",
"http://localhost:9876",
SchemaRegistryConstant.SCHEMA_REGISTRY_BASE_URL);
Field schemaRegistryClientField =
rocketMQCatalog.getClass().getDeclaredField("schemaRegistryClient");
schemaRegistryClientField.setAccessible(true);
schemaRegistryClientField.set(rocketMQCatalog, schemaRegistryClient);
Field mqAdminExtField = rocketMQCatalog.getClass().getDeclaredField("mqAdminExt");
mqAdminExtField.setAccessible(true);
mqAdminExtField.set(rocketMQCatalog, mqAdminExt);
List<String> list = new ArrayList();
list.add("test");
Mockito.when(schemaRegistryClient.getSubjectsByTenant("default", "default"))
.thenReturn(list);
Mockito.when(mqAdminExt.getNamesrvAddr()).thenReturn("localhost:9876");
Mockito.when(schemaRegistryClient.getSchemaBySubject("test")).thenReturn(getSchemaResponse);
Mockito.when(getSchemaResponse.getType()).thenReturn(SchemaType.AVRO);
Mockito.when(getSchemaResponse.getIdl())
.thenReturn(
"{\"type\":\"record\",\"name\":\"Charge\","
+ "\"namespace\":\"org.apache.rocketmq.schema.registry.example.serde\",\"fields\":[{\"name\":\"item\","
+ "\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}");
TopicStatsTable topicStatsTable = new TopicStatsTable();
topicStatsTable.setOffsetTable(
new HashMap<MessageQueue, TopicOffset>(2) {
{
put(new MessageQueue("test", "default", 0), new TopicOffset());
put(new MessageQueue("test", "default", 1), new TopicOffset());
}
});
Mockito.when(mqAdminExt.examineTopicStats("test")).thenReturn(topicStatsTable);
}
@Test
public void testGetFactory() {
Optional<Factory> factory = rocketMQCatalog.getFactory();
assertNotNull(factory.get());
}
@Test
public void testOpen() throws NoSuchFieldException, IllegalAccessException {
rocketMQCatalog.open();
Class<? extends RocketMQCatalog> aClass = rocketMQCatalog.getClass();
Field mqAdminExtField = aClass.getDeclaredField("mqAdminExt");
mqAdminExtField.setAccessible(true);
Field schemaRegistryClientField = aClass.getDeclaredField("schemaRegistryClient");
schemaRegistryClientField.setAccessible(true);
Object mqAdminExt = mqAdminExtField.get(rocketMQCatalog);
Object schemaRegistryClient = schemaRegistryClientField.get(rocketMQCatalog);
assertNotNull(mqAdminExt);
assertNotNull(schemaRegistryClient);
}
@Test
public void testClose() throws NoSuchFieldException, IllegalAccessException {
rocketMQCatalog.close();
Class<? extends RocketMQCatalog> aClass = rocketMQCatalog.getClass();
Field mqAdminExtField = aClass.getDeclaredField("mqAdminExt");
mqAdminExtField.setAccessible(true);
Field schemaRegistryClientField = aClass.getDeclaredField("schemaRegistryClient");
schemaRegistryClientField.setAccessible(true);
Object mqAdminExt = mqAdminExtField.get(rocketMQCatalog);
Object schemaRegistryClient = schemaRegistryClientField.get(rocketMQCatalog);
assertNull(schemaRegistryClient);
}
@Test
public void testListDatabases() {
List<String> strings = rocketMQCatalog.listDatabases();
assertEquals(1, strings.size());
assertEquals("default", strings.get(0));
}
@Test
public void testGetDatabase() throws DatabaseNotExistException {
CatalogDatabase database = rocketMQCatalog.getDatabase("default");
assertNotNull(database);
}
@Test
public void testDatabaseExists() {
boolean exists = rocketMQCatalog.databaseExists("default");
assertTrue(exists);
}
@Test(expected = UnsupportedOperationException.class)
public void testCreateDatabase() throws DatabaseAlreadyExistException {
rocketMQCatalog.createDatabase("test", null, false);
}
@Test(expected = UnsupportedOperationException.class)
public void testDropDatabase() throws DatabaseNotEmptyException, DatabaseNotExistException {
rocketMQCatalog.dropDatabase("test", false, false);
}
@Test
public void testListTables() throws DatabaseNotExistException {
List<String> strings = rocketMQCatalog.listTables("default");
assertEquals(1, strings.size());
assertEquals("test", strings.get(0));
}
@Test
public void testGetTable() throws TableNotExistException {
ObjectPath objectPath = new ObjectPath("default", "test");
CatalogBaseTable catalogBaseTable = rocketMQCatalog.getTable(objectPath);
assertNotNull(catalogBaseTable);
}
@Test
public void testTableExists() {
ObjectPath objectPath = new ObjectPath("default", "test");
boolean exists = rocketMQCatalog.tableExists(objectPath);
assertTrue(exists);
}
@Test(expected = UnsupportedOperationException.class)
public void testCreateTable() throws TableAlreadyExistException, DatabaseNotExistException {
rocketMQCatalog.createTable(null, null, false);
}
@Test(expected = UnsupportedOperationException.class)
public void testDropTable() throws TableNotExistException {
rocketMQCatalog.dropTable(null, false);
}
@Test(expected = UnsupportedOperationException.class)
public void testListFunctions() throws DatabaseNotExistException {
rocketMQCatalog.listFunctions("default");
}
@Test(expected = FunctionNotExistException.class)
public void testGetFunction() throws FunctionNotExistException {
ObjectPath objectPath = new ObjectPath("default", "test");
rocketMQCatalog.getFunction(objectPath);
}
@Test(expected = UnsupportedOperationException.class)
public void testFunctionExists() {
boolean exists = rocketMQCatalog.functionExists(null);
}
@Test(expected = UnsupportedOperationException.class)
public void testCreateFunction()
throws FunctionAlreadyExistException, DatabaseNotExistException {
rocketMQCatalog.createFunction(null, null, false);
}
@Test(expected = UnsupportedOperationException.class)
public void testAlterFunction() throws FunctionNotExistException {
rocketMQCatalog.alterFunction(null, null, false);
}
@Test(expected = UnsupportedOperationException.class)
public void testDropFunction() throws FunctionNotExistException {
rocketMQCatalog.dropFunction(null, false);
}
@Test(expected = UnsupportedOperationException.class)
public void testAlterDatabase() throws DatabaseNotExistException {
rocketMQCatalog.alterDatabase(null, null, false);
}
@Test(expected = UnsupportedOperationException.class)
public void testListViews() throws DatabaseNotExistException {
rocketMQCatalog.listViews("default");
}
@Test(expected = UnsupportedOperationException.class)
public void testAlterTable() throws TableNotExistException {
rocketMQCatalog.alterTable(null, null, false);
}
@Test(expected = UnsupportedOperationException.class)
public void testRenameTable() throws TableAlreadyExistException, TableNotExistException {
rocketMQCatalog.renameTable(null, null, false);
}
@Test
public void testListPartitions() throws TableNotPartitionedException, TableNotExistException {
List<CatalogPartitionSpec> catalogPartitionSpecs =
rocketMQCatalog.listPartitions(new ObjectPath("default", "test"));
assertEquals(2, catalogPartitionSpecs.size());
assertEquals(
new ArrayList<CatalogPartitionSpec>() {
{
add(
new CatalogPartitionSpec(
new HashMap<String, String>(1) {
{
put("__queue_id__", String.valueOf(0));
}
}));
add(
new CatalogPartitionSpec(
new HashMap<String, String>(1) {
{
put("__queue_id__", String.valueOf(1));
}
}));
}
},
catalogPartitionSpecs);
}
@Test(expected = UnsupportedOperationException.class)
public void testListPartitionsByFilter()
throws TableNotPartitionedException, TableNotExistException {
rocketMQCatalog.listPartitionsByFilter(null, null);
}
@Test
public void testGetPartition() throws PartitionNotExistException {
ObjectPath objectPath = new ObjectPath("default", "test");
CatalogPartition partition =
rocketMQCatalog.getPartition(
objectPath,
new CatalogPartitionSpec(
new HashMap<String, String>(1) {
{
put("__queue_id__", String.valueOf(0));
}
}));
assertEquals(
new HashMap<String, String>(1) {
{
put("__queue_id__", String.valueOf(0));
}
},
partition.getProperties());
}
@Test
public void testPartitionExists() {
ObjectPath objectPath = new ObjectPath("default", "test");
boolean test =
rocketMQCatalog.partitionExists(
objectPath,
new CatalogPartitionSpec(
new HashMap<String, String>(1) {
{
put("__queue_id__", String.valueOf(0));
}
}));
assertNotNull(test);
}
@Test(expected = UnsupportedOperationException.class)
public void testCreatePartition()
throws TableNotPartitionedException, TableNotExistException,
PartitionSpecInvalidException, PartitionAlreadyExistsException {
rocketMQCatalog.createPartition(null, null, null, false);
}
@Test(expected = UnsupportedOperationException.class)
public void testDropPartition() throws PartitionNotExistException {
rocketMQCatalog.dropPartition(null, null, false);
}
@Test(expected = UnsupportedOperationException.class)
public void testAlterPartition() throws PartitionNotExistException {
rocketMQCatalog.alterPartition(null, null, null, false);
}
@Test
public void testGetTableStatistics() throws TableNotExistException {
CatalogTableStatistics statistics = rocketMQCatalog.getTableStatistics(null);
assertEquals(statistics, CatalogTableStatistics.UNKNOWN);
}
@Test
public void testGetTableColumnStatistics() throws TableNotExistException {
CatalogColumnStatistics statistics = rocketMQCatalog.getTableColumnStatistics(null);
assertEquals(statistics, CatalogColumnStatistics.UNKNOWN);
}
@Test
public void testGetPartitionStatistics() throws PartitionNotExistException {
CatalogTableStatistics statistics = rocketMQCatalog.getPartitionStatistics(null, null);
assertEquals(statistics, CatalogTableStatistics.UNKNOWN);
}
@Test
public void testGetPartitionColumnStatistics() throws PartitionNotExistException {
CatalogColumnStatistics statistics =
rocketMQCatalog.getPartitionColumnStatistics(null, null);
assertEquals(statistics, CatalogColumnStatistics.UNKNOWN);
}
@Test(expected = UnsupportedOperationException.class)
public void testAlterTableStatistics() throws TableNotExistException {
rocketMQCatalog.alterTableStatistics(null, null, false);
}
@Test(expected = UnsupportedOperationException.class)
public void testAlterTableColumnStatistics() throws TableNotExistException {
rocketMQCatalog.alterTableColumnStatistics(null, null, false);
}
@Test(expected = UnsupportedOperationException.class)
public void testAlterPartitionStatistics() throws PartitionNotExistException {
rocketMQCatalog.alterPartitionStatistics(null, null, null, false);
}
@Test(expected = UnsupportedOperationException.class)
public void testAlterPartitionColumnStatistics() throws PartitionNotExistException {
rocketMQCatalog.alterPartitionColumnStatistics(null, null, null, false);
}
}