Fix examine consume stats for none static topic (#4571)

diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java
index 81dc864..7fe49b1 100644
--- a/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java
@@ -17,14 +17,18 @@
 
 package org.apache.rocketmq.test.smoke;
 
+import java.util.List;
 import org.apache.log4j.Logger;
+import org.apache.rocketmq.common.admin.ConsumeStats;
 import org.apache.rocketmq.common.message.MessageClientExt;
 import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.test.base.BaseConf;
 import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
 import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
 import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
 import org.apache.rocketmq.test.util.VerifyUtils;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -37,13 +41,18 @@
     private RMQNormalConsumer consumer = null;
     private RMQNormalProducer producer = null;
     private String topic = null;
+    private String group = null;
+    private DefaultMQAdminExt defaultMQAdminExt;
 
     @Before
-    public void setUp() {
+    public void setUp() throws Exception {
         topic = initTopic();
+        group = initConsumerGroup();
         logger.info(String.format("use topic: %s;", topic));
         producer = getProducer(nsAddr, topic);
-        consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
+        consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
+        defaultMQAdminExt = getAdmin(nsAddr);
+        defaultMQAdminExt.start();
     }
 
     @After
@@ -52,17 +61,31 @@
     }
 
     @Test
-    public void testSynSendMessage() {
+    public void testSynSendMessage() throws Exception {
         int msgSize = 10;
-        producer.send(msgSize);
-        Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
+        List<MessageQueue> messageQueueList = producer.getProducer().fetchPublishMessageQueues(topic);
+        for (MessageQueue messageQueue: messageQueueList) {
+            producer.send(msgSize, messageQueue);
+        }
+        Assert.assertEquals("Not all sent succeeded", msgSize * messageQueueList.size(), producer.getAllUndupMsgBody().size());
         consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
         assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
             consumer.getListener().getAllMsgBody()))
             .containsExactlyElementsIn(producer.getAllMsgBody());
+
         for (Object o : consumer.getListener().getAllOriginMsg()) {
             MessageClientExt msg = (MessageClientExt) o;
             assertThat(msg.getProperty(MessageConst.PROPERTY_POP_CK)).isNull();
         }
+        //shutdown to persist the offset
+        consumer.getConsumer().shutdown();
+        ConsumeStats consumeStats = defaultMQAdminExt.examineConsumeStats(group);
+        //+1 for the retry topic
+        for (MessageQueue messageQueue: messageQueueList) {
+            Assert.assertTrue(consumeStats.getOffsetTable().containsKey(messageQueue));
+            Assert.assertEquals(msgSize, consumeStats.getOffsetTable().get(messageQueue).getConsumerOffset());
+            Assert.assertEquals(msgSize, consumeStats.getOffsetTable().get(messageQueue).getBrokerOffset());
+        }
+
     }
 }
diff --git a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
index 41c9c7e..1ce69b4 100644
--- a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
@@ -395,6 +395,8 @@
     }
 
 
+
+
     @Test
     public void testRemappingAndClear() throws Exception {
         String topic = "static" + MQRandomUtils.getRandomTopic();
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 7b78ead..bd732ab 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -415,6 +415,15 @@
 
         for (String currentTopic : topics) {
             TopicRouteData currentRoute = this.examineTopicRouteInfo(currentTopic);
+            if (currentRoute.getTopicQueueMappingByBroker() == null
+                || currentRoute.getTopicQueueMappingByBroker().isEmpty()) {
+                //normal topic
+                for (Map.Entry<MessageQueue, OffsetWrapper> entry: result.getOffsetTable().entrySet()) {
+                    if (entry.getKey().getTopic().equals(currentTopic)) {
+                        staticResult.getOffsetTable().put(entry.getKey(), entry.getValue());
+                    }
+                }
+            }
             Map<String, TopicConfigAndQueueMapping> brokerConfigMap = MQAdminUtils.examineTopicConfigFromRoute(currentTopic, currentRoute, defaultMQAdminExt);
             ConsumeStats consumeStats = MQAdminUtils.convertPhysicalConsumeStats(brokerConfigMap, result);
             staticResult.getOffsetTable().putAll(consumeStats.getOffsetTable());
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index bc72038..9c1af64 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -19,6 +19,10 @@
 import ch.qos.logback.classic.LoggerContext;
 import ch.qos.logback.classic.joran.JoranConfigurator;
 import ch.qos.logback.core.joran.spi.JoranException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
@@ -92,9 +96,6 @@
 import org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.List;
-
 public class MQAdminStartup {
     protected static List<SubCommand> subCommandList = new ArrayList<SubCommand>();
 
@@ -253,7 +254,12 @@
         JoranConfigurator configurator = new JoranConfigurator();
         configurator.setContext(lc);
         lc.reset();
-        configurator.doConfigure(rocketmqHome + "/conf/logback_tools.xml");
+
+        //avoid the exception
+        if (rocketmqHome != null
+            && Files.exists(Paths.get(rocketmqHome + "/conf/logback_tools.xml"))) {
+            configurator.doConfigure(rocketmqHome + "/conf/logback_tools.xml");
+        }
     }
 
     private static void printHelp() {