[ISSUE #99] ignored rocketmq_group_get_latency_by_storetime for broadcast consumer

[ISSUE #100] ignored rocketmq_group_get_latency_by_storetime for broadcast consumer
diff --git a/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java b/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
index b29c839..7f4b940 100644
--- a/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
+++ b/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
@@ -378,37 +378,39 @@
                 }
 
                 // get consumer latency
-                try {
-                    HashMap<String, Long> consumerLatencyMap = new HashMap<>();
-                    for (Map.Entry<MessageQueue, OffsetWrapper> consumeStatusEntry : consumeStats.getOffsetTable().entrySet()) {
-                        MessageQueue q = consumeStatusEntry.getKey();
-                        OffsetWrapper offset = consumeStatusEntry.getValue();
-                        PullResult consumePullResult = ((MQAdminExtImpl) mqAdminExt).queryMsgByOffset(q, offset.getConsumerOffset());
-                        long lagTime = 0;
-                        if (consumePullResult != null && consumePullResult.getPullStatus() == PullStatus.FOUND) {
-                            lagTime = System.currentTimeMillis() - consumePullResult.getMsgFoundList().get(0).getStoreTimestamp();
-                            if (offset.getBrokerOffset() == offset.getConsumerOffset()) {
-                                lagTime = 0;
+                if (MessageModel.CLUSTERING == messageModel) {
+                    try {
+                        HashMap<String, Long> consumerLatencyMap = new HashMap<>();
+                        for (Map.Entry<MessageQueue, OffsetWrapper> consumeStatusEntry : consumeStats.getOffsetTable().entrySet()) {
+                            MessageQueue q = consumeStatusEntry.getKey();
+                            OffsetWrapper offset = consumeStatusEntry.getValue();
+                            PullResult consumePullResult = ((MQAdminExtImpl) mqAdminExt).queryMsgByOffset(q, offset.getConsumerOffset());
+                            long lagTime = 0;
+                            if (consumePullResult != null && consumePullResult.getPullStatus() == PullStatus.FOUND) {
+                                lagTime = System.currentTimeMillis() - consumePullResult.getMsgFoundList().get(0).getStoreTimestamp();
+                                if (offset.getBrokerOffset() == offset.getConsumerOffset()) {
+                                    lagTime = 0;
+                                }
+                            } else if (consumePullResult.getPullStatus() == PullStatus.OFFSET_ILLEGAL) {
+                                PullResult pullResult = ((MQAdminExtImpl) mqAdminExt).queryMsgByOffset(q, consumePullResult.getMinOffset());
+                                if (pullResult != null && pullResult.getPullStatus() == PullStatus.FOUND) {
+                                    lagTime = System.currentTimeMillis() - pullResult.getMsgFoundList().get(0).getStoreTimestamp();
+                                }
                             }
-                        } else if (consumePullResult.getPullStatus() == PullStatus.OFFSET_ILLEGAL) {
-                            PullResult pullResult = ((MQAdminExtImpl) mqAdminExt).queryMsgByOffset(q, consumePullResult.getMinOffset());
-                            if (pullResult != null && pullResult.getPullStatus() == PullStatus.FOUND) {
-                                lagTime = System.currentTimeMillis() - pullResult.getMsgFoundList().get(0).getStoreTimestamp();
+                            if (!consumerLatencyMap.containsKey(q.getBrokerName())) {
+                                consumerLatencyMap.put(q.getBrokerName(), lagTime > 0 ? lagTime : 0);
+                            } else if (lagTime > consumerLatencyMap.get(q.getBrokerName())) {
+                                consumerLatencyMap.put(q.getBrokerName(), lagTime);
                             }
                         }
-                        if (!consumerLatencyMap.containsKey(q.getBrokerName())) {
-                            consumerLatencyMap.put(q.getBrokerName(), lagTime > 0 ? lagTime : 0);
-                        } else if (lagTime > consumerLatencyMap.get(q.getBrokerName())) {
-                            consumerLatencyMap.put(q.getBrokerName(), lagTime);
+                        for (Map.Entry<String, Long> consumeLatencyEntry : consumerLatencyMap.entrySet()) {
+                            metricsService.getCollector().addGroupGetLatencyByStoreTimeMetric(clusterName,
+                                    consumeLatencyEntry.getKey(), topic, group, consumeLatencyEntry.getValue());
                         }
-                    }
-                    for (Map.Entry<String, Long> consumeLatencyEntry : consumerLatencyMap.entrySet()) {
-                        metricsService.getCollector().addGroupGetLatencyByStoreTimeMetric(clusterName,
-                            consumeLatencyEntry.getKey(), topic, group, consumeLatencyEntry.getValue());
-                    }
 
-                } catch (Exception ex) {
-                    log.warn("addGroupGetLatencyByStoreTimeMetric error", ex);
+                    } catch (Exception ex) {
+                        log.warn("addGroupGetLatencyByStoreTimeMetric error", ex);
+                    }
                 }
             }
         }