Proxy Support And ConsumerGroup Enhancement (#207)
* Support dashboard v4-v5 switch And query for v5 topic
* Modify tag name
* Support proxy-module And Fix the problem of showing wrong consumerGroup-info
---------
Co-authored-by: yuanziwei <yuanziwei@xiaomi.com>
diff --git a/src/main/java/org/apache/rocketmq/dashboard/config/RMQConfigure.java b/src/main/java/org/apache/rocketmq/dashboard/config/RMQConfigure.java
index 991a2d8..5ce21ff 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/config/RMQConfigure.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/config/RMQConfigure.java
@@ -43,6 +43,8 @@
//use rocketmq.namesrv.addr first,if it is empty,than use system proerty or system env
private volatile String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
+ private volatile String proxyAddr;
+
private volatile String isVIPChannel = System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true");
@@ -62,6 +64,8 @@
private List<String> namesrvAddrs = new ArrayList<>();
+ private List<String> proxyAddrs = new ArrayList<>();
+
public String getAccessKey() {
return accessKey;
}
@@ -86,6 +90,25 @@
return namesrvAddrs;
}
+ public List<String> getProxyAddrs() {
+ return this.proxyAddrs;
+ }
+
+ public void setProxyAddrs(List<String> proxyAddrs) {
+ this.proxyAddrs = proxyAddrs;
+ if (CollectionUtils.isNotEmpty(proxyAddrs)) {
+ this.setProxyAddr(proxyAddrs.get(0));
+ }
+ }
+
+ public String getProxyAddr() {
+ return proxyAddr;
+ }
+
+ public void setProxyAddr(String proxyAddr) {
+ this.proxyAddr = proxyAddr;
+ }
+
public void setNamesrvAddrs(List<String> namesrvAddrs) {
this.namesrvAddrs = namesrvAddrs;
if (CollectionUtils.isNotEmpty(namesrvAddrs)) {
diff --git a/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java b/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java
index d9f22e4..96fc056 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java
@@ -47,14 +47,14 @@
@RequestMapping(value = "/groupList.query")
@ResponseBody
- public Object list(@RequestParam(value = "skipSysGroup", required = false) boolean skipSysGroup) {
- return consumerService.queryGroupList(skipSysGroup);
+ public Object list(@RequestParam(value = "skipSysGroup", required = false) boolean skipSysGroup, String address) {
+ return consumerService.queryGroupList(skipSysGroup, address);
}
@RequestMapping(value = "/group.query")
@ResponseBody
- public Object groupQuery(@RequestParam String consumerGroup) {
- return consumerService.queryGroup(consumerGroup);
+ public Object groupQuery(@RequestParam String consumerGroup, String address) {
+ return consumerService.queryGroup(consumerGroup, address);
}
@RequestMapping(value = "/resetOffset.do", method = {RequestMethod.POST})
@@ -99,14 +99,14 @@
@RequestMapping(value = "/queryTopicByConsumer.query")
@ResponseBody
- public Object queryConsumerByTopic(@RequestParam String consumerGroup) {
- return consumerService.queryConsumeStatsListByGroupName(consumerGroup);
+ public Object queryConsumerByTopic(@RequestParam String consumerGroup, String address) {
+ return consumerService.queryConsumeStatsListByGroupName(consumerGroup, address);
}
@RequestMapping(value = "/consumerConnection.query")
@ResponseBody
- public Object consumerConnection(@RequestParam(required = false) String consumerGroup) {
- ConsumerConnection consumerConnection = consumerService.getConsumerConnection(consumerGroup);
+ public Object consumerConnection(@RequestParam(required = false) String consumerGroup, String address) {
+ ConsumerConnection consumerConnection = consumerService.getConsumerConnection(consumerGroup, address);
consumerConnection.setConnectionSet(ConnectionInfo.buildConnectionInfoHashSet(consumerConnection.getConnectionSet()));
return consumerConnection;
}
diff --git a/src/main/java/org/apache/rocketmq/dashboard/controller/ProxyController.java b/src/main/java/org/apache/rocketmq/dashboard/controller/ProxyController.java
new file mode 100644
index 0000000..27aa59d
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/dashboard/controller/ProxyController.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.dashboard.controller;
+
+import org.apache.rocketmq.dashboard.permisssion.Permission;
+import org.apache.rocketmq.dashboard.service.ProxyService;
+import org.springframework.stereotype.Controller;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.ResponseBody;
+
+import javax.annotation.Resource;
+
+@Controller
+@RequestMapping("/proxy")
+@Permission
+public class ProxyController {
+ @Resource
+ private ProxyService proxyService;
+ @RequestMapping(value = "/homePage.query", method = RequestMethod.GET)
+ @ResponseBody
+ public Object homePage() {
+ return proxyService.getProxyHomePage();
+ }
+
+ @RequestMapping(value = "/addProxyAddr.do", method = RequestMethod.POST)
+ @ResponseBody
+ public Object addProxyAddr(@RequestParam String newProxyAddr) {
+ proxyService.addProxyAddrList(newProxyAddr);
+ return true;
+ }
+
+ @RequestMapping(value = "/updateProxyAddr.do", method = RequestMethod.POST)
+ @ResponseBody
+ public Object updateProxyAddr(@RequestParam String proxyAddr) {
+ proxyService.updateProxyAddrList(proxyAddr);
+ return true;
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java b/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java
index 0d19af9..db11c41 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java
@@ -19,12 +19,15 @@
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
+import java.util.List;
+
public class GroupConsumeInfo implements Comparable<GroupConsumeInfo> {
private String group;
private String version;
private int count;
private ConsumeType consumeType;
private MessageModel messageModel;
+ private List<String> address;
private int consumeTps;
private long diffTotal = -1;
private String subGroupType = "NORMAL";
@@ -70,6 +73,22 @@
this.diffTotal = diffTotal;
}
+ public List<String> getAddress() {
+ return address;
+ }
+
+ public void setAddress(List<String> address) {
+ this.address = address;
+ }
+
+ public String getSubGroupType() {
+ return subGroupType;
+ }
+
+ public void setSubGroupType(String subGroupType) {
+ this.subGroupType = subGroupType;
+ }
+
@Override
public int compareTo(GroupConsumeInfo o) {
if (this.count != o.count) {
@@ -93,12 +112,4 @@
public void setVersion(String version) {
this.version = version;
}
-
- public String getSubGroupType() {
- return subGroupType;
- }
-
- public void setSubGroupType(String subGroupType) {
- this.subGroupType = subGroupType;
- }
}
diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java b/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java
index c475931..e284c44 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java
@@ -31,12 +31,12 @@
import java.util.Set;
public interface ConsumerService {
- List<GroupConsumeInfo> queryGroupList(boolean skipSysGroup);
+ List<GroupConsumeInfo> queryGroupList(boolean skipSysGroup,String address);
- GroupConsumeInfo queryGroup(String consumerGroup);
+ GroupConsumeInfo queryGroup(String consumerGroup, String address);
- List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName);
+ List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName, String address);
List<TopicConsumerInfo> queryConsumeStatsList(String topic, String groupName);
@@ -52,7 +52,7 @@
Set<String> fetchBrokerNameSetBySubscriptionGroup(String group);
- ConsumerConnection getConsumerConnection(String consumerGroup);
+ ConsumerConnection getConsumerConnection(String consumerGroup, String address);
ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack);
}
diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/ProxyService.java b/src/main/java/org/apache/rocketmq/dashboard/service/ProxyService.java
new file mode 100644
index 0000000..2a64680
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/dashboard/service/ProxyService.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.dashboard.service;
+
+import java.util.Map;
+
+public interface ProxyService {
+
+ void addProxyAddrList(String proxyAddr);
+
+ void updateProxyAddrList(String proxyAddr);
+
+ Map<String, Object> getProxyHomePage();
+}
diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java
index 360c0e3..0281c5c 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java
@@ -627,7 +627,7 @@
long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException {
// TODO Auto-generated method stub
- throw new UnsupportedOperationException("Unimplemented method 'examineConsumeStats'");
+ return MQAdminInstance.threadLocalMQAdminExt().examineConsumeStats(brokerAddr, consumerGroup, topicName, timeoutMillis);
}
@Override
@@ -639,8 +639,7 @@
@Override
public ConsumerConnection examineConsumerConnectionInfo(String consumerGroup, String brokerAddr)
throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException("Unimplemented method 'examineConsumerConnectionInfo'");
+ return MQAdminInstance.threadLocalMQAdminExt().examineConsumerConnectionInfo(consumerGroup, brokerAddr);
}
@Override
diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/client/ProxyAdmin.java b/src/main/java/org/apache/rocketmq/dashboard/service/client/ProxyAdmin.java
new file mode 100644
index 0000000..4344c7c
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/dashboard/service/client/ProxyAdmin.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.dashboard.service.client;
+
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
+
+public interface ProxyAdmin {
+
+ ConsumerConnection examineConsumerConnectionInfo(String addr, String consumerGroup) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException;
+}
diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/client/ProxyAdminImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/client/ProxyAdminImpl.java
new file mode 100644
index 0000000..eadae12
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/dashboard/service/client/ProxyAdminImpl.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.dashboard.service.client;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.pool2.impl.GenericObjectPool;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.remoting.protocol.header.GetConsumerConnectionListRequestHeader;
+import org.apache.rocketmq.tools.admin.MQAdminExt;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import static org.apache.rocketmq.remoting.protocol.RequestCode.GET_CONSUMER_CONNECTION_LIST;
+
+@Slf4j
+@Service
+public class ProxyAdminImpl implements ProxyAdmin {
+ @Autowired
+ private GenericObjectPool<MQAdminExt> mqAdminExtPool;
+
+ @Override
+ public ConsumerConnection examineConsumerConnectionInfo(String addr, String consumerGroup) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
+ try {
+ MQAdminInstance.createMQAdmin(mqAdminExtPool);
+ RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient();
+ GetConsumerConnectionListRequestHeader requestHeader = new GetConsumerConnectionListRequestHeader();
+ requestHeader.setConsumerGroup(consumerGroup);
+ RemotingCommand request = RemotingCommand.createRequestCommand(GET_CONSUMER_CONNECTION_LIST, requestHeader);
+ RemotingCommand response = remotingClient.invokeSync(addr, request, 3000);
+ switch (response.getCode()) {
+ case 0:
+ return ConsumerConnection.decode(response.getBody(), ConsumerConnection.class);
+ default:
+ throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
+ }
+ } finally {
+ MQAdminInstance.returnMQAdmin(mqAdminExtPool);
+ }
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
index a1cf9ff..9bc37ab 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
@@ -23,8 +23,10 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -44,6 +46,7 @@
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.dashboard.service.client.ProxyAdmin;
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
import org.apache.rocketmq.common.message.MessageQueue;
@@ -78,6 +81,8 @@
private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class);
@Resource
+ protected ProxyAdmin proxyAdmin;
+ @Resource
private RMQConfigure configure;
private static final Set<String> SYSTEM_GROUP_SET = new HashSet<>();
@@ -119,25 +124,33 @@
}
@Override
- public List<GroupConsumeInfo> queryGroupList(boolean skipSysGroup) {
- Set<String> consumerGroupSet = Sets.newHashSet();
+ public List<GroupConsumeInfo> queryGroupList(boolean skipSysGroup, String address) {
+ HashMap<String, List<String>> consumerGroupMap = Maps.newHashMap();
try {
ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) {
SubscriptionGroupWrapper subscriptionGroupWrapper = mqAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L);
- consumerGroupSet.addAll(subscriptionGroupWrapper.getSubscriptionGroupTable().keySet());
+ for (String groupName : subscriptionGroupWrapper.getSubscriptionGroupTable().keySet()) {
+ if (!consumerGroupMap.containsKey(groupName)) {
+ consumerGroupMap.putIfAbsent(groupName, new ArrayList<>());
+ }
+ List<String> addresses = consumerGroupMap.get(groupName);
+ addresses.add(brokerData.selectBrokerAddr());
+ consumerGroupMap.put(groupName, addresses);
+ }
}
- }
- catch (Exception err) {
+ } catch (Exception err) {
Throwables.throwIfUnchecked(err);
throw new RuntimeException(err);
}
List<GroupConsumeInfo> groupConsumeInfoList = Collections.synchronizedList(Lists.newArrayList());
- CountDownLatch countDownLatch = new CountDownLatch(consumerGroupSet.size());
- for (String consumerGroup : consumerGroupSet) {
+ CountDownLatch countDownLatch = new CountDownLatch(consumerGroupMap.size());
+ for (Map.Entry<String, List<String>> entry : consumerGroupMap.entrySet()) {
+ String consumerGroup = entry.getKey();
executorService.submit(() -> {
try {
- GroupConsumeInfo consumeInfo = queryGroup(consumerGroup);
+ GroupConsumeInfo consumeInfo = queryGroup(consumerGroup, address);
+ consumeInfo.setAddress(entry.getValue());
groupConsumeInfoList.add(consumeInfo);
} catch (Exception e) {
logger.error("queryGroup exception, consumerGroup: {}", consumerGroup, e);
@@ -165,7 +178,7 @@
}
@Override
- public GroupConsumeInfo queryGroup(String consumerGroup) {
+ public GroupConsumeInfo queryGroup(String consumerGroup, String address) {
GroupConsumeInfo groupConsumeInfo = new GroupConsumeInfo();
try {
ConsumeStats consumeStats = null;
@@ -182,9 +195,12 @@
.allMatch(SubscriptionGroupConfig::isConsumeMessageOrderly);
try {
- consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup);
- }
- catch (Exception e) {
+ if (StringUtils.isNotEmpty(address)) {
+ consumerConnection = proxyAdmin.examineConsumerConnectionInfo(address, consumerGroup);
+ } else {
+ consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup);
+ }
+ } catch (Exception e) {
logger.warn("examineConsumeStats exception to consumerGroup {}, response [{}]", consumerGroup, e.getMessage());
}
@@ -217,8 +233,18 @@
}
@Override
- public List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName) {
- return queryConsumeStatsList(null, groupName);
+ public List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName, String address) {
+ ConsumeStats consumeStats;
+ String topic = null;
+ try {
+ String[] addresses = address.split(",");
+ String addr = addresses[0];
+ consumeStats = mqAdminExt.examineConsumeStats(addr, groupName, null, 3000);
+ } catch (Exception e) {
+ Throwables.throwIfUnchecked(e);
+ throw new RuntimeException(e);
+ }
+ return toTopicConsumerInfoList(topic, consumeStats, groupName);
}
@Override
@@ -231,6 +257,10 @@
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
+ return toTopicConsumerInfoList(topic, consumeStats, groupName);
+ }
+
+ private List<TopicConsumerInfo> toTopicConsumerInfoList(String topic, ConsumeStats consumeStats, String groupName) {
List<MessageQueue> mqList = Lists.newArrayList(Iterables.filter(consumeStats.getOffsetTable().keySet(), new Predicate<MessageQueue>() {
@Override
public boolean apply(MessageQueue o) {
@@ -431,11 +461,12 @@
}
@Override
- public ConsumerConnection getConsumerConnection(String consumerGroup) {
+ public ConsumerConnection getConsumerConnection(String consumerGroup, String address) {
try {
- return mqAdminExt.examineConsumerConnectionInfo(consumerGroup);
- }
- catch (Exception e) {
+ String[] addresses = address.split(",");
+ String addr = addresses[0];
+ return mqAdminExt.examineConsumerConnectionInfo(consumerGroup, addr);
+ } catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ProxyServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ProxyServiceImpl.java
new file mode 100644
index 0000000..07e63b3
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ProxyServiceImpl.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.dashboard.service.impl;
+
+import com.google.common.collect.Maps;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.dashboard.config.RMQConfigure;
+import org.apache.rocketmq.dashboard.service.ProxyService;
+import org.apache.rocketmq.dashboard.service.client.ProxyAdmin;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.util.List;
+import java.util.Map;
+
+@Slf4j
+@Service
+public class ProxyServiceImpl implements ProxyService {
+ @Resource
+ protected ProxyAdmin proxyAdmin;
+ @Resource
+ private RMQConfigure configure;
+
+ @Override
+ public void addProxyAddrList(String proxyAddr) {
+ List<String> proxyAddrs = configure.getProxyAddrs();
+ if (proxyAddrs != null && !proxyAddrs.contains(proxyAddr)) {
+ proxyAddrs.add(proxyAddr);
+ }
+ configure.setProxyAddrs(proxyAddrs);
+ }
+
+ @Override
+ public void updateProxyAddrList(String proxyAddr) {
+ configure.setProxyAddr(proxyAddr);
+ }
+
+ @Override
+ public Map<String, Object> getProxyHomePage() {
+ Map<String, Object> homePageInfoMap = Maps.newHashMap();
+ homePageInfoMap.put("currentProxyAddr", configure.getProxyAddr());
+ homePageInfoMap.put("proxyAddrList", configure.getProxyAddrs());
+ return homePageInfoMap;
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/dashboard/task/MonitorTask.java b/src/main/java/org/apache/rocketmq/dashboard/task/MonitorTask.java
index 710929b..3c8a77e 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/task/MonitorTask.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/task/MonitorTask.java
@@ -40,7 +40,7 @@
// @Scheduled(cron = "* * * * * ?")
public void scanProblemConsumeGroup() {
for (Map.Entry<String, ConsumerMonitorConfig> configEntry : monitorService.queryConsumerMonitorConfig().entrySet()) {
- GroupConsumeInfo consumeInfo = consumerService.queryGroup(configEntry.getKey());
+ GroupConsumeInfo consumeInfo = consumerService.queryGroup(configEntry.getKey(), null);
if (consumeInfo.getCount() < configEntry.getValue().getMinCount() || consumeInfo.getDiffTotal() > configEntry.getValue().getMaxDiffTotal()) {
logger.info("op=look consumeInfo {}", JsonUtil.obj2String(consumeInfo)); // notify the alert system
}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 090e421..fe4d283 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -59,6 +59,9 @@
# must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is required
loginRequired: false
useTLS: false
+ proxyAddr: 127.0.0.1:8080
+ proxyAddrs:
+ - 127.0.0.1:8080
# set the accessKey and secretKey if you used acl
# accessKey: rocketmq2
# secretKey: 12345678
diff --git a/src/main/resources/static/index.html b/src/main/resources/static/index.html
index c2bf349..ee3c3fe 100644
--- a/src/main/resources/static/index.html
+++ b/src/main/resources/static/index.html
@@ -104,6 +104,7 @@
<script type="text/javascript" src="src/tools/tools.js?v=201703171710"></script>
<script type="text/javascript" src="src/cluster.js?timestamp=4"></script>
<script type="text/javascript" src="src/topic.js"></script>
+<script type="text/javascript" src="src/proxy.js"></script>
<script type="text/javascript" src="src/consumer.js?timestamp=6"></script>
<script type="text/javascript" src="src/producer.js"></script>
<script type="text/javascript" src="src/message.js"></script>
diff --git a/src/main/resources/static/src/app.js b/src/main/resources/static/src/app.js
index a7ca1be..1bbb650 100644
--- a/src/main/resources/static/src/app.js
+++ b/src/main/resources/static/src/app.js
@@ -213,6 +213,9 @@
}).when('/ops', {
templateUrl: 'view/pages/ops.html',
controller:'opsController'
+ }).when('/proxy', {
+ templateUrl: 'view/pages/proxy.html',
+ controller:'proxyController'
}).when('/acl', {
templateUrl: 'view/pages/acl.html',
controller: 'aclController'
diff --git a/src/main/resources/static/src/consumer.js b/src/main/resources/static/src/consumer.js
index 8c0833e..d192334 100644
--- a/src/main/resources/static/src/consumer.js
+++ b/src/main/resources/static/src/consumer.js
@@ -79,6 +79,7 @@
url: "consumer/groupList.query",
params: {
skipSysGroup: false,
+ address: localStorage.getItem('isV5') ? localStorage.getItem('proxyAddr') : null
}
}).success(function (resp) {
if (resp.status == 0) {
@@ -243,11 +244,11 @@
}
});
};
- $scope.detail = function (consumerGroupName) {
+ $scope.detail = function (consumerGroupName, address) {
$http({
method: "GET",
url: "consumer/queryTopicByConsumer.query",
- params: {consumerGroup: consumerGroupName}
+ params: {consumerGroup: consumerGroupName, address: address}
}).success(function (resp) {
if (resp.status == 0) {
console.log(resp);
@@ -262,11 +263,11 @@
});
};
- $scope.client = function (consumerGroupName) {
+ $scope.client = function (consumerGroupName, address) {
$http({
method: "GET",
url: "consumer/consumerConnection.query",
- params: {consumerGroup: consumerGroupName}
+ params: {consumerGroup: consumerGroupName, address: address}
}).success(function (resp) {
if (resp.status == 0) {
console.log(resp);
diff --git a/src/main/resources/static/src/i18n/en.js b/src/main/resources/static/src/i18n/en.js
index 6bc16cd..83083d7 100644
--- a/src/main/resources/static/src/i18n/en.js
+++ b/src/main/resources/static/src/i18n/en.js
@@ -100,6 +100,7 @@
"RESET_OFFSET":"resetOffset",
"CLUSTER_NAME":"clusterName",
"OPS":"OPS",
+ "PROXY":"Proxy",
"AUTO_REFRESH":"AUTO_REFRESH",
"REFRESH":"REFRESH",
"LOGOUT":"Logout",
diff --git a/src/main/resources/static/src/i18n/zh.js b/src/main/resources/static/src/i18n/zh.js
index f71ae34..f8c3c1d 100644
--- a/src/main/resources/static/src/i18n/zh.js
+++ b/src/main/resources/static/src/i18n/zh.js
@@ -101,6 +101,7 @@
"RESET_OFFSET":"重置位点",
"CLUSTER_NAME":"集群名",
"OPS":"运维",
+ "PROXY":"代理",
"AUTO_REFRESH":"自动刷新",
"REFRESH":"刷新",
"LOGOUT":"退出",
diff --git a/src/main/resources/static/src/proxy.js b/src/main/resources/static/src/proxy.js
new file mode 100644
index 0000000..4461b09
--- /dev/null
+++ b/src/main/resources/static/src/proxy.js
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+var module = app;
+module.controller('proxyController', ['$scope', '$location', '$http', 'Notification', 'remoteApi', 'tools', '$window',
+ function ($scope, $location, $http, Notification, remoteApi, tools, $window) {
+ $scope.proxyAddrList = [];
+ $scope.userRole = $window.sessionStorage.getItem("userrole");
+ $scope.writeOperationEnabled = $scope.userRole == null ? true : ($scope.userRole == 1 ? true : false);
+ $scope.inputReadonly = !$scope.writeOperationEnabled;
+ $scope.newProxyAddr = "";
+ $scope.allProxyConfig = {};
+
+ $http({
+ method: "GET",
+ url: "proxy/homePage.query"
+ }).success(function (resp) {
+ if (resp.status == 0) {
+ $scope.proxyAddrList = resp.data.proxyAddrList;
+ $scope.selectedProxy = resp.data.currentProxyAddr;
+ $scope.showProxyDetailConfig($scope.selectedProxy);
+ localStorage.setItem('proxyAddr',$scope.selectedProxy);
+ } else {
+ Notification.error({message: resp.errMsg, delay: 2000});
+ }
+ });
+
+ $scope.eleChange = function (data) {
+ $scope.proxyAddrList = data;
+ }
+ $scope.showDetailConf = function () {
+ $(".proxyModal").modal();
+ }
+
+
+ $scope.showProxyDetailConfig = function (proxyAddr) {
+ $http({
+ method: "GET",
+ url: "proxy/proxyDetailConfig.query",
+ params: {proxyAddress: proxyAddr}
+ }).success(function (resp) {
+ if (resp.status == 0) {
+ $scope.allProxyConfig = resp.data;
+ } else {
+ Notification.error({message: resp.errMsg, delay: 2000});
+ }
+ });
+ };
+
+ $scope.updateProxyAddr = function () {
+ $http({
+ method: "POST",
+ url: "proxy/updateProxyAddr.do",
+ params: {proxyAddr: $scope.selectedProxy}
+ }).success(function (resp) {
+ if (resp.status == 0) {
+ localStorage.setItem('proxyAddr', $scope.selectedProxy);
+ Notification.info({message: "SUCCESS", delay: 2000});
+ } else {
+ Notification.error({message: resp.errMsg, delay: 2000});
+ }
+ });
+ $scope.showProxyDetailConfig($scope.selectedProxy);
+ };
+
+ $scope.addProxyAddr = function () {
+ $http({
+ method: "POST",
+ url: "proxy/addProxyAddr.do",
+ params: {newProxyAddr: $scope.newProxyAddr}
+ }).success(function (resp) {
+ if (resp.status == 0) {
+ if ($scope.proxyAddrList.indexOf($scope.newProxyAddr) == -1) {
+ $scope.proxyAddrList.push($scope.newProxyAddr);
+ }
+ $("#proxyAddr").val("");
+ $scope.newProxyAddr = "";
+ Notification.info({message: "SUCCESS", delay: 2000});
+ } else {
+ Notification.error({message: resp.errMsg, delay: 2000});
+ }
+ });
+ };
+ }])
diff --git a/src/main/resources/static/view/layout/_header.html b/src/main/resources/static/view/layout/_header.html
index a78b9f2..8159138 100644
--- a/src/main/resources/static/view/layout/_header.html
+++ b/src/main/resources/static/view/layout/_header.html
@@ -28,6 +28,7 @@
<div class="navbar-collapse collapse navbar-warning-collapse">
<ul class="nav navbar-nav">
<li ng-class="path =='ops' ? 'active':''"><a ng-href="#/ops">{{'OPS' | translate}}</a></li>
+ <li ng-show="rmqVersion" ng-class="path =='proxy' ? 'active':''"><a ng-href="#/proxy">{{'PROXY' | translate}}</a></li>
<li ng-class="path =='dashboard' || path ==''? 'active':''"><a ng-href="#/">{{'DASHBOARD' | translate}}</a></li>
<li ng-class="path =='cluster' ? 'active':''"><a ng-href="#/cluster">{{'CLUSTER' | translate}}</a></li>
<li ng-class="path =='topic' ? 'active':''"><a ng-href="#/topic">{{'TOPIC' | translate}}</a></li>
diff --git a/src/main/resources/static/view/pages/consumer.html b/src/main/resources/static/view/pages/consumer.html
index 47fddad..d883afc 100644
--- a/src/main/resources/static/view/pages/consumer.html
+++ b/src/main/resources/static/view/pages/consumer.html
@@ -66,11 +66,11 @@
<td class="text-center">{{consumerGroup.consumeTps}}</td>
<td class="text-center">{{consumerGroup.diffTotal}}</td>
<td class="text-left">
- <button name="client" ng-click="client(consumerGroup.group)"
+ <button name="client" ng-click="client(consumerGroup.group, consumerGroup.address)"
class="btn btn-raised btn-sm btn-primary"
type="button">{{'CLIENT' | translate}}
</button>
- <button name="client" ng-click="detail(consumerGroup.group)"
+ <button name="client" ng-click="detail(consumerGroup.group, consumerGroup.address)"
class="btn btn-raised btn-sm btn-primary"
type="button">{{'CONSUME_DETAIL' | translate}}
</button>
diff --git a/src/main/resources/static/view/pages/proxy.html b/src/main/resources/static/view/pages/proxy.html
new file mode 100644
index 0000000..43f34ce
--- /dev/null
+++ b/src/main/resources/static/view/pages/proxy.html
@@ -0,0 +1,67 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<div class="container-fluid" id="deployHistoryList">
+ <div class="page-content">
+ <h2 class="md-title">ProxyServerAddressList</h2>
+ <div class="pull-left" style="min-width: 400px; max-width: 500px; padding: 10px 10px 10px 0">
+ <select ng-model="selectedProxy" chosen
+ ng-options="x for x in proxyAddrList"
+ ng-change="updateProxyAddr()"
+ required></select>
+ </div>
+ <div class="pull-left">
+ <button class="btn btn-raised btn-sm btn-primary" type="button" ng-show="{{writeOperationEnabled}}"
+ ng-click="updateProxyAddr()">{{'UPDATE' | translate}}
+ </button>
+ </div>
+ <form class="form-inline pull-left" style="margin-left: 20px" ng-show="{{writeOperationEnabled}}">
+ <div class="form-group" style="margin: 0">
+ <label for="proxyAddr">ProxyAddr:</label>
+ <input id="proxyAddr" class="form-control" style="width: 300px; margin: 0 10px 0 10px" type="text" ng-model="newProxyAddr" required/>
+ <button class="btn btn-raised btn-sm btn-primary" type="button"
+ ng-click="addProxyAddr()"> {{ 'ADD' | translate}}
+ </button>
+ </div>
+ </form>
+ </div>
+</div>
+
+<div class="modal proxyModal fade" role="dialog" tabindex="-1" aria-hidden="true" aria-labelledby="config-modal-label">
+ <div class="modal-dialog modal-lg">
+ <div class="modal-content" >
+ <div class="modal-header">
+ <button class="close" type="button" data-dismiss="modal">×</button>
+ <h4 id="config-modal-label" class="modal-title">
+ [{{selectedProxy}}]
+ </h4>
+ </div>
+ <div class="modal-body limit_height">
+ <table class="table table-bordered">
+ <tr ng-repeat="(key, value) in allProxyConfig">
+ <td>{{key}}</td>
+ <td>{{value}}</td>
+ </tr>
+ </table>
+ </div>
+ <div class="modal-footer">
+ <div class="col-md-12 text-center">
+ <button type="button" class="btn btn-raised" data-dismiss="modal">{{ 'CLOSE' | translate }}</button>
+ </div>
+ </div>
+ </div>
+ </div>
+</div>