| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.rocketmq.dashboard.controller; |
| |
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.RollbackStats;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo;
import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest;
import org.apache.rocketmq.dashboard.model.request.ResetOffsetRequest;
import org.apache.rocketmq.dashboard.service.impl.ConsumerServiceImpl;
import org.apache.rocketmq.dashboard.util.MockObjectUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.InjectMocks;
import org.mockito.Spy;
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;

import static org.hamcrest.Matchers.hasSize;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.when;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
| |
| public class ConsumerControllerTest extends BaseControllerTest { |
| |
| @InjectMocks |
| private ConsumerController consumerController; |
| |
| @Spy |
| private ConsumerServiceImpl consumerService; |
| |
| @Before |
| public void init() throws Exception { |
| consumerService.afterPropertiesSet(); |
| super.mockRmqConfigure(); |
| ClusterInfo clusterInfo = MockObjectUtil.createClusterInfo(); |
| when(mqAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo); |
| SubscriptionGroupWrapper wrapper = MockObjectUtil.createSubscriptionGroupWrapper(); |
| when(mqAdminExt.getAllSubscriptionGroup(anyString(), anyLong())).thenReturn(wrapper); |
| ConsumeStats stats = MockObjectUtil.createConsumeStats(); |
| when(mqAdminExt.examineConsumeStats(anyString())).thenReturn(stats); |
| when(mqAdminExt.examineConsumeStats(anyString(), isNull())).thenReturn(stats); |
| ConsumerConnection connection = MockObjectUtil.createConsumerConnection(); |
| when(mqAdminExt.examineConsumerConnectionInfo(anyString())).thenReturn(connection); |
| ConsumerRunningInfo runningInfo = MockObjectUtil.createConsumerRunningInfo(); |
| when(mqAdminExt.getConsumerRunningInfo(anyString(), anyString(), anyBoolean())) |
| .thenReturn(runningInfo); |
| SubscriptionGroupConfig config = new SubscriptionGroupConfig(); |
| config.setGroupName("group-test"); |
| when(mqAdminExt.examineSubscriptionGroupConfig(anyString(), anyString())) |
| .thenReturn(config); |
| } |
| |
| @Test |
| public void testList() throws Exception { |
| final String url = "/consumer/groupList.query"; |
| requestBuilder = MockMvcRequestBuilders.get(url); |
| perform = mockMvc.perform(requestBuilder); |
| perform.andExpect(status().isOk()) |
| .andExpect(jsonPath("$.data", hasSize(2))) |
| .andExpect(jsonPath("$.data[0].consumeType").value(ConsumeType.CONSUME_ACTIVELY.name())) |
| .andExpect(jsonPath("$.data[0].messageModel").value(MessageModel.CLUSTERING.name())); |
| // executorService shutdown |
| consumerService.destroy(); |
| } |
| |
| @Test |
| public void testGroupQuery() throws Exception { |
| final String url = "/consumer/group.query"; |
| requestBuilder = MockMvcRequestBuilders.get(url); |
| requestBuilder.param("consumerGroup", "group_test"); |
| perform = mockMvc.perform(requestBuilder); |
| perform.andExpect(status().isOk()) |
| .andExpect(jsonPath("$.data.group").value("group_test")) |
| .andExpect(jsonPath("$.data.consumeType").value(ConsumeType.CONSUME_ACTIVELY.name())) |
| .andExpect(jsonPath("$.data.messageModel").value(MessageModel.CLUSTERING.name())); |
| } |
| |
| @Test |
| public void testSkipAccumulate() throws Exception { |
| final String url = "/consumer/skipAccumulate.do"; |
| resetOffsetOrSkipAccumulate(url, -1L); |
| } |
| |
| @Test |
| public void testResetOffset() throws Exception { |
| final String url = "/consumer/resetOffset.do"; |
| resetOffsetOrSkipAccumulate(url, System.currentTimeMillis()); |
| } |
| |
| private void resetOffsetOrSkipAccumulate(String url, Long resetTime) throws Exception { |
| RollbackStats rollbackStats = new RollbackStats(); |
| rollbackStats.setRollbackOffset(10L); |
| rollbackStats.setQueueId(5L); |
| rollbackStats.setBrokerName("broker-a"); |
| Map<MessageQueue, Long> rollbackStatsMap = new HashMap<>(0); |
| rollbackStatsMap.put(new MessageQueue("topic_test", "broker-a", 5), 10L); |
| { |
| MQClientException exception = new MQClientException(ResponseCode.CONSUMER_NOT_ONLINE, "不在线"); |
| when(mqAdminExt.resetOffsetByTimestamp(anyString(), anyString(), anyLong(), anyBoolean())) |
| .thenReturn(rollbackStatsMap).thenThrow(exception); |
| when(mqAdminExt.resetOffsetByTimestampOld(anyString(), anyString(), anyLong(), anyBoolean())) |
| .thenReturn(Lists.newArrayList(rollbackStats)); |
| } |
| ResetOffsetRequest request = new ResetOffsetRequest(); |
| String groupId = "group_test"; |
| request.setTopic("topic_test"); |
| request.setResetTime(resetTime); |
| request.setConsumerGroupList(Lists.newArrayList(groupId)); |
| // 1、consumer not online |
| requestBuilder = MockMvcRequestBuilders.post(url); |
| requestBuilder.contentType(MediaType.APPLICATION_JSON_UTF8); |
| requestBuilder.content(JSON.toJSONString(request)); |
| perform = mockMvc.perform(requestBuilder); |
| perform.andExpect(status().isOk()) |
| .andExpect(jsonPath("$.data").isMap()) |
| .andExpect(jsonPath("$.data." + groupId + ".rollbackStatsList").isArray()) |
| .andExpect(jsonPath("$.data." + groupId + ".rollbackStatsList[0].rollbackOffset").value(10L)); |
| |
| // 2、consumer not online |
| perform = mockMvc.perform(requestBuilder); |
| perform.andExpect(status().isOk()).andExpect(jsonPath("$.data").isMap()); |
| } |
| |
| @Test |
| public void testFetchBrokerNameList() throws Exception { |
| final String url = "/consumer/fetchBrokerNameList.query"; |
| requestBuilder = MockMvcRequestBuilders.get(url); |
| requestBuilder.param("consumerGroup", "group_test"); |
| perform = mockMvc.perform(requestBuilder); |
| perform.andExpect(status().isOk()) |
| .andExpect(jsonPath("$.data", hasSize(1))) |
| .andExpect(jsonPath("$.data[0]").value("broker-a")); |
| } |
| |
| @Test |
| public void testExamineSubscriptionGroupConfig() throws Exception { |
| final String url = "/consumer/examineSubscriptionGroupConfig.query"; |
| requestBuilder = MockMvcRequestBuilders.get(url); |
| requestBuilder.param("consumerGroup", "group_test"); |
| perform = mockMvc.perform(requestBuilder); |
| perform.andExpect(status().isOk()) |
| .andExpect(jsonPath("$.data", hasSize(1))); |
| } |
| |
| @Test |
| public void testDelete() throws Exception { |
| final String url = "/consumer/deleteSubGroup.do"; |
| { |
| doNothing().when(mqAdminExt).deleteSubscriptionGroup(any(), anyString()); |
| doNothing().when(mqAdminExt).deleteTopicInBroker(any(), anyString()); |
| doNothing().when(mqAdminExt).deleteTopicInNameServer(any(), anyString()); |
| } |
| DeleteSubGroupRequest request = new DeleteSubGroupRequest(); |
| request.setBrokerNameList(Lists.newArrayList("broker-a")); |
| request.setGroupName("group_test"); |
| requestBuilder = MockMvcRequestBuilders.post(url); |
| requestBuilder.contentType(MediaType.APPLICATION_JSON_UTF8); |
| requestBuilder.content(JSON.toJSONString(request)); |
| perform = mockMvc.perform(requestBuilder); |
| perform.andExpect(status().isOk()) |
| .andExpect(jsonPath("$.data").value(true)); |
| } |
| |
| @Test |
| public void testCreateOrUpdate() throws Exception { |
| final String url = "/consumer/createOrUpdate.do"; |
| // 1、clusterName and brokerName all blank |
| requestBuilder = MockMvcRequestBuilders.post(url); |
| requestBuilder.contentType(MediaType.APPLICATION_JSON_UTF8); |
| ConsumerConfigInfo consumerConfigInfo = new ConsumerConfigInfo(); |
| requestBuilder.content(JSON.toJSONString(consumerConfigInfo)); |
| perform = mockMvc.perform(requestBuilder); |
| performErrorExpect(perform); |
| |
| { |
| doNothing().when(mqAdminExt).createAndUpdateSubscriptionGroupConfig(anyString(), any()); |
| } |
| |
| List<String> clusterNameList = Lists.newArrayList("DefaultCluster"); |
| SubscriptionGroupConfig config = new SubscriptionGroupConfig(); |
| config.setGroupName("group_test"); |
| consumerConfigInfo.setClusterNameList(clusterNameList); |
| consumerConfigInfo.setSubscriptionGroupConfig(config); |
| // 2、create consumer |
| requestBuilder = MockMvcRequestBuilders.post(url); |
| requestBuilder.contentType(MediaType.APPLICATION_JSON_UTF8); |
| requestBuilder.content(JSON.toJSONString(consumerConfigInfo)); |
| perform = mockMvc.perform(requestBuilder); |
| perform.andExpect(status().isOk()) |
| .andExpect(jsonPath("$.data").value(true)); |
| } |
| |
| @Test |
| public void testQueryConsumerByTopic() throws Exception { |
| final String url = "/consumer/queryTopicByConsumer.query"; |
| requestBuilder = MockMvcRequestBuilders.get(url); |
| requestBuilder.param("consumerGroup", "group_test"); |
| perform = mockMvc.perform(requestBuilder); |
| perform.andExpect(status().isOk()) |
| .andExpect(jsonPath("$.data", hasSize(1))) |
| .andExpect(jsonPath("$.data[0].queueStatInfoList", hasSize(2))); |
| } |
| |
| @Test |
| public void testConsumerConnection() throws Exception { |
| final String url = "/consumer/consumerConnection.query"; |
| requestBuilder = MockMvcRequestBuilders.get(url); |
| requestBuilder.param("consumerGroup", "group_test"); |
| perform = mockMvc.perform(requestBuilder); |
| perform.andExpect(status().isOk()) |
| .andExpect(jsonPath("$.data.consumeType").value(ConsumeType.CONSUME_ACTIVELY.name())) |
| .andExpect(jsonPath("$.data.messageModel").value(MessageModel.CLUSTERING.name())); |
| } |
| |
| @Test |
| public void testGetConsumerRunningInfo() throws Exception { |
| final String url = "/consumer/consumerRunningInfo.query"; |
| requestBuilder = MockMvcRequestBuilders.get(url); |
| requestBuilder.param("consumerGroup", "group_test"); |
| requestBuilder.param("clientId", "group_test"); |
| requestBuilder.param("jstack", "true"); |
| perform = mockMvc.perform(requestBuilder); |
| perform.andExpect(status().isOk()) |
| .andExpect(jsonPath("$.data.jstack").value("test")); |
| } |
| |
| @Override protected Object getTestController() { |
| return consumerController; |
| } |
| } |