blob: 7e50c560936de41426a65e823acb54d2be4fc441 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.controller;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.dashboard.model.request.SendTopicMessageRequest;
import org.apache.rocketmq.dashboard.model.request.TopicConfigInfo;
import org.apache.rocketmq.dashboard.service.impl.ConsumerServiceImpl;
import org.apache.rocketmq.dashboard.service.impl.TopicServiceImpl;
import org.apache.rocketmq.dashboard.util.MockObjectUtil;
import org.apache.rocketmq.remoting.RPCHook;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.InjectMocks;
import org.mockito.Spy;
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
public class TopicControllerTest extends BaseControllerTest {
@InjectMocks
private TopicController topicController;
@Spy
private TopicServiceImpl topicService;
@Spy
private ConsumerServiceImpl consumerService;
private String topicName = "topic_test";
@Before
public void init() {
super.mockRmqConfigure();
}
@Test
public void testList() throws Exception {
{
// mock all topics
TopicList topicList = new TopicList();
Set<String> topicSet = new HashSet<>();
topicSet.add("common_topic1");
topicSet.add("common_topic2");
topicSet.add("system_topic1");
topicSet.add("system_topic2");
topicSet.add("%DLQ%topic");
topicSet.add("%RETRY%topic");
topicList.setTopicList(topicSet);
when(mqAdminExt.fetchAllTopicList()).thenReturn(topicList);
// mock system topics
TopicList sysTopicList = new TopicList();
Set<String> sysTopicSet = new HashSet<>();
sysTopicSet.add("system_topic1");
sysTopicSet.add("system_topic2");
sysTopicList.setTopicList(sysTopicSet);
DefaultMQProducer producer = mock(DefaultMQProducer.class);
doNothing().when(producer).start();
doNothing().when(producer).shutdown();
DefaultMQProducerImpl defaultMQProducer = mock(DefaultMQProducerImpl.class);
MQClientInstance mqClientInstance = mock(MQClientInstance.class);
MQClientAPIImpl mqClientAPIImpl = mock(MQClientAPIImpl.class);
when(producer.getDefaultMQProducerImpl()).thenReturn(defaultMQProducer);
when(defaultMQProducer.getmQClientFactory()).thenReturn(mqClientInstance);
when(mqClientInstance.getMQClientAPIImpl()).thenReturn(mqClientAPIImpl);
when(mqClientAPIImpl.getSystemTopicList(anyLong())).thenReturn(sysTopicList);
doReturn(producer).when(topicService).buildDefaultMQProducer(anyString(), any(), anyBoolean());
}
final String url = "/topic/list.query";
// 1、list all topic
requestBuilder = MockMvcRequestBuilders.get(url);
requestBuilder.param("skipSysProcess", String.valueOf(true));
perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().isOk())
.andExpect(jsonPath("$.data.topicList", hasSize(6)));
// 2、list all topic filter DLQ and Retry topic
requestBuilder = MockMvcRequestBuilders.get(url);
requestBuilder.param("skipSysProcess", String.valueOf(false));
requestBuilder.param("skipRetryAndDlq", String.valueOf(true));
perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().isOk())
.andExpect(jsonPath("$.data.topicList", hasSize(4)));
// 3、filter system topic
requestBuilder = MockMvcRequestBuilders.get(url);
perform = mockMvc.perform(requestBuilder);
String[] topicString = {"%SYS%system_topic2","common_topic2","%SYS%system_topic1","common_topic1"};
perform.andExpect(status().isOk())
.andExpect(jsonPath("$.data.topicList").value(containsInAnyOrder(topicString)));
}
@Test
public void testStat() throws Exception {
{
TopicStatsTable topicStatsTable = MockObjectUtil.createTopicStatsTable();
when(mqAdminExt.examineTopicStats(anyString())).thenReturn(topicStatsTable);
}
final String url = "/topic/stats.query";
requestBuilder = MockMvcRequestBuilders.get(url);
requestBuilder.param("topic", topicName);
perform = mockMvc.perform(requestBuilder);
performOkExpect(perform);
}
@Test
public void testRoute() throws Exception {
{
TopicRouteData topicRouteData = MockObjectUtil.createTopicRouteData();
when(mqAdminExt.examineTopicRouteInfo(anyString())).thenReturn(topicRouteData);
}
final String url = "/topic/route.query";
requestBuilder = MockMvcRequestBuilders.get(url);
requestBuilder.param("topic", topicName);
perform = mockMvc.perform(requestBuilder);
performOkExpect(perform);
}
@Test
public void testCreateOrUpdate() throws Exception {
final String url = "/topic/createOrUpdate.do";
// 1、clusterName and brokerName all blank
requestBuilder = MockMvcRequestBuilders.post(url);
requestBuilder.contentType(MediaType.APPLICATION_JSON_UTF8);
TopicConfigInfo info = new TopicConfigInfo();
requestBuilder.content(JSON.toJSONString(info));
perform = mockMvc.perform(requestBuilder);
performErrorExpect(perform);
{
ClusterInfo clusterInfo = MockObjectUtil.createClusterInfo();
when(mqAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo);
doNothing().when(mqAdminExt).createAndUpdateTopicConfig(anyString(), any());
}
List<String> clusterNameList = Lists.newArrayList("DefaultCluster");
info.setTopicName("topic_test");
info.setReadQueueNums(4);
info.setWriteQueueNums(4);
info.setPerm(6);
info.setClusterNameList(clusterNameList);
// 2、create topic
requestBuilder = MockMvcRequestBuilders.post(url);
requestBuilder.contentType(MediaType.APPLICATION_JSON_UTF8);
requestBuilder.content(JSON.toJSONString(info));
perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().isOk())
.andExpect(jsonPath("$.data").value(true));
}
@Test
public void testExamineTopicConfig() throws Exception {
final String url = "/topic/examineTopicConfig.query";
{
TopicRouteData topicRouteData = MockObjectUtil.createTopicRouteData();
when(mqAdminExt.examineTopicRouteInfo(anyString())).thenReturn(topicRouteData);
ClusterInfo clusterInfo = MockObjectUtil.createClusterInfo();
when(mqAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo);
when(mqAdminExt.examineTopicConfig(anyString(), anyString())).thenReturn(new TopicConfig(topicName));
}
requestBuilder = MockMvcRequestBuilders.get(url);
requestBuilder.param("topic", topicName);
perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().isOk())
.andExpect(jsonPath("$.data[0].topicName").value(topicName));
}
@Test
public void testQueryConsumerByTopic() throws Exception {
final String url = "/topic/queryConsumerByTopic.query";
{
GroupList list = new GroupList();
list.setGroupList(Sets.newHashSet("group1"));
when(mqAdminExt.queryTopicConsumeByWho(anyString())).thenReturn(list);
ConsumeStats consumeStats = MockObjectUtil.createConsumeStats();
when(mqAdminExt.examineConsumeStats(anyString(), anyString())).thenReturn(consumeStats);
when(mqAdminExt.examineConsumerConnectionInfo(anyString())).thenReturn(new ConsumerConnection());
when(mqAdminExt.getConsumerRunningInfo(anyString(), anyString(), anyBoolean())).thenReturn(new ConsumerRunningInfo());
}
requestBuilder = MockMvcRequestBuilders.get(url);
requestBuilder.param("topic", topicName);
perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().isOk())
.andExpect(jsonPath("$.data").isMap());
}
@Test
public void testQueryTopicConsumerInfo() throws Exception {
final String url = "/topic/queryTopicConsumerInfo.query";
{
GroupList list = new GroupList();
list.setGroupList(Sets.newHashSet("group1", "group2", "group3"));
when(mqAdminExt.queryTopicConsumeByWho(anyString())).thenReturn(list);
}
requestBuilder = MockMvcRequestBuilders.get(url);
requestBuilder.param("topic", topicName);
perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().isOk())
.andExpect(jsonPath("$.data.groupList", hasSize(3)));
}
@Test
public void testSendTopicMessage() throws Exception {
final String url = "/topic/sendTopicMessage.do";
{
DefaultMQProducer producer = mock(DefaultMQProducer.class);
doNothing().when(producer).start();
doNothing().when(producer).shutdown();
SendResult result = new SendResult(SendStatus.SEND_OK, "7F000001E41A2E5D6D978B82C20F003D",
"0A8E83C300002A9F00000000000013D3", new MessageQueue(), 1000L);
when(producer.send(any(Message.class))).thenReturn(result);
doReturn(producer).when(topicService).buildDefaultMQProducer(anyString(), any(), anyBoolean());
}
Assert.assertNotNull(topicService.buildDefaultMQProducer("group_test", mock(RPCHook.class)));
SendTopicMessageRequest request = new SendTopicMessageRequest();
request.setTopic(topicName);
request.setMessageBody("hello world");
requestBuilder = MockMvcRequestBuilders.post(url);
requestBuilder.contentType(MediaType.APPLICATION_JSON_UTF8);
requestBuilder.content(JSON.toJSONString(request));
perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().isOk())
.andExpect(jsonPath("$.data.sendStatus").value(SendStatus.SEND_OK.name()))
.andExpect(jsonPath("$.data.msgId").value("7F000001E41A2E5D6D978B82C20F003D"));
}
@Test
public void testDelete() throws Exception {
final String url = "/topic/deleteTopic.do";
{
ClusterInfo clusterInfo = MockObjectUtil.createClusterInfo();
when(mqAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo);
doNothing().when(mqAdminExt).deleteTopicInBroker(any(), anyString());
doNothing().when(mqAdminExt).deleteTopicInNameServer(any(), anyString());
}
// 1、clusterName is blank
requestBuilder = MockMvcRequestBuilders.post(url);
requestBuilder.param("topic", topicName);
perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().isOk())
.andExpect(jsonPath("$.data").value(true));
// 2、clusterName is not blank
requestBuilder.param("clusterName", "DefaultCluster");
perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().isOk())
.andExpect(jsonPath("$.data").value(true));
}
@Test
public void testDeleteTopicByBroker() throws Exception {
{
ClusterInfo clusterInfo = MockObjectUtil.createClusterInfo();
when(mqAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo);
doNothing().when(mqAdminExt).deleteTopicInBroker(any(), anyString());
}
final String url = "/topic/deleteTopicByBroker.do";
requestBuilder = MockMvcRequestBuilders.post(url);
requestBuilder.param("topic", topicName);
requestBuilder.param("brokerName", "broker-a");
perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().isOk())
.andExpect(jsonPath("$.data").value(true));
}
@Override protected Object getTestController() {
return topicController;
}
}