blob: b2264bddec183d41bea2a4c30b6c6839251c84a4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.admin;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.impl.MQAdminImpl;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.remoting.protocol.body.ProducerConnection;
import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.dashboard.service.client.MQAdminExtImpl;
import org.apache.rocketmq.dashboard.service.client.MQAdminInstance;
import org.apache.rocketmq.dashboard.util.MockObjectUtil;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.test.util.ReflectionTestUtils;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.Silent.class)
public class MQAdminExtImplTest {
@InjectMocks
private MQAdminExtImpl mqAdminExtImpl;
@Mock
private DefaultMQAdminExt defaultMQAdminExt;
@Mock
private DefaultMQAdminExtImpl defaultMQAdminExtImpl;
@Mock
private MQClientInstance mqClientInstance;
@Mock
private MQClientAPIImpl mQClientAPIImpl;
@Mock
private RemotingClient remotingClient;
private String brokerAddr = "127.0.0.1:10911";
@Before
public void init() throws Exception {
Field field = MQAdminInstance.class.getDeclaredField("MQ_ADMIN_EXT_THREAD_LOCAL");
field.setAccessible(true);
Object object = field.get(mqAdminExtImpl);
assertNotNull(object);
ThreadLocal<DefaultMQAdminExt> threadLocal = (ThreadLocal<DefaultMQAdminExt>) object;
defaultMQAdminExt = mock(DefaultMQAdminExt.class);
threadLocal.set(defaultMQAdminExt);
ReflectionTestUtils.setField(defaultMQAdminExt, "defaultMQAdminExtImpl", defaultMQAdminExtImpl);
ReflectionTestUtils.setField(defaultMQAdminExtImpl, "mqClientInstance", mqClientInstance);
ReflectionTestUtils.setField(mqClientInstance, "mQClientAPIImpl", mQClientAPIImpl);
ReflectionTestUtils.setField(mQClientAPIImpl, "remotingClient", remotingClient);
}
@Test
public void testUpdateBrokerConfig() throws Exception {
assertNotNull(mqAdminExtImpl);
doNothing()
.doThrow(new MQBrokerException(0, ""))
.when(defaultMQAdminExt).updateBrokerConfig(anyString(), any());
mqAdminExtImpl.updateBrokerConfig(brokerAddr, new Properties());
boolean hasException = false;
try {
mqAdminExtImpl.updateBrokerConfig(brokerAddr, new Properties());
} catch (Exception e) {
hasException = true;
assertThat(e).isInstanceOf(MQBrokerException.class);
assertThat(((MQBrokerException) e).getResponseCode()).isEqualTo(0);
}
assertTrue(hasException);
}
@Test
public void testCreateAndUpdateTopicConfig() throws Exception {
assertNotNull(mqAdminExtImpl);
mqAdminExtImpl.createAndUpdateTopicConfig(brokerAddr, new TopicConfig());
}
@Test
public void testDeletePlainAccessConfig() throws Exception {
assertNotNull(mqAdminExtImpl);
mqAdminExtImpl.deletePlainAccessConfig(brokerAddr, "rocketmq");
}
@Test
public void testUpdateGlobalWhiteAddrConfig() throws Exception {
assertNotNull(mqAdminExtImpl);
mqAdminExtImpl.updateGlobalWhiteAddrConfig(brokerAddr, "192.168.*.*");
}
@Test
public void testCreateAndUpdatePlainAccessConfig() throws Exception {
assertNotNull(mqAdminExtImpl);
mqAdminExtImpl.createAndUpdatePlainAccessConfig(brokerAddr, new PlainAccessConfig());
}
@Test
public void testExamineBrokerClusterAclVersionInfo() throws Exception {
assertNotNull(mqAdminExtImpl);
assertNull(mqAdminExtImpl.examineBrokerClusterAclVersionInfo(brokerAddr));
}
@Test
public void testExamineBrokerClusterAclConfig() throws Exception {
assertNotNull(mqAdminExtImpl);
assertNull(mqAdminExtImpl.examineBrokerClusterAclConfig(brokerAddr));
}
@Test
public void testQueryConsumerStatus() throws Exception {
assertNotNull(mqAdminExtImpl);
}
@Test
public void testCreateAndUpdateSubscriptionGroupConfig() throws Exception {
assertNotNull(mqAdminExtImpl);
mqAdminExtImpl.createAndUpdateSubscriptionGroupConfig(brokerAddr, new SubscriptionGroupConfig());
}
@Test
public void testExamineSubscriptionGroupConfig() throws Exception {
assertNotNull(mqAdminExtImpl);
{
RemotingCommand response1 = RemotingCommand.createResponseCommand(null);
RemotingCommand response2 = RemotingCommand.createResponseCommand(null);
response2.setCode(ResponseCode.SUCCESS);
response2.setBody(RemotingSerializable.encode(MockObjectUtil.createSubscriptionGroupWrapper()));
when(remotingClient.invokeSync(anyString(), any(), anyLong()))
.thenThrow(new RuntimeException("invokeSync exception"))
.thenReturn(response1).thenReturn(response2);
}
// invokeSync exception
try {
mqAdminExtImpl.examineSubscriptionGroupConfig(brokerAddr, "topic_test");
} catch (Exception e) {
Assert.assertEquals(e.getMessage(), "invokeSync exception");
}
// responseCode is not success
try {
mqAdminExtImpl.examineSubscriptionGroupConfig(brokerAddr, "group_test");
} catch (Exception e) {
assertThat(e.getCause()).isInstanceOf(MQBrokerException.class);
assertThat(((MQBrokerException) e.getCause()).getResponseCode()).isEqualTo(1);
}
// GET_ALL_SUBSCRIPTIONGROUP_CONFIG success
SubscriptionGroupConfig subscriptionGroupConfig = mqAdminExtImpl.examineSubscriptionGroupConfig(brokerAddr, "group_test");
Assert.assertEquals(subscriptionGroupConfig.getGroupName(), "group_test");
}
@Test
public void testExamineTopicConfig() throws Exception {
assertNotNull(mqAdminExtImpl);
{
RemotingCommand response1 = RemotingCommand.createResponseCommand(null);
RemotingCommand response2 = RemotingCommand.createResponseCommand(null);
response2.setCode(ResponseCode.SUCCESS);
response2.setBody(RemotingSerializable.encode(MockObjectUtil.createTopicConfigWrapper()));
when(remotingClient.invokeSync(anyString(), any(), anyLong()))
.thenThrow(new RuntimeException("invokeSync exception"))
.thenReturn(response1).thenReturn(response2);
}
// invokeSync exception
try {
mqAdminExtImpl.examineTopicConfig(brokerAddr, "topic_test");
} catch (Exception e) {
Assert.assertEquals(e.getMessage(), "invokeSync exception");
}
// responseCode is not success
try {
mqAdminExtImpl.examineTopicConfig(brokerAddr, "topic_test");
} catch (Exception e) {
assertThat(e.getCause()).isInstanceOf(MQBrokerException.class);
assertThat(((MQBrokerException) e.getCause()).getResponseCode()).isEqualTo(1);
}
// GET_ALL_TOPIC_CONFIG success
TopicConfig topicConfig = mqAdminExtImpl.examineTopicConfig(brokerAddr, "topic_test");
Assert.assertEquals(topicConfig.getTopicName(), "topic_test");
}
@Test
public void testExamineTopicStats() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.examineTopicStats(anyString())).thenReturn(MockObjectUtil.createTopicStatsTable());
}
TopicStatsTable topicStatsTable = mqAdminExtImpl.examineTopicStats("topic_test");
Assert.assertNotNull(topicStatsTable);
Assert.assertEquals(topicStatsTable.getOffsetTable().size(), 1);
}
@Test
public void testExamineAllTopicConfig() throws Exception {
assertNotNull(mqAdminExtImpl);
}
@Test
public void testFetchAllTopicList() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.fetchAllTopicList()).thenReturn(new TopicList());
}
TopicList topicList = mqAdminExtImpl.fetchAllTopicList();
Assert.assertNotNull(topicList);
}
@Test
public void testFetchBrokerRuntimeStats() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.fetchBrokerRuntimeStats(anyString())).thenReturn(new KVTable());
}
KVTable kvTable = mqAdminExtImpl.fetchBrokerRuntimeStats(brokerAddr);
Assert.assertNotNull(kvTable);
}
@Test
public void testExamineConsumeStats() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.examineConsumeStats(anyString())).thenReturn(MockObjectUtil.createConsumeStats());
when(defaultMQAdminExt.examineConsumeStats(anyString(), anyString())).thenReturn(MockObjectUtil.createConsumeStats());
}
ConsumeStats consumeStats = mqAdminExtImpl.examineConsumeStats("group_test");
ConsumeStats consumeStatsWithTopic = mqAdminExtImpl.examineConsumeStats("group_test", "topic_test");
Assert.assertNotNull(consumeStats);
Assert.assertEquals(consumeStats.getOffsetTable().size(), 2);
Assert.assertNotNull(consumeStatsWithTopic);
Assert.assertEquals(consumeStatsWithTopic.getOffsetTable().size(), 2);
}
@Test
public void testExamineBrokerClusterInfo() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.examineBrokerClusterInfo()).thenReturn(MockObjectUtil.createClusterInfo());
}
ClusterInfo clusterInfo = mqAdminExtImpl.examineBrokerClusterInfo();
Assert.assertNotNull(clusterInfo);
Assert.assertEquals(clusterInfo.getBrokerAddrTable().size(), 1);
Assert.assertEquals(clusterInfo.getClusterAddrTable().size(), 1);
}
@Test
public void testExamineTopicRouteInfo() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.examineTopicRouteInfo(anyString())).thenReturn(MockObjectUtil.createTopicRouteData());
}
TopicRouteData topicRouteData = mqAdminExtImpl.examineTopicRouteInfo("topic_test");
Assert.assertNotNull(topicRouteData);
}
@Test
public void testExamineConsumerConnectionInfo() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.examineConsumerConnectionInfo(anyString())).thenReturn(new ConsumerConnection());
}
ConsumerConnection consumerConnection = mqAdminExtImpl.examineConsumerConnectionInfo("group_test");
Assert.assertNotNull(consumerConnection);
}
@Test
public void testExamineProducerConnectionInfo() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.examineProducerConnectionInfo(anyString(), anyString())).thenReturn(new ProducerConnection());
}
ProducerConnection producerConnection = mqAdminExtImpl.examineProducerConnectionInfo("group_test", "topic_test");
Assert.assertNotNull(producerConnection);
}
@Test
public void testGetNameServerAddressList() {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.getNameServerAddressList()).thenReturn(Lists.asList("127.0.0.1:9876", new String[] {"127.0.0.2:9876"}));
}
List<String> list = mqAdminExtImpl.getNameServerAddressList();
Assert.assertEquals(list.size(), 2);
}
@Test
public void testWipeWritePermOfBroker() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.wipeWritePermOfBroker(anyString(), anyString())).thenReturn(6);
}
int result = mqAdminExtImpl.wipeWritePermOfBroker("127.0.0.1:9876", "broker-a");
Assert.assertEquals(result, 6);
}
@Test
public void testPutKVConfig() throws Exception {
assertNotNull(mqAdminExtImpl);
{
doNothing().when(defaultMQAdminExt).putKVConfig(anyString(), anyString(), anyString());
}
mqAdminExtImpl.putKVConfig("namespace", "key", "value");
}
@Test
public void testGetKVConfig() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.getKVConfig(anyString(), anyString())).thenReturn("value");
}
String value = mqAdminExtImpl.getKVConfig("namespace", "key");
Assert.assertEquals(value, "value");
}
@Test
public void testGetKVListByNamespace() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.getKVListByNamespace(anyString())).thenReturn(new KVTable());
}
KVTable kvTable = mqAdminExtImpl.getKVListByNamespace("namespace");
Assert.assertNotNull(kvTable);
}
@Test
public void testDeleteTopicInBroker() throws Exception {
assertNotNull(mqAdminExtImpl);
{
doNothing().when(defaultMQAdminExt).deleteTopicInBroker(any(), anyString());
}
mqAdminExtImpl.deleteTopicInBroker(Sets.newHashSet("127.0.0.1:10911"), "topic_test");
}
@Test
public void testDeleteTopicInNameServer() throws Exception {
assertNotNull(mqAdminExtImpl);
{
doNothing().when(defaultMQAdminExt).deleteTopicInNameServer(any(), anyString());
}
mqAdminExtImpl.deleteTopicInNameServer(Sets.newHashSet("127.0.0.1:9876", "127.0.0.2:9876"), "topic_test");
}
@Test
public void testDeleteSubscriptionGroup() throws Exception {
assertNotNull(mqAdminExtImpl);
{
doNothing().when(defaultMQAdminExt).deleteSubscriptionGroup(anyString(), anyString());
doNothing().when(defaultMQAdminExt).deleteSubscriptionGroup(anyString(), anyString(), anyBoolean());
}
mqAdminExtImpl.deleteSubscriptionGroup(brokerAddr, "group_test");
mqAdminExtImpl.deleteSubscriptionGroup(brokerAddr, "group_test", true);
}
@Test
public void testCreateAndUpdateKvConfig() throws Exception {
assertNotNull(mqAdminExtImpl);
{
doNothing().when(defaultMQAdminExt).createAndUpdateKvConfig(anyString(), anyString(), anyString());
}
mqAdminExtImpl.createAndUpdateKvConfig("namespace", "key", "value");
}
@Test
public void testDeleteKvConfig() throws Exception {
assertNotNull(mqAdminExtImpl);
{
doNothing().when(defaultMQAdminExt).deleteKvConfig(anyString(), anyString());
}
mqAdminExtImpl.deleteKvConfig("namespace", "key");
}
@Test
public void testDeleteConsumerOffset() throws Exception {
assertNotNull(mqAdminExtImpl);
}
@Test
public void testResetOffsetByTimestampOld() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.resetOffsetByTimestampOld(anyString(), anyString(), anyLong(), anyBoolean())).thenReturn(new ArrayList<RollbackStats>());
}
List<RollbackStats> stats = mqAdminExtImpl.resetOffsetByTimestampOld("group_test", "topic_test", 1628495765398L, false);
Assert.assertNotNull(stats);
}
@Test
public void testResetOffsetByTimestamp() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.resetOffsetByTimestamp(anyString(), anyString(), anyLong(), anyBoolean())).thenReturn(new HashMap<MessageQueue, Long>());
}
Map<MessageQueue, Long> map = mqAdminExtImpl.resetOffsetByTimestamp("group_test", "topic_test", 1628495765398L, false);
Assert.assertNotNull(map);
}
@Test
public void testResetOffsetNew() throws Exception {
assertNotNull(mqAdminExtImpl);
{
doNothing().when(defaultMQAdminExt).resetOffsetNew(anyString(), anyString(), anyLong());
}
mqAdminExtImpl.resetOffsetNew("group_test", "topic_test", 1628495765398L);
}
@Test
public void testGetConsumeStatus() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.getConsumeStatus(anyString(), anyString(), anyString())).thenReturn(new HashMap<String, Map<MessageQueue, Long>>());
}
mqAdminExtImpl.getConsumeStatus("topic_test", "group_test", "");
}
@Test
public void testCreateOrUpdateOrderConf() throws Exception {
assertNotNull(mqAdminExtImpl);
{
doNothing().when(defaultMQAdminExt).createOrUpdateOrderConf(anyString(), anyString(), anyBoolean());
}
mqAdminExtImpl.createOrUpdateOrderConf("key", "value", false);
}
@Test
public void testQueryTopicConsumeByWho() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.queryTopicConsumeByWho(anyString())).thenReturn(new GroupList());
}
GroupList groupList = mqAdminExtImpl.queryTopicConsumeByWho("topic_test");
Assert.assertNotNull(groupList);
}
@Test
public void testCleanExpiredConsumerQueue() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.cleanExpiredConsumerQueue(anyString())).thenReturn(true);
}
boolean result = mqAdminExtImpl.cleanExpiredConsumerQueue("DefaultCluster");
Assert.assertEquals(result, true);
}
@Test
public void testCleanExpiredConsumerQueueByAddr() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.cleanExpiredConsumerQueueByAddr(anyString())).thenReturn(true);
}
boolean result = mqAdminExtImpl.cleanExpiredConsumerQueueByAddr("DefaultCluster");
Assert.assertEquals(result, true);
}
@Test
public void testGetConsumerRunningInfo() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.getConsumerRunningInfo(anyString(), anyString(), anyBoolean())).thenReturn(new ConsumerRunningInfo());
}
ConsumerRunningInfo consumerRunningInfo = mqAdminExtImpl.getConsumerRunningInfo("group_test", "", true);
Assert.assertNotNull(consumerRunningInfo);
}
@Test
public void testConsumeMessageDirectly() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.consumeMessageDirectly(anyString(), anyString(), anyString())).thenReturn(new ConsumeMessageDirectlyResult());
when(defaultMQAdminExt.consumeMessageDirectly(anyString(), anyString(), anyString(), anyString())).thenReturn(new ConsumeMessageDirectlyResult());
}
ConsumeMessageDirectlyResult result1 = mqAdminExtImpl.consumeMessageDirectly("group_test", "", "7F000001ACC018B4AAC2116AF6500000");
ConsumeMessageDirectlyResult result2 = mqAdminExtImpl.consumeMessageDirectly("group_test", "", "topic_test", "7F000001ACC018B4AAC2116AF6500000");
Assert.assertNotNull(result1);
Assert.assertNotNull(result2);
}
@Test
public void testMessageTrackDetail() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.messageTrackDetail(any())).thenReturn(new ArrayList<MessageTrack>());
}
List<MessageTrack> tracks = mqAdminExtImpl.messageTrackDetail(new MessageExt());
Assert.assertNotNull(tracks);
}
@Test
public void testCloneGroupOffset() throws Exception {
assertNotNull(mqAdminExtImpl);
{
doNothing().when(defaultMQAdminExt).cloneGroupOffset(anyString(), anyString(), anyString(), anyBoolean());
}
mqAdminExtImpl.cloneGroupOffset("group_test", "group_test1", "topic_test", false);
}
@Test
public void testCreateTopic() throws Exception {
assertNotNull(mqAdminExtImpl);
{
doNothing().when(defaultMQAdminExt).createTopic(anyString(), anyString(), anyInt(), anyMap());
doNothing().when(defaultMQAdminExt).createTopic(anyString(), anyString(), anyInt(), anyInt(), anyMap());
}
Map<String, String> map = new HashMap<>();
map.put("message.type", "FIFO");
mqAdminExtImpl.createTopic("key", "topic_test", 8, map);
mqAdminExtImpl.createTopic("key", "topic_test", 8, 1, map);
}
@Test
public void testSearchOffset() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.searchOffset(any(), anyLong())).thenReturn(Long.MAX_VALUE);
}
long offset = mqAdminExtImpl.searchOffset(new MessageQueue(), 1628495765398L);
Assert.assertEquals(offset, Long.MAX_VALUE);
}
@Test
public void testMaxOffset() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.maxOffset(any())).thenReturn(Long.MAX_VALUE);
}
long offset = mqAdminExtImpl.maxOffset(new MessageQueue());
Assert.assertEquals(offset, Long.MAX_VALUE);
}
@Test
public void testMinOffset() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.minOffset(any())).thenReturn(Long.MIN_VALUE);
}
long offset = mqAdminExtImpl.minOffset(new MessageQueue());
Assert.assertEquals(offset, Long.MIN_VALUE);
}
@Test
public void testEarliestMsgStoreTime() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.earliestMsgStoreTime(any())).thenReturn(1628495765398L);
}
long storeTime = mqAdminExtImpl.earliestMsgStoreTime(new MessageQueue());
Assert.assertEquals(storeTime, 1628495765398L);
}
@Test
public void testViewMessage() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.viewMessage(anyString())).thenReturn(new MessageExt());
}
MessageExt messageExt = mqAdminExtImpl.viewMessage("7F000001ACC018B4AAC2116AF6500000");
Assert.assertNotNull(messageExt);
}
@Test
public void testQueryMessage() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.queryMessage(anyString(), anyString(), anyInt(), anyLong(), anyLong())).thenReturn(mock(QueryResult.class));
}
QueryResult result = mqAdminExtImpl.queryMessage("topic_test", "key", 32, 1627804565000L, System.currentTimeMillis());
Assert.assertNotNull(result);
}
@Test
public void testStart() {
assertNotNull(mqAdminExtImpl);
try {
mqAdminExtImpl.start();
} catch (Exception e) {
Assert.assertTrue(e instanceof IllegalStateException);
}
}
@Test
public void testShutdown() {
assertNotNull(mqAdminExtImpl);
try {
mqAdminExtImpl.shutdown();
} catch (Exception e) {
Assert.assertTrue(e instanceof IllegalStateException);
}
}
@Test
public void testQueryConsumeTimeSpan() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.queryConsumeTimeSpan(anyString(), anyString())).thenReturn(new ArrayList<QueueTimeSpan>());
}
List<QueueTimeSpan> timeSpans = mqAdminExtImpl.queryConsumeTimeSpan("topic_test", "group_test");
Assert.assertNotNull(timeSpans);
}
@Test
public void testViewMessage2() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(MQAdminInstance.threadLocalMqClientInstance().getMQAdminImpl()).thenReturn(mock(MQAdminImpl.class));
when(defaultMQAdminExt.viewMessage(anyString())).thenThrow(new RuntimeException("viewMessage exception"));
}
mqAdminExtImpl.viewMessage("topic_test", "7F000001ACC018B4AAC2116AF6500000");
}
@Test
public void testGetBrokerConfig() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.getBrokerConfig(anyString())).thenReturn(new Properties());
}
Properties brokerConfig = mqAdminExtImpl.getBrokerConfig(brokerAddr);
Assert.assertNotNull(brokerConfig);
}
@Test
public void testFetchTopicsByCLuster() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.fetchTopicsByCLuster(anyString())).thenReturn(new TopicList());
}
TopicList topicList = mqAdminExtImpl.fetchTopicsByCLuster("DefaultCluster");
Assert.assertNotNull(topicList);
}
@Test
public void testCleanUnusedTopic() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.cleanUnusedTopic(anyString())).thenReturn(true);
when(defaultMQAdminExt.cleanUnusedTopicByAddr(anyString())).thenReturn(true);
}
Boolean result1 = mqAdminExtImpl.cleanUnusedTopic("DefaultCluster");
Boolean result2 = mqAdminExtImpl.cleanUnusedTopic(brokerAddr);
Assert.assertEquals(result1, true);
Assert.assertEquals(result2, true);
}
@Test
public void testViewBrokerStatsData() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.viewBrokerStatsData(anyString(), anyString(), anyString())).thenReturn(new BrokerStatsData());
}
BrokerStatsData brokerStatsData = mqAdminExtImpl.viewBrokerStatsData(brokerAddr, BrokerStatsManager.TOPIC_PUT_NUMS, "topic_test");
Assert.assertNotNull(brokerStatsData);
}
@Test
public void testGetClusterList() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.getClusterList(anyString())).thenReturn(new HashSet<>());
}
Set<String> clusterList = mqAdminExtImpl.getClusterList("topic_test");
Assert.assertNotNull(clusterList);
}
@Test
public void testFetchConsumeStatsInBroker() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.fetchConsumeStatsInBroker(anyString(), anyBoolean(), anyLong())).thenReturn(new ConsumeStatsList());
}
ConsumeStatsList consumeStatsList = mqAdminExtImpl.fetchConsumeStatsInBroker(brokerAddr, false, System.currentTimeMillis());
Assert.assertNotNull(consumeStatsList);
}
@Test
public void testGetTopicClusterList() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.fetchTopicsByCLuster(anyString())).thenReturn(new TopicList());
}
TopicList topicList = mqAdminExtImpl.fetchTopicsByCLuster("DefaultCluster");
Assert.assertNotNull(topicList);
}
@Test
public void testGetAllSubscriptionGroup() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.getAllSubscriptionGroup(anyString(), anyLong())).thenReturn(new SubscriptionGroupWrapper());
}
SubscriptionGroupWrapper wrapper = mqAdminExtImpl.getAllSubscriptionGroup(brokerAddr, 5000L);
Assert.assertNotNull(wrapper);
}
@Test
public void testUpdateConsumeOffset() throws Exception {
assertNotNull(mqAdminExtImpl);
{
doNothing().when(defaultMQAdminExt).updateConsumeOffset(anyString(), anyString(), any(), anyLong());
}
mqAdminExtImpl.updateConsumeOffset(brokerAddr, "group_test", new MessageQueue(), 10000L);
}
@Test
public void testUpdateNameServerConfig() {
assertNotNull(mqAdminExtImpl);
}
@Test
public void testGetNameServerConfig() throws Exception {
assertNotNull(mqAdminExtImpl);
assertNull(mqAdminExtImpl.getNameServerConfig(new ArrayList<>()));
}
@Test
public void testQueryConsumeQueue() throws Exception {
assertNotNull(mqAdminExtImpl);
assertNull(mqAdminExtImpl.queryConsumeQueue(brokerAddr, "topic_test", 2, 1, 10, "group_test"));
}
@Test
public void testResumeCheckHalfMessage() throws Exception {
assertNotNull(mqAdminExtImpl);
Assert.assertFalse(mqAdminExtImpl.resumeCheckHalfMessage("7F000001ACC018B4AAC2116AF6500000"));
Assert.assertFalse(mqAdminExtImpl.resumeCheckHalfMessage("topic_test", "7F000001ACC018B4AAC2116AF6500000"));
}
@Test
public void testAddWritePermOfBroker() throws Exception {
assertNotNull(mqAdminExtImpl);
{
when(defaultMQAdminExt.addWritePermOfBroker(anyString(), anyString())).thenReturn(6);
}
Assert.assertEquals(mqAdminExtImpl.addWritePermOfBroker("127.0.0.1:9876", "broker-a"), 6);
}
@Test
public void testGetUserSubscriptionGroup() throws Exception {
assertNotNull(mqAdminExtImpl);
SubscriptionGroupWrapper wrapper = new SubscriptionGroupWrapper();
{
when(defaultMQAdminExt.getUserSubscriptionGroup(anyString(), anyLong())).thenReturn(wrapper);
}
Assert.assertEquals(mqAdminExtImpl.getUserSubscriptionGroup("127.0.0.1:10911", 3000), wrapper);
}
@Test
public void testGetAllTopicConfig() throws Exception {
assertNotNull(mqAdminExtImpl);
TopicConfigSerializeWrapper wrapper = new TopicConfigSerializeWrapper();
{
when(defaultMQAdminExt.getAllTopicConfig(anyString(), anyLong())).thenReturn(wrapper);
}
Assert.assertEquals(mqAdminExtImpl.getAllTopicConfig("127.0.0.1:10911", 3000), wrapper);
}
@Test
public void testGetUserTopicConfig() throws Exception {
assertNotNull(mqAdminExtImpl);
TopicConfigSerializeWrapper wrapper = new TopicConfigSerializeWrapper();
{
when(defaultMQAdminExt.getUserTopicConfig(anyString(), anyBoolean(), anyLong())).thenReturn(wrapper);
}
Assert.assertEquals(mqAdminExtImpl.getUserTopicConfig("127.0.0.1:10911", true, 3000), wrapper);
}
}