[ISSUE #123]Optimize groupList.query (#124)
Co-authored-by: zhangjidi <zhangjidi@cmss.chinamobile.com>
diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
index 3ad85d4..b1011b7 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
@@ -29,6 +29,14 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import org.apache.commons.collections.CollectionUtils;
@@ -48,6 +56,7 @@
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
import org.apache.rocketmq.dashboard.model.GroupConsumeInfo;
@@ -60,12 +69,14 @@
import org.apache.rocketmq.dashboard.service.ConsumerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Service;
import static com.google.common.base.Throwables.propagate;
@Service
-public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService {
+public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService, InitializingBean, DisposableBean {
private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class);
@Resource
@@ -73,6 +84,31 @@
private static final Set<String> SYSTEM_GROUP_SET = new HashSet<>();
+ private ExecutorService executorService;
+
+ @Override
+ public void afterPropertiesSet() {
+ Runtime runtime = Runtime.getRuntime();
+ int corePoolSize = Math.max(10, runtime.availableProcessors() * 2);
+ int maximumPoolSize = Math.max(20, runtime.availableProcessors() * 2);
+ ThreadFactory threadFactory = new ThreadFactory() {
+ private final AtomicLong threadIndex = new AtomicLong(0);
+
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "QueryGroup_" + this.threadIndex.incrementAndGet());
+ }
+ };
+ RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy();
+ this.executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(5000), threadFactory, handler);
+ }
+
+ @Override
+ public void destroy() {
+ ThreadUtils.shutdownGracefully(executorService, 10L, TimeUnit.SECONDS);
+ }
+
static {
SYSTEM_GROUP_SET.add(MixAll.TOOLS_CONSUMER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.FILTERSRV_CONSUMER_GROUP);
@@ -97,10 +133,26 @@
catch (Exception err) {
throw Throwables.propagate(err);
}
- List<GroupConsumeInfo> groupConsumeInfoList = Lists.newArrayList();
+ List<GroupConsumeInfo> groupConsumeInfoList = Collections.synchronizedList(Lists.newArrayList());
+ CountDownLatch countDownLatch = new CountDownLatch(consumerGroupSet.size());
for (String consumerGroup : consumerGroupSet) {
- groupConsumeInfoList.add(queryGroup(consumerGroup));
+ executorService.submit(() -> {
+ try {
+ GroupConsumeInfo consumeInfo = queryGroup(consumerGroup);
+ groupConsumeInfoList.add(consumeInfo);
+ } catch (Exception e) {
+ logger.error("queryGroup exception, consumerGroup: {}", consumerGroup, e);
+ } finally {
+ countDownLatch.countDown();
+ }
+ });
}
+ try {
+ countDownLatch.await(30, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ logger.error("query consumerGroup countDownLatch await Exception", e);
+ }
+
if (!skipSysGroup) {
groupConsumeInfoList.stream().map(group -> {
if (SYSTEM_GROUP_SET.contains(group.getGroup())) {
diff --git a/src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java b/src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java
index dfc3c22..b95e80a 100644
--- a/src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java
+++ b/src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java
@@ -67,6 +67,7 @@
@Before
public void init() throws Exception {
+ consumerService.afterPropertiesSet();
super.mockRmqConfigure();
ClusterInfo clusterInfo = MockObjectUtil.createClusterInfo();
when(mqAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo);
@@ -93,9 +94,10 @@
perform = mockMvc.perform(requestBuilder);
perform.andExpect(status().isOk())
.andExpect(jsonPath("$.data", hasSize(2)))
- .andExpect(jsonPath("$.data[0].group").value("group_test"))
.andExpect(jsonPath("$.data[0].consumeType").value(ConsumeType.CONSUME_ACTIVELY.name()))
.andExpect(jsonPath("$.data[0].messageModel").value(MessageModel.CLUSTERING.name()));
+ // executorService shutdown
+ consumerService.destroy();
}
@Test