blob: b95e80aceac4da6757ade87c3dda721ad7116a28 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.controller;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.RollbackStats;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo;
import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest;
import org.apache.rocketmq.dashboard.model.request.ResetOffsetRequest;
import org.apache.rocketmq.dashboard.service.impl.ConsumerServiceImpl;
import org.apache.rocketmq.dashboard.util.MockObjectUtil;
import org.junit.Before;
import org.junit.Test;
import org.mockito.InjectMocks;
import org.mockito.Spy;
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
import static org.hamcrest.Matchers.hasSize;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.when;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
public class ConsumerControllerTest extends BaseControllerTest {
@InjectMocks
private ConsumerController consumerController;
@Spy
private ConsumerServiceImpl consumerService;
@Before
public void init() throws Exception {
consumerService.afterPropertiesSet();
super.mockRmqConfigure();
ClusterInfo clusterInfo = MockObjectUtil.createClusterInfo();
when(mqAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo);
SubscriptionGroupWrapper wrapper = MockObjectUtil.createSubscriptionGroupWrapper();
when(mqAdminExt.getAllSubscriptionGroup(anyString(), anyLong())).thenReturn(wrapper);
ConsumeStats stats = MockObjectUtil.createConsumeStats();
when(mqAdminExt.examineConsumeStats(anyString())).thenReturn(stats);
when(mqAdminExt.examineConsumeStats(anyString(), isNull())).thenReturn(stats);
ConsumerConnection connection = MockObjectUtil.createConsumerConnection();
when(mqAdminExt.examineConsumerConnectionInfo(anyString())).thenReturn(connection);
ConsumerRunningInfo runningInfo = MockObjectUtil.createConsumerRunningInfo();
when(mqAdminExt.getConsumerRunningInfo(anyString(), anyString(), anyBoolean()))
.thenReturn(runningInfo);
SubscriptionGroupConfig config = new SubscriptionGroupConfig();
config.setGroupName("group-test");
when(mqAdminExt.examineSubscriptionGroupConfig(anyString(), anyString()))
.thenReturn(config);
}
@Test
public void testList() throws Exception {
final String url = "/consumer/groupList.query";
requestBuilder = MockMvcRequestBuilders.get(url);
perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().isOk())
.andExpect(jsonPath("$.data", hasSize(2)))
.andExpect(jsonPath("$.data[0].consumeType").value(ConsumeType.CONSUME_ACTIVELY.name()))
.andExpect(jsonPath("$.data[0].messageModel").value(MessageModel.CLUSTERING.name()));
// executorService shutdown
consumerService.destroy();
}
@Test
public void testGroupQuery() throws Exception {
final String url = "/consumer/group.query";
requestBuilder = MockMvcRequestBuilders.get(url);
requestBuilder.param("consumerGroup", "group_test");
perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().isOk())
.andExpect(jsonPath("$.data.group").value("group_test"))
.andExpect(jsonPath("$.data.consumeType").value(ConsumeType.CONSUME_ACTIVELY.name()))
.andExpect(jsonPath("$.data.messageModel").value(MessageModel.CLUSTERING.name()));
}
@Test
public void testSkipAccumulate() throws Exception {
final String url = "/consumer/skipAccumulate.do";
resetOffsetOrSkipAccumulate(url, -1L);
}
@Test
public void testResetOffset() throws Exception {
final String url = "/consumer/resetOffset.do";
resetOffsetOrSkipAccumulate(url, System.currentTimeMillis());
}
private void resetOffsetOrSkipAccumulate(String url, Long resetTime) throws Exception {
RollbackStats rollbackStats = new RollbackStats();
rollbackStats.setRollbackOffset(10L);
rollbackStats.setQueueId(5L);
rollbackStats.setBrokerName("broker-a");
Map<MessageQueue, Long> rollbackStatsMap = new HashMap<>(0);
rollbackStatsMap.put(new MessageQueue("topic_test", "broker-a", 5), 10L);
{
MQClientException exception = new MQClientException(ResponseCode.CONSUMER_NOT_ONLINE, "不在线");
when(mqAdminExt.resetOffsetByTimestamp(anyString(), anyString(), anyLong(), anyBoolean()))
.thenReturn(rollbackStatsMap).thenThrow(exception);
when(mqAdminExt.resetOffsetByTimestampOld(anyString(), anyString(), anyLong(), anyBoolean()))
.thenReturn(Lists.newArrayList(rollbackStats));
}
ResetOffsetRequest request = new ResetOffsetRequest();
String groupId = "group_test";
request.setTopic("topic_test");
request.setResetTime(resetTime);
request.setConsumerGroupList(Lists.newArrayList(groupId));
// 1、consumer not online
requestBuilder = MockMvcRequestBuilders.post(url);
requestBuilder.contentType(MediaType.APPLICATION_JSON_UTF8);
requestBuilder.content(JSON.toJSONString(request));
perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().isOk())
.andExpect(jsonPath("$.data").isMap())
.andExpect(jsonPath("$.data." + groupId + ".rollbackStatsList").isArray())
.andExpect(jsonPath("$.data." + groupId + ".rollbackStatsList[0].rollbackOffset").value(10L));
// 2、consumer not online
perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().isOk()).andExpect(jsonPath("$.data").isMap());
}
@Test
public void testFetchBrokerNameList() throws Exception {
final String url = "/consumer/fetchBrokerNameList.query";
requestBuilder = MockMvcRequestBuilders.get(url);
requestBuilder.param("consumerGroup", "group_test");
perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().isOk())
.andExpect(jsonPath("$.data", hasSize(1)))
.andExpect(jsonPath("$.data[0]").value("broker-a"));
}
@Test
public void testExamineSubscriptionGroupConfig() throws Exception {
final String url = "/consumer/examineSubscriptionGroupConfig.query";
requestBuilder = MockMvcRequestBuilders.get(url);
requestBuilder.param("consumerGroup", "group_test");
perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().isOk())
.andExpect(jsonPath("$.data", hasSize(1)));
}
@Test
public void testDelete() throws Exception {
final String url = "/consumer/deleteSubGroup.do";
{
doNothing().when(mqAdminExt).deleteSubscriptionGroup(any(), anyString());
doNothing().when(mqAdminExt).deleteTopicInBroker(any(), anyString());
doNothing().when(mqAdminExt).deleteTopicInNameServer(any(), anyString());
}
DeleteSubGroupRequest request = new DeleteSubGroupRequest();
request.setBrokerNameList(Lists.newArrayList("broker-a"));
request.setGroupName("group_test");
requestBuilder = MockMvcRequestBuilders.post(url);
requestBuilder.contentType(MediaType.APPLICATION_JSON_UTF8);
requestBuilder.content(JSON.toJSONString(request));
perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().isOk())
.andExpect(jsonPath("$.data").value(true));
}
@Test
public void testCreateOrUpdate() throws Exception {
final String url = "/consumer/createOrUpdate.do";
// 1、clusterName and brokerName all blank
requestBuilder = MockMvcRequestBuilders.post(url);
requestBuilder.contentType(MediaType.APPLICATION_JSON_UTF8);
ConsumerConfigInfo consumerConfigInfo = new ConsumerConfigInfo();
requestBuilder.content(JSON.toJSONString(consumerConfigInfo));
perform = mockMvc.perform(requestBuilder);
performErrorExpect(perform);
{
doNothing().when(mqAdminExt).createAndUpdateSubscriptionGroupConfig(anyString(), any());
}
List<String> clusterNameList = Lists.newArrayList("DefaultCluster");
SubscriptionGroupConfig config = new SubscriptionGroupConfig();
config.setGroupName("group_test");
consumerConfigInfo.setClusterNameList(clusterNameList);
consumerConfigInfo.setSubscriptionGroupConfig(config);
// 2、create consumer
requestBuilder = MockMvcRequestBuilders.post(url);
requestBuilder.contentType(MediaType.APPLICATION_JSON_UTF8);
requestBuilder.content(JSON.toJSONString(consumerConfigInfo));
perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().isOk())
.andExpect(jsonPath("$.data").value(true));
}
@Test
public void testQueryConsumerByTopic() throws Exception {
final String url = "/consumer/queryTopicByConsumer.query";
requestBuilder = MockMvcRequestBuilders.get(url);
requestBuilder.param("consumerGroup", "group_test");
perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().isOk())
.andExpect(jsonPath("$.data", hasSize(1)))
.andExpect(jsonPath("$.data[0].queueStatInfoList", hasSize(2)));
}
@Test
public void testConsumerConnection() throws Exception {
final String url = "/consumer/consumerConnection.query";
requestBuilder = MockMvcRequestBuilders.get(url);
requestBuilder.param("consumerGroup", "group_test");
perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().isOk())
.andExpect(jsonPath("$.data.consumeType").value(ConsumeType.CONSUME_ACTIVELY.name()))
.andExpect(jsonPath("$.data.messageModel").value(MessageModel.CLUSTERING.name()));
}
@Test
public void testGetConsumerRunningInfo() throws Exception {
final String url = "/consumer/consumerRunningInfo.query";
requestBuilder = MockMvcRequestBuilders.get(url);
requestBuilder.param("consumerGroup", "group_test");
requestBuilder.param("clientId", "group_test");
requestBuilder.param("jstack", "true");
perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().isOk())
.andExpect(jsonPath("$.data.jstack").value("test"));
}
@Override protected Object getTestController() {
return consumerController;
}
}