[ISSUE #99] ignored rocketmq_group_get_latency_by_storetime for boardcast consumer
[ISSUE #100] ignored rocketmq_group_get_latency_by_storetime for boardcast consumer
diff --git a/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java b/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
index b29c839..7f4b940 100644
--- a/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
+++ b/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
@@ -378,37 +378,39 @@
}
// get consumer latency
- try {
- HashMap<String, Long> consumerLatencyMap = new HashMap<>();
- for (Map.Entry<MessageQueue, OffsetWrapper> consumeStatusEntry : consumeStats.getOffsetTable().entrySet()) {
- MessageQueue q = consumeStatusEntry.getKey();
- OffsetWrapper offset = consumeStatusEntry.getValue();
- PullResult consumePullResult = ((MQAdminExtImpl) mqAdminExt).queryMsgByOffset(q, offset.getConsumerOffset());
- long lagTime = 0;
- if (consumePullResult != null && consumePullResult.getPullStatus() == PullStatus.FOUND) {
- lagTime = System.currentTimeMillis() - consumePullResult.getMsgFoundList().get(0).getStoreTimestamp();
- if (offset.getBrokerOffset() == offset.getConsumerOffset()) {
- lagTime = 0;
+ if (MessageModel.CLUSTERING == messageModel) {
+ try {
+ HashMap<String, Long> consumerLatencyMap = new HashMap<>();
+ for (Map.Entry<MessageQueue, OffsetWrapper> consumeStatusEntry : consumeStats.getOffsetTable().entrySet()) {
+ MessageQueue q = consumeStatusEntry.getKey();
+ OffsetWrapper offset = consumeStatusEntry.getValue();
+ PullResult consumePullResult = ((MQAdminExtImpl) mqAdminExt).queryMsgByOffset(q, offset.getConsumerOffset());
+ long lagTime = 0;
+ if (consumePullResult != null && consumePullResult.getPullStatus() == PullStatus.FOUND) {
+ lagTime = System.currentTimeMillis() - consumePullResult.getMsgFoundList().get(0).getStoreTimestamp();
+ if (offset.getBrokerOffset() == offset.getConsumerOffset()) {
+ lagTime = 0;
+ }
+ } else if (consumePullResult.getPullStatus() == PullStatus.OFFSET_ILLEGAL) {
+ PullResult pullResult = ((MQAdminExtImpl) mqAdminExt).queryMsgByOffset(q, consumePullResult.getMinOffset());
+ if (pullResult != null && pullResult.getPullStatus() == PullStatus.FOUND) {
+ lagTime = System.currentTimeMillis() - pullResult.getMsgFoundList().get(0).getStoreTimestamp();
+ }
}
- } else if (consumePullResult.getPullStatus() == PullStatus.OFFSET_ILLEGAL) {
- PullResult pullResult = ((MQAdminExtImpl) mqAdminExt).queryMsgByOffset(q, consumePullResult.getMinOffset());
- if (pullResult != null && pullResult.getPullStatus() == PullStatus.FOUND) {
- lagTime = System.currentTimeMillis() - pullResult.getMsgFoundList().get(0).getStoreTimestamp();
+ if (!consumerLatencyMap.containsKey(q.getBrokerName())) {
+ consumerLatencyMap.put(q.getBrokerName(), lagTime > 0 ? lagTime : 0);
+ } else if (lagTime > consumerLatencyMap.get(q.getBrokerName())) {
+ consumerLatencyMap.put(q.getBrokerName(), lagTime);
}
}
- if (!consumerLatencyMap.containsKey(q.getBrokerName())) {
- consumerLatencyMap.put(q.getBrokerName(), lagTime > 0 ? lagTime : 0);
- } else if (lagTime > consumerLatencyMap.get(q.getBrokerName())) {
- consumerLatencyMap.put(q.getBrokerName(), lagTime);
+ for (Map.Entry<String, Long> consumeLatencyEntry : consumerLatencyMap.entrySet()) {
+ metricsService.getCollector().addGroupGetLatencyByStoreTimeMetric(clusterName,
+ consumeLatencyEntry.getKey(), topic, group, consumeLatencyEntry.getValue());
}
- }
- for (Map.Entry<String, Long> consumeLatencyEntry : consumerLatencyMap.entrySet()) {
- metricsService.getCollector().addGroupGetLatencyByStoreTimeMetric(clusterName,
- consumeLatencyEntry.getKey(), topic, group, consumeLatencyEntry.getValue());
- }
- } catch (Exception ex) {
- log.warn("addGroupGetLatencyByStoreTimeMetric error", ex);
+ } catch (Exception ex) {
+ log.warn("addGroupGetLatencyByStoreTimeMetric error", ex);
+ }
}
}
}