Fix examineConsumeStats for non-static topics (#4571)
diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java
index 81dc864..7fe49b1 100644
--- a/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java
@@ -17,14 +17,18 @@
package org.apache.rocketmq.test.smoke;
+import java.util.List;
import org.apache.log4j.Logger;
+import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.VerifyUtils;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -37,13 +41,18 @@
private RMQNormalConsumer consumer = null;
private RMQNormalProducer producer = null;
private String topic = null;
+ private String group = null;
+ private DefaultMQAdminExt defaultMQAdminExt;
@Before
- public void setUp() {
+ public void setUp() throws Exception {
topic = initTopic();
+ group = initConsumerGroup(); // keep the group name: the test passes it to examineConsumeStats
logger.info(String.format("use topic: %s;", topic));
producer = getProducer(nsAddr, topic);
- consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
+ consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
+ defaultMQAdminExt = getAdmin(nsAddr);
+ defaultMQAdminExt.start(); // started here; testSynSendMessage later queries consume stats through it
}
@After
@@ -52,17 +61,31 @@
}
@Test
- public void testSynSendMessage() {
+ public void testSynSendMessage() throws Exception {
int msgSize = 10;
- producer.send(msgSize);
- Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
+ List<MessageQueue> messageQueueList = producer.getProducer().fetchPublishMessageQueues(topic);
+ for (MessageQueue messageQueue: messageQueueList) {
+ producer.send(msgSize, messageQueue);
+ }
+ Assert.assertEquals("Not all sent succeeded", msgSize * messageQueueList.size(), producer.getAllUndupMsgBody().size());
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody());
+
for (Object o : consumer.getListener().getAllOriginMsg()) {
MessageClientExt msg = (MessageClientExt) o;
assertThat(msg.getProperty(MessageConst.PROPERTY_POP_CK)).isNull();
}
+ // Shut the consumer down before querying stats so that its consume offsets are persisted.
+ consumer.getConsumer().shutdown();
+ ConsumeStats consumeStats = defaultMQAdminExt.examineConsumeStats(group);
+ // Check only the queues that were published to: each must be present with consumer and broker offsets equal to msgSize. NOTE(review): the table presumably also holds the group's retry-topic queues, so its size is not asserted - confirm.
+ for (MessageQueue messageQueue: messageQueueList) {
+ Assert.assertTrue(consumeStats.getOffsetTable().containsKey(messageQueue));
+ Assert.assertEquals(msgSize, consumeStats.getOffsetTable().get(messageQueue).getConsumerOffset());
+ Assert.assertEquals(msgSize, consumeStats.getOffsetTable().get(messageQueue).getBrokerOffset());
+ }
+
}
}
diff --git a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
index 41c9c7e..1ce69b4 100644
--- a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
@@ -395,6 +395,8 @@
}
+
+
@Test
public void testRemappingAndClear() throws Exception {
String topic = "static" + MQRandomUtils.getRandomTopic();
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 7b78ead..bd732ab 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -415,6 +415,15 @@
for (String currentTopic : topics) {
TopicRouteData currentRoute = this.examineTopicRouteInfo(currentTopic);
+ if (currentRoute.getTopicQueueMappingByBroker() == null
+ || currentRoute.getTopicQueueMappingByBroker().isEmpty()) {
+ //normal topic
+ for (Map.Entry<MessageQueue, OffsetWrapper> entry: result.getOffsetTable().entrySet()) {
+ if (entry.getKey().getTopic().equals(currentTopic)) {
+ staticResult.getOffsetTable().put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = MQAdminUtils.examineTopicConfigFromRoute(currentTopic, currentRoute, defaultMQAdminExt);
ConsumeStats consumeStats = MQAdminUtils.convertPhysicalConsumeStats(brokerConfigMap, result);
staticResult.getOffsetTable().putAll(consumeStats.getOffsetTable());
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index bc72038..9c1af64 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -19,6 +19,10 @@
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
@@ -92,9 +96,6 @@
import org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.List;
-
public class MQAdminStartup {
protected static List<SubCommand> subCommandList = new ArrayList<SubCommand>();
@@ -253,7 +254,12 @@
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
- configurator.doConfigure(rocketmqHome + "/conf/logback_tools.xml");
+
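+ // Skip logback configuration when rocketmqHome is unset or the config file is missing, rather than letting doConfigure fail on a bad path.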
+ if (rocketmqHome != null
+ && Files.exists(Paths.get(rocketmqHome + "/conf/logback_tools.xml"))) {
+ configurator.doConfigure(rocketmqHome + "/conf/logback_tools.xml");
+ }
}
private static void printHelp() {