| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.rocketmq.console.service.impl; |
| |
| import com.google.common.base.Predicate; |
| import com.google.common.base.Throwables; |
| import com.google.common.collect.Iterables; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Sets; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import org.apache.commons.collections.CollectionUtils; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.rocketmq.client.exception.MQClientException; |
| import org.apache.rocketmq.common.MQVersion; |
| import org.apache.rocketmq.common.admin.ConsumeStats; |
| import org.apache.rocketmq.common.admin.RollbackStats; |
| import org.apache.rocketmq.common.message.MessageQueue; |
| import org.apache.rocketmq.common.protocol.ResponseCode; |
| import org.apache.rocketmq.common.protocol.body.ClusterInfo; |
| import org.apache.rocketmq.common.protocol.body.Connection; |
| import org.apache.rocketmq.common.protocol.body.ConsumerConnection; |
| import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; |
| import org.apache.rocketmq.common.protocol.body.GroupList; |
| import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; |
| import org.apache.rocketmq.common.protocol.route.BrokerData; |
| import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; |
| import org.apache.rocketmq.console.aspect.admin.annotation.MultiMQAdminCmdMethod; |
| import org.apache.rocketmq.console.model.ConsumerGroupRollBackStat; |
| import org.apache.rocketmq.console.model.GroupConsumeInfo; |
| import org.apache.rocketmq.console.model.QueueStatInfo; |
| import org.apache.rocketmq.console.model.TopicConsumerInfo; |
| import org.apache.rocketmq.console.model.request.ConsumerConfigInfo; |
| import org.apache.rocketmq.console.model.request.DeleteSubGroupRequest; |
| import org.apache.rocketmq.console.model.request.ResetOffsetRequest; |
| import org.apache.rocketmq.console.service.AbstractCommonService; |
| import org.apache.rocketmq.console.service.ConsumerService; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.springframework.stereotype.Service; |
| |
| import static com.google.common.base.Throwables.propagate; |
| |
| @Service |
| public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService { |
| private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class); |
| |
| @Override |
| @MultiMQAdminCmdMethod |
| public List<GroupConsumeInfo> queryGroupList() { |
| Set<String> consumerGroupSet = Sets.newHashSet(); |
| try { |
| ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); |
| for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) { |
| SubscriptionGroupWrapper subscriptionGroupWrapper = mqAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L); |
| consumerGroupSet.addAll(subscriptionGroupWrapper.getSubscriptionGroupTable().keySet()); |
| } |
| } |
| catch (Exception err) { |
| throw Throwables.propagate(err); |
| } |
| List<GroupConsumeInfo> groupConsumeInfoList = Lists.newArrayList(); |
| for (String consumerGroup : consumerGroupSet) { |
| groupConsumeInfoList.add(queryGroup(consumerGroup)); |
| } |
| Collections.sort(groupConsumeInfoList); |
| return groupConsumeInfoList; |
| } |
| |
| @Override |
| @MultiMQAdminCmdMethod |
| public GroupConsumeInfo queryGroup(String consumerGroup) { |
| GroupConsumeInfo groupConsumeInfo = new GroupConsumeInfo(); |
| try { |
| ConsumeStats consumeStats = null; |
| try { |
| consumeStats = mqAdminExt.examineConsumeStats(consumerGroup); |
| } |
| catch (Exception e) { |
| logger.warn("examineConsumeStats exception to consumerGroup {}, response [{}]", consumerGroup, e.getMessage()); |
| } |
| |
| ConsumerConnection consumerConnection = null; |
| try { |
| consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup); |
| } |
| catch (Exception e) { |
| logger.warn("examineConsumeStats exception to consumerGroup {}, response [{}]", consumerGroup, e.getMessage()); |
| } |
| |
| groupConsumeInfo.setGroup(consumerGroup); |
| |
| if (consumeStats != null) { |
| groupConsumeInfo.setConsumeTps((int)consumeStats.getConsumeTps()); |
| groupConsumeInfo.setDiffTotal(consumeStats.computeTotalDiff()); |
| } |
| |
| if (consumerConnection != null) { |
| groupConsumeInfo.setCount(consumerConnection.getConnectionSet().size()); |
| groupConsumeInfo.setMessageModel(consumerConnection.getMessageModel()); |
| groupConsumeInfo.setConsumeType(consumerConnection.getConsumeType()); |
| groupConsumeInfo.setVersion(MQVersion.getVersionDesc(consumerConnection.computeMinVersion())); |
| } |
| } |
| catch (Exception e) { |
| logger.warn("examineConsumeStats or examineConsumerConnectionInfo exception, " |
| + consumerGroup, e); |
| } |
| return groupConsumeInfo; |
| } |
| |
| @Override |
| public List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName) { |
| return queryConsumeStatsList(null, groupName); |
| } |
| |
| @Override |
| @MultiMQAdminCmdMethod |
| public List<TopicConsumerInfo> queryConsumeStatsList(final String topic, String groupName) { |
| ConsumeStats consumeStats = null; |
| try { |
| consumeStats = mqAdminExt.examineConsumeStats(groupName, topic); |
| } |
| catch (Exception e) { |
| throw propagate(e); |
| } |
| List<MessageQueue> mqList = Lists.newArrayList(Iterables.filter(consumeStats.getOffsetTable().keySet(), new Predicate<MessageQueue>() { |
| @Override |
| public boolean apply(MessageQueue o) { |
| return StringUtils.isBlank(topic) || o.getTopic().equals(topic); |
| } |
| })); |
| Collections.sort(mqList); |
| List<TopicConsumerInfo> topicConsumerInfoList = Lists.newArrayList(); |
| TopicConsumerInfo nowTopicConsumerInfo = null; |
| Map<MessageQueue, String> messageQueueClientMap = getClientConnection(groupName); |
| for (MessageQueue mq : mqList) { |
| if (nowTopicConsumerInfo == null || (!StringUtils.equals(mq.getTopic(), nowTopicConsumerInfo.getTopic()))) { |
| nowTopicConsumerInfo = new TopicConsumerInfo(mq.getTopic()); |
| topicConsumerInfoList.add(nowTopicConsumerInfo); |
| } |
| QueueStatInfo queueStatInfo = QueueStatInfo.fromOffsetTableEntry(mq, consumeStats.getOffsetTable().get(mq)); |
| queueStatInfo.setClientInfo(messageQueueClientMap.get(mq)); |
| nowTopicConsumerInfo.appendQueueStatInfo(queueStatInfo); |
| } |
| return topicConsumerInfoList; |
| } |
| |
| private Map<MessageQueue, String> getClientConnection(String groupName) { |
| Map<MessageQueue, String> results = Maps.newHashMap(); |
| try { |
| ConsumerConnection consumerConnection = mqAdminExt.examineConsumerConnectionInfo(groupName); |
| for (Connection connection : consumerConnection.getConnectionSet()) { |
| String clinetId = connection.getClientId(); |
| ConsumerRunningInfo consumerRunningInfo = mqAdminExt.getConsumerRunningInfo(groupName, clinetId, false); |
| for (MessageQueue messageQueue : consumerRunningInfo.getMqTable().keySet()) { |
| // results.put(messageQueue, clinetId + " " + connection.getClientAddr()); |
| results.put(messageQueue, clinetId); |
| } |
| } |
| } |
| catch (Exception err) { |
| logger.error("op=getClientConnection_error", err); |
| } |
| return results; |
| } |
| |
| @Override |
| @MultiMQAdminCmdMethod |
| public Map<String /*groupName*/, TopicConsumerInfo> queryConsumeStatsListByTopicName(String topic) { |
| Map<String, TopicConsumerInfo> group2ConsumerInfoMap = Maps.newHashMap(); |
| try { |
| GroupList groupList = mqAdminExt.queryTopicConsumeByWho(topic); |
| for (String group : groupList.getGroupList()) { |
| List<TopicConsumerInfo> topicConsumerInfoList = null; |
| try { |
| topicConsumerInfoList = queryConsumeStatsList(topic, group); |
| } |
| catch (Exception ignore) { |
| } |
| group2ConsumerInfoMap.put(group, CollectionUtils.isEmpty(topicConsumerInfoList) ? new TopicConsumerInfo(topic) : topicConsumerInfoList.get(0)); |
| } |
| return group2ConsumerInfoMap; |
| } |
| catch (Exception e) { |
| throw propagate(e); |
| } |
| } |
| |
| @Override |
| @MultiMQAdminCmdMethod |
| public Map<String, ConsumerGroupRollBackStat> resetOffset(ResetOffsetRequest resetOffsetRequest) { |
| Map<String, ConsumerGroupRollBackStat> groupRollbackStats = Maps.newHashMap(); |
| for (String consumerGroup : resetOffsetRequest.getConsumerGroupList()) { |
| try { |
| Map<MessageQueue, Long> rollbackStatsMap = |
| mqAdminExt.resetOffsetByTimestamp(resetOffsetRequest.getTopic(), consumerGroup, resetOffsetRequest.getResetTime(), resetOffsetRequest.isForce()); |
| ConsumerGroupRollBackStat consumerGroupRollBackStat = new ConsumerGroupRollBackStat(true); |
| List<RollbackStats> rollbackStatsList = consumerGroupRollBackStat.getRollbackStatsList(); |
| for (Map.Entry<MessageQueue, Long> rollbackStatsEntty : rollbackStatsMap.entrySet()) { |
| RollbackStats rollbackStats = new RollbackStats(); |
| rollbackStats.setRollbackOffset(rollbackStatsEntty.getValue()); |
| rollbackStats.setQueueId(rollbackStatsEntty.getKey().getQueueId()); |
| rollbackStats.setBrokerName(rollbackStatsEntty.getKey().getBrokerName()); |
| rollbackStatsList.add(rollbackStats); |
| } |
| groupRollbackStats.put(consumerGroup, consumerGroupRollBackStat); |
| } |
| catch (MQClientException e) { |
| if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) { |
| try { |
| ConsumerGroupRollBackStat consumerGroupRollBackStat = new ConsumerGroupRollBackStat(true); |
| List<RollbackStats> rollbackStatsList = mqAdminExt.resetOffsetByTimestampOld(consumerGroup, resetOffsetRequest.getTopic(), resetOffsetRequest.getResetTime(), true); |
| consumerGroupRollBackStat.setRollbackStatsList(rollbackStatsList); |
| groupRollbackStats.put(consumerGroup, consumerGroupRollBackStat); |
| continue; |
| } |
| catch (Exception err) { |
| logger.error("op=resetOffset_which_not_online_error", err); |
| } |
| } |
| else { |
| logger.error("op=resetOffset_error", e); |
| } |
| groupRollbackStats.put(consumerGroup, new ConsumerGroupRollBackStat(false, e.getMessage())); |
| } |
| catch (Exception e) { |
| logger.error("op=resetOffset_error", e); |
| groupRollbackStats.put(consumerGroup, new ConsumerGroupRollBackStat(false, e.getMessage())); |
| } |
| } |
| return groupRollbackStats; |
| } |
| |
| @Override |
| @MultiMQAdminCmdMethod |
| public List<ConsumerConfigInfo> examineSubscriptionGroupConfig(String group) { |
| List<ConsumerConfigInfo> consumerConfigInfoList = Lists.newArrayList(); |
| try { |
| ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); |
| for (String brokerName : clusterInfo.getBrokerAddrTable().keySet()) { //foreach brokerName |
| String brokerAddress = clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(); |
| SubscriptionGroupConfig subscriptionGroupConfig = mqAdminExt.examineSubscriptionGroupConfig(brokerAddress, group); |
| if (subscriptionGroupConfig == null) { |
| continue; |
| } |
| consumerConfigInfoList.add(new ConsumerConfigInfo(Lists.newArrayList(brokerName), subscriptionGroupConfig)); |
| } |
| } |
| catch (Exception e) { |
| throw propagate(e); |
| } |
| return consumerConfigInfoList; |
| } |
| |
| @Override |
| @MultiMQAdminCmdMethod |
| public boolean deleteSubGroup(DeleteSubGroupRequest deleteSubGroupRequest) { |
| try { |
| ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); |
| for (String brokerName : deleteSubGroupRequest.getBrokerNameList()) { |
| logger.info("addr={} groupName={}", clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), deleteSubGroupRequest.getGroupName()); |
| mqAdminExt.deleteSubscriptionGroup(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), deleteSubGroupRequest.getGroupName()); |
| } |
| } |
| catch (Exception e) { |
| throw propagate(e); |
| } |
| return true; |
| } |
| |
| @Override |
| public boolean createAndUpdateSubscriptionGroupConfig(ConsumerConfigInfo consumerConfigInfo) { |
| try { |
| ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); |
| for (String brokerName : changeToBrokerNameSet(clusterInfo.getClusterAddrTable(), |
| consumerConfigInfo.getClusterNameList(), consumerConfigInfo.getBrokerNameList())) { |
| mqAdminExt.createAndUpdateSubscriptionGroupConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), consumerConfigInfo.getSubscriptionGroupConfig()); |
| } |
| } |
| catch (Exception err) { |
| throw Throwables.propagate(err); |
| } |
| return true; |
| } |
| |
| @Override |
| @MultiMQAdminCmdMethod |
| public Set<String> fetchBrokerNameSetBySubscriptionGroup(String group) { |
| Set<String> brokerNameSet = Sets.newHashSet(); |
| try { |
| List<ConsumerConfigInfo> consumerConfigInfoList = examineSubscriptionGroupConfig(group); |
| for (ConsumerConfigInfo consumerConfigInfo : consumerConfigInfoList) { |
| brokerNameSet.addAll(consumerConfigInfo.getBrokerNameList()); |
| } |
| } |
| catch (Exception e) { |
| throw Throwables.propagate(e); |
| } |
| return brokerNameSet; |
| |
| } |
| |
| @Override |
| public ConsumerConnection getConsumerConnection(String consumerGroup) { |
| try { |
| return mqAdminExt.examineConsumerConnectionInfo(consumerGroup); |
| } |
| catch (Exception e) { |
| throw Throwables.propagate(e); |
| } |
| } |
| |
| @Override |
| public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack) { |
| try { |
| return mqAdminExt.getConsumerRunningInfo(consumerGroup, clientId, jstack); |
| } |
| catch (Exception e) { |
| throw Throwables.propagate(e); |
| } |
| } |
| } |