amend some metrics
diff --git a/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java b/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java
index c797b63..8bbdc53 100644
--- a/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java
+++ b/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java
@@ -218,18 +218,18 @@
private static final List<String> TOPIC_OFFSET_LABEL_NAMES = Arrays.asList(
- "cluster", "brokerNames", "topic", "lastUpdateTimestamp"
+ "cluster", "brokerName", "topic", "lastUpdateTimestamp"
);
private static final List<String> DLQ_TOPIC_OFFSET_LABEL_NAMES = Arrays.asList(
- "cluster", "brokerNames", "group", "lastUpdateTimestamp"
+ "cluster", "brokerName", "group", "lastUpdateTimestamp"
);
private void loadTopicOffsetMetric(GaugeMetricFamily family, Map.Entry<ProducerMetric, Double> entry) {
family.addMetric(
Arrays.asList(
entry.getKey().getClusterName(),
- entry.getKey().getBrokerNames(),
+ entry.getKey().getBrokerName(),
entry.getKey().getTopicName(),
String.valueOf(entry.getKey().getLastUpdateTimestamp())
),
@@ -237,7 +237,7 @@
}
private void collectTopicOffsetMetric(List<MetricFamilySamples> mfs) {
- GaugeMetricFamily topicOffsetF = new GaugeMetricFamily("rocketmq_topic_offset", "TopicOffset", TOPIC_OFFSET_LABEL_NAMES);
+ GaugeMetricFamily topicOffsetF = new GaugeMetricFamily("rocketmq_producer_offset", "TopicOffset", TOPIC_OFFSET_LABEL_NAMES);
for (Map.Entry<ProducerMetric, Double> entry : topicOffset.entrySet()) {
loadTopicOffsetMetric(topicOffsetF, entry);
}
@@ -254,7 +254,7 @@
topicDLQOffsetF.addMetric(
Arrays.asList(
entry.getKey().getClusterName(),
- entry.getKey().getBrokerNames(),
+ entry.getKey().getBrokerName(),
entry.getKey().getGroup(),
String.valueOf(entry.getKey().getLastUpdateTimestamp())
),
@@ -351,19 +351,19 @@
family.addMetric(Arrays.asList(
entry.getKey().getClusterName(),
entry.getKey().getBrokerIP(),
- entry.getKey().getBrokerHost()),
+ entry.getKey().getBrokerName()),
entry.getValue()
);
}
private void collectBrokerNums(List<MetricFamilySamples> mfs) {
- GaugeMetricFamily brokerPutNumsGauge = new GaugeMetricFamily("rocketmq_broker_put_nums", "BrokerPutNums", BROKER_NUMS_LABEL_NAMES);
+ GaugeMetricFamily brokerPutNumsGauge = new GaugeMetricFamily("rocketmq_broker_tps", "BrokerPutNums", BROKER_NUMS_LABEL_NAMES);
for (Map.Entry<BrokerMetric, Double> entry : brokerPutNums.entrySet()) {
loadBrokerNums(brokerPutNumsGauge, entry);
}
mfs.add(brokerPutNumsGauge);
- GaugeMetricFamily brokerGetNumsGauge = new GaugeMetricFamily("rocketmq_broker_get_nums", "BrokerGetNums", BROKER_NUMS_LABEL_NAMES);
+ GaugeMetricFamily brokerGetNumsGauge = new GaugeMetricFamily("rocketmq_broker_qps", "BrokerGetNums", BROKER_NUMS_LABEL_NAMES);
for (Map.Entry<BrokerMetric, Double> entry : brokerGetNums.entrySet()) {
loadBrokerNums(brokerGetNumsGauge, entry);
}
@@ -502,39 +502,39 @@
}
private void collectTopicNums(List<MetricFamilySamples> mfs) {
- GaugeMetricFamily topicPutNumsGauge = new GaugeMetricFamily("rocketmq_topic_put_nums", "TopicPutNums", TOPIC_NUMS_LABEL_NAMES);
+ GaugeMetricFamily topicPutNumsGauge = new GaugeMetricFamily("rocketmq_producer_tps", "TopicPutNums", TOPIC_NUMS_LABEL_NAMES);
for (Map.Entry<TopicPutNumMetric, Double> entry : topicPutNums.entrySet()) {
loadTopicNumsMetric(topicPutNumsGauge, entry);
}
mfs.add(topicPutNumsGauge);
- GaugeMetricFamily topicPutSizeGauge = new GaugeMetricFamily("rocketmq_topic_put_messagesize", "TopicPutMessageSize", TOPIC_NUMS_LABEL_NAMES);
+ GaugeMetricFamily topicPutSizeGauge = new GaugeMetricFamily("rocketmq_producer_message_size", "TopicPutMessageSize", TOPIC_NUMS_LABEL_NAMES);
for (Map.Entry<TopicPutNumMetric, Double> entry : topicPutSize.entrySet()) {
loadTopicNumsMetric(topicPutSizeGauge, entry);
}
mfs.add(topicPutSizeGauge);
}
- private static final List<String> TOPIC_NUMS_LABEL_NAMES = Arrays.asList("cluster", "brokers", "topic");
+ private static final List<String> TOPIC_NUMS_LABEL_NAMES = Arrays.asList("cluster", "broker", "topic");
private void loadTopicNumsMetric(GaugeMetricFamily family, Map.Entry<TopicPutNumMetric, Double> entry) {
family.addMetric(
Arrays.asList(
entry.getKey().getClusterName(),
- entry.getKey().getBrokerNames(),
+ entry.getKey().getBrokerName(),
entry.getKey().getTopicName()
),
entry.getValue()
);
}
- public void addTopicOffsetMetric(String clusterName, String brokerNames, String topic, long lastUpdateTimestamp, double value) {
+ public void addTopicOffsetMetric(String clusterName, String brokerName, String topic, long lastUpdateTimestamp, double value) {
if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- topicRetryOffset.put(new ProducerMetric(clusterName, brokerNames, topic, lastUpdateTimestamp), value);
+ topicRetryOffset.put(new ProducerMetric(clusterName, brokerName, topic, lastUpdateTimestamp), value);
} else if (topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
- topicDLQOffset.put(new DLQTopicOffsetMetric(clusterName, brokerNames, topic.replace(MixAll.DLQ_GROUP_TOPIC_PREFIX, ""), lastUpdateTimestamp), value);
+ topicDLQOffset.put(new DLQTopicOffsetMetric(clusterName, brokerName, topic.replace(MixAll.DLQ_GROUP_TOPIC_PREFIX, ""), lastUpdateTimestamp), value);
} else {
- topicOffset.put(new ProducerMetric(clusterName, brokerNames, topic, lastUpdateTimestamp), value);
+ topicOffset.put(new ProducerMetric(clusterName, brokerName, topic, lastUpdateTimestamp), value);
}
}
@@ -552,34 +552,36 @@
}
}
- public void addTopicPutNumsMetric(String cluster, String brokerNames, String brokerIP, String brokerHost,
- String topic, double value) {
- topicPutNums.put(new TopicPutNumMetric(cluster, brokerNames, brokerIP, brokerHost, topic), value);
+ public void addTopicPutNumsMetric(String cluster, String brokerName, String brokerIP, String topic, double value) {
+ topicPutNums.put(new TopicPutNumMetric(cluster, brokerName, brokerIP, topic), value);
}
- public void addTopicPutSizeMetric(String cluster, String brokerName, String brokerIP, String brokerHost,
- String topic, double value) {
- topicPutSize.put(new TopicPutNumMetric(cluster, brokerName, brokerIP, brokerHost, topic), value);
+ public void addTopicPutSizeMetric(String cluster, String brokerName, String brokerIP, String topic, double value) {
+ topicPutSize.put(new TopicPutNumMetric(cluster, brokerName, brokerIP, topic), value);
}
public void addGroupBrokerTotalOffsetMetric(String topic, String group, long value) {
- groupBrokerTotalOffset.put(new ConsumerMetric(topic, group), value);
+ //groupBrokerTotalOffset.put(new ConsumerMetric(topic, group), value);
}
public void addGroupConsumerTotalOffsetMetric(String topic, String group, long value) {
- groupConsumeTotalOffset.put(new ConsumerMetric(topic, group), value);
+ //groupConsumeTotalOffset.put(new ConsumerMetric(topic, group), value);
}
- public void addGroupConsumeTPSMetric(String topic, String group, double value) {
- groupConsumeTPS.put(new ConsumerMetric(topic, group), value);
+ public void addGroupConsumeTPSMetric(String clusterName, String brokerName, String topic, String group, double value) {
+ groupConsumeTPS.put(new ConsumerMetric(clusterName, brokerName, topic, group), value);
}
- public void addGroupGetNumsMetric(String topic, String group, double value) {
- groupGetNums.put(new ConsumerMetric(topic, group), value);
+ public void addGroupGetNumsMetric(String clusterName, String brokerName, String topic, String group, double value) {
+ groupGetNums.put(new ConsumerMetric(clusterName, brokerName, topic, group), value);
}
- public void addGroupGetSizeMetric(String topic, String group, double value) {
- groupGetSize.put(new ConsumerMetric(topic, group), value);
+ public void addGroupGetSizeMetric(String clusterName, String brokerName, String topic, String group, double value) {
+ groupGetSize.put(new ConsumerMetric(clusterName, brokerName, topic, group), value);
+ }
+
+ public void addSendBackNumsMetric(String clusterName, String brokerName, String topic, String group, double value) {
+ sendBackNums.put(new ConsumerMetric(clusterName, brokerName, topic, group), value);
}
public void addConsumerClientFailedMsgCountsMetric(String group, String topic, String clientAddr, String clientId, long value) {
@@ -607,16 +609,12 @@
}
- public void addSendBackNumsMetric(String topic, String group, double value) {
- sendBackNums.put(new ConsumerMetric(topic, group), value);
+ public void addBrokerPutNumsMetric(String clusterName, String brokerIP, String brokerName, double value) {
+ brokerPutNums.put(new BrokerMetric(clusterName, brokerIP, brokerName), value);
}
- public void addBrokerPutNumsMetric(String clusterName, String brokerIP, String brokerHost, double value) {
- brokerPutNums.put(new BrokerMetric(clusterName, brokerIP, brokerHost), value);
- }
-
- public void addBrokerGetNumsMetric(String clusterName, String brokerIP, String brokerHost, double value) {
- brokerGetNums.put(new BrokerMetric(clusterName, brokerIP, brokerHost), value);
+ public void addBrokerGetNumsMetric(String clusterName, String brokerIP, String brokerName, double value) {
+ brokerGetNums.put(new BrokerMetric(clusterName, brokerIP, brokerName), value);
}
public void addBrokerRuntimeStatsMetric(BrokerRuntimeStats stats, String clusterName, String brokerAddress, String brokerHost) {
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/metrics/BrokerMetric.java b/src/main/java/org/apache/rocketmq/exporter/model/metrics/BrokerMetric.java
index 45caff2..a0927fb 100644
--- a/src/main/java/org/apache/rocketmq/exporter/model/metrics/BrokerMetric.java
+++ b/src/main/java/org/apache/rocketmq/exporter/model/metrics/BrokerMetric.java
@@ -19,12 +19,12 @@
public class BrokerMetric {
private String clusterName;
private String brokerIP;
- private String brokerHost;
+ private String brokerName;
- public BrokerMetric(String clusterName, String brokerIP, String brokerHost) {
+ public BrokerMetric(String clusterName, String brokerIP, String brokerName) {
this.clusterName = clusterName;
this.brokerIP = brokerIP;
- this.brokerHost = brokerHost;
+ this.brokerName = brokerName;
}
public String getClusterName() {
@@ -43,12 +43,12 @@
this.brokerIP = brokerIP;
}
- public String getBrokerHost() {
- return brokerHost;
+ public String getBrokerName() {
+ return brokerName;
}
- public void setBrokerHost(String brokerHost) {
- this.brokerHost = brokerHost;
+ public void setBrokerName(String brokerName) {
+ this.brokerName = brokerName;
}
@Override
@@ -72,6 +72,6 @@
@Override
public String toString() {
- return "ClusterName: " + clusterName + " brokerIP: " + brokerIP + " brokerHost: " + brokerHost;
+ return "ClusterName: " + clusterName + " brokerIP: " + brokerIP + " brokerHost: " + brokerName;
}
}
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerMetric.java b/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerMetric.java
index a0036a2..5a11467 100644
--- a/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerMetric.java
+++ b/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerMetric.java
@@ -17,10 +17,14 @@
package org.apache.rocketmq.exporter.model.metrics;
public class ConsumerMetric {
+ private String clusterName;
+ private String brokerName;
private String topicName;
private String consumerGroupName;
- public ConsumerMetric(String topicName, String consumerGroupName) {
+ public ConsumerMetric(String clusterName, String brokerName, String topicName, String consumerGroupName) {
+ this.clusterName = clusterName;
+ this.brokerName = brokerName;
this.topicName = topicName;
this.consumerGroupName = consumerGroupName;
}
@@ -41,6 +45,22 @@
this.consumerGroupName = consumerGroupName;
}
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public void setClusterName(String clusterName) {
+ this.clusterName = clusterName;
+ }
+
+ public String getBrokerName() {
+ return brokerName;
+ }
+
+ public void setBrokerName(String brokerName) {
+ this.brokerName = brokerName;
+ }
+
@Override
public boolean equals(Object obj) {
if (!(obj instanceof ConsumerMetric)) {
@@ -48,13 +68,15 @@
}
ConsumerMetric other = (ConsumerMetric) obj;
- return other.topicName.equals(topicName) &&
- other.consumerGroupName.equals(consumerGroupName);
+ return other.clusterName.equals(clusterName) && other.brokerName.equals(brokerName)
+ && other.topicName.equals(topicName) && other.consumerGroupName.equals(consumerGroupName);
}
@Override
public int hashCode() {
int hash = 1;
+ hash = 37 * hash + clusterName.hashCode();
+ hash = 37 * hash + brokerName.hashCode();
hash = 37 * hash + topicName.hashCode();
hash = 37 * hash + consumerGroupName.hashCode();
return hash;
@@ -62,6 +84,7 @@
@Override
public String toString() {
- return "topicName: " + topicName + " ConsumeGroupName: " + consumerGroupName;
+ return "clusterName: " + clusterName + "brokerName: " + brokerName
+ + "topicName: " + topicName + " ConsumeGroupName: " + consumerGroupName;
}
}
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/metrics/DLQTopicOffsetMetric.java b/src/main/java/org/apache/rocketmq/exporter/model/metrics/DLQTopicOffsetMetric.java
index 77d22c1..745f67d 100644
--- a/src/main/java/org/apache/rocketmq/exporter/model/metrics/DLQTopicOffsetMetric.java
+++ b/src/main/java/org/apache/rocketmq/exporter/model/metrics/DLQTopicOffsetMetric.java
@@ -18,13 +18,13 @@
public class DLQTopicOffsetMetric {
private String clusterName;
- private String brokerNames;
+ private String brokerName;
private String group;
private long lastUpdateTimestamp;
- public DLQTopicOffsetMetric(String clusterName, String brokerNames, String group, long lastUpdateTimestamp) {
+ public DLQTopicOffsetMetric(String clusterName, String brokerName, String group, long lastUpdateTimestamp) {
this.clusterName = clusterName;
- this.brokerNames = brokerNames;
+ this.brokerName = brokerName;
this.group = group;
this.lastUpdateTimestamp = lastUpdateTimestamp;
}
@@ -37,12 +37,12 @@
this.clusterName = clusterName;
}
- public String getBrokerNames() {
- return brokerNames;
+ public String getBrokerName() {
+ return brokerName;
}
- public void setBrokerNames(String brokerNames) {
- this.brokerNames = brokerNames;
+ public void setBrokerName(String brokerName) {
+ this.brokerName = brokerName;
}
public String getGroup() {
@@ -68,7 +68,7 @@
}
DLQTopicOffsetMetric other = (DLQTopicOffsetMetric) obj;
- return other.clusterName.equals(clusterName) &&
+ return other.clusterName.equals(clusterName) && other.brokerName.equals(brokerName) &&
other.group.equals(group);
}
@@ -77,12 +77,13 @@
public int hashCode() {
int hash = 1;
hash = 37 * hash + clusterName.hashCode();
+ hash = 37 * hash + brokerName.hashCode();
hash = 37 * hash + group.hashCode();
return hash;
}
@Override
public String toString() {
- return "ClusterName: " + clusterName + " BrokerNames: " + brokerNames + " group: " + group;
+ return "ClusterName: " + clusterName + " BrokerNames: " + brokerName + " group: " + group;
}
}
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/metrics/ProducerMetric.java b/src/main/java/org/apache/rocketmq/exporter/model/metrics/ProducerMetric.java
index b0fd565..05e2e97 100644
--- a/src/main/java/org/apache/rocketmq/exporter/model/metrics/ProducerMetric.java
+++ b/src/main/java/org/apache/rocketmq/exporter/model/metrics/ProducerMetric.java
@@ -19,7 +19,7 @@
//max offset of topic
public class ProducerMetric {
private String clusterName;
- private String brokerNames;
+ private String brokerName;
private String topicName;
private long lastUpdateTimestamp;
@@ -31,12 +31,12 @@
this.clusterName = clusterName;
}
- public String getBrokerNames() {
- return brokerNames;
+ public String getBrokerName() {
+ return brokerName;
}
- public void setBrokerNames(String brokerNames) {
- this.brokerNames = brokerNames;
+ public void setBrokerName(String brokerName) {
+ this.brokerName = brokerName;
}
public String getTopicName() {
@@ -55,9 +55,9 @@
this.lastUpdateTimestamp = lastUpdateTimestamp;
}
- public ProducerMetric(String clusterName, String brokerNames, String topicName, long lastUpdateTimestamp) {
+ public ProducerMetric(String clusterName, String brokerName, String topicName, long lastUpdateTimestamp) {
this.clusterName = clusterName;
- this.brokerNames = brokerNames;
+ this.brokerName = brokerName;
this.topicName = topicName;
this.lastUpdateTimestamp = lastUpdateTimestamp;
}
@@ -69,7 +69,7 @@
}
ProducerMetric other = (ProducerMetric) obj;
- return other.clusterName.equals(clusterName) &&
+ return other.clusterName.equals(clusterName) && other.brokerName.equals(brokerName) &&
other.topicName.equals(topicName);
}
@@ -77,12 +77,13 @@
public int hashCode() {
int hash = 1;
hash = 37 * hash + clusterName.hashCode();
+ hash = 37 * hash + brokerName.hashCode();
hash = 37 * hash + topicName.hashCode();
return hash;
}
@Override
public String toString() {
- return "ClusterName: " + clusterName + " BrokerNames: " + brokerNames + " topicName: " + topicName;
+ return "ClusterName: " + clusterName + " BrokerName: " + brokerName + " topicName: " + topicName;
}
}
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/metrics/TopicPutNumMetric.java b/src/main/java/org/apache/rocketmq/exporter/model/metrics/TopicPutNumMetric.java
index ed93655..db5f181 100644
--- a/src/main/java/org/apache/rocketmq/exporter/model/metrics/TopicPutNumMetric.java
+++ b/src/main/java/org/apache/rocketmq/exporter/model/metrics/TopicPutNumMetric.java
@@ -18,16 +18,14 @@
public class TopicPutNumMetric {
private String clusterName;
- private String brokerNames;
+ private String brokerName;
private String brokerIP;
- private String brokerHost;
private String topicName;
- public TopicPutNumMetric(String clusterName, String brokerNames, String brokerIP, String brokerHost, String topicName) {
+ public TopicPutNumMetric(String clusterName, String brokerName, String brokerIP, String topicName) {
this.clusterName = clusterName;
- this.brokerNames = brokerNames;
+ this.brokerName = brokerName;
this.brokerIP = brokerIP;
- this.brokerHost = brokerHost;
this.topicName = topicName;
}
@@ -39,12 +37,12 @@
this.clusterName = clusterName;
}
- public String getBrokerNames() {
- return brokerNames;
+ public String getBrokerName() {
+ return brokerName;
}
- public void setBrokerNames(String brokerNames) {
- this.brokerNames = brokerNames;
+ public void setBrokerName(String brokerName) {
+ this.brokerName = brokerName;
}
public String getTopicName() {
@@ -63,14 +61,6 @@
this.brokerIP = brokerIP;
}
- public String getBrokerHost() {
- return brokerHost;
- }
-
- public void setBrokerHost(String brokerHost) {
- this.brokerHost = brokerHost;
- }
-
@Override
public boolean equals(Object obj) {
if (!(obj instanceof TopicPutNumMetric)) {
diff --git a/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java b/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
index fdd486b..bc29744 100644
--- a/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
+++ b/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
@@ -178,7 +178,7 @@
Set<Map.Entry<MessageQueue, TopicOffset>> topicStatusEntries = topicStats.getOffsetTable().entrySet();
HashMap<String, Long> brokerOffsetMap = new HashMap<>();
- HashMap<String, Long> brokerUpdateTimestamp = new HashMap<>();
+ HashMap<String, Long> brokerUpdateTimestampMap = new HashMap<>();
for (Map.Entry<MessageQueue, TopicOffset> topicStatusEntry : topicStatusEntries) {
MessageQueue q = topicStatusEntry.getKey();
@@ -190,18 +190,18 @@
brokerOffsetMap.put(q.getBrokerName(), offset.getMaxOffset());
}
- if (brokerUpdateTimestamp.containsKey(q.getBrokerName())) {
- if (offset.getLastUpdateTimestamp() > brokerUpdateTimestamp.get(q.getBrokerName())) {
- brokerUpdateTimestamp.put(q.getBrokerName(), offset.getLastUpdateTimestamp());
+ if (brokerUpdateTimestampMap.containsKey(q.getBrokerName())) {
+ if (offset.getLastUpdateTimestamp() > brokerUpdateTimestampMap.get(q.getBrokerName())) {
+ brokerUpdateTimestampMap.put(q.getBrokerName(), offset.getLastUpdateTimestamp());
}
} else {
- brokerUpdateTimestamp.put(q.getBrokerName(), offset.getLastUpdateTimestamp());
+ brokerUpdateTimestampMap.put(q.getBrokerName(), offset.getLastUpdateTimestamp());
}
}
Set<Map.Entry<String, Long>> brokerOffsetEntries = brokerOffsetMap.entrySet();
for (Map.Entry<String, Long> brokerOffsetEntry : brokerOffsetEntries) {
metricsService.getCollector().addTopicOffsetMetric(clusterName, brokerOffsetEntry.getKey(), topic,
- brokerUpdateTimestamp.get(brokerOffsetEntry.getKey()), brokerOffsetEntry.getValue());
+ brokerUpdateTimestampMap.get(brokerOffsetEntry.getKey()), brokerOffsetEntry.getValue());
}
}
log.info("topic offset collection task finished...." + (System.currentTimeMillis() - start));
@@ -311,7 +311,7 @@
String.valueOf(messageModel.ordinal()),
diff
);
- metricsService.getCollector().addGroupConsumeTPSMetric(topic, group, consumeTPS);
+ //metricsService.getCollector().addGroupConsumeTPSMetric(topic, group, consumeTPS);
}
Set<Map.Entry<MessageQueue, OffsetWrapper>> consumeStatusEntries = consumeStats.getOffsetTable().entrySet();
for (Map.Entry<MessageQueue, OffsetWrapper> consumeStatusEntry : consumeStatusEntries) {
@@ -324,8 +324,7 @@
totalConsumerOffset += offset.getConsumerOffset();
}
metricsService.getCollector().addGroupBrokerTotalOffsetMetric(topic, group, totalBrokerOffset);
- metricsService.getCollector().addGroupConsumerTotalOffsetMetric(topic, group, totalBrokerOffset);
-
+ metricsService.getCollector().addGroupConsumerTotalOffsetMetric(topic, group, totalConsumerOffset);
}
}
log.info("consumer offset collection task finished...." + (System.currentTimeMillis() - start));
@@ -383,7 +382,6 @@
bd.getCluster(),
bd.getBrokerName(),
brokerIP,
- "",
topic,
Utils.getFixedDouble(bsd.getStatsMinute().getTps())
);
@@ -404,7 +402,6 @@
bd.getCluster(),
bd.getBrokerName(),
brokerIP,
- "",
topic,
Utils.getFixedDouble(bsd.getStatsMinute().getTps())
);
@@ -441,6 +438,8 @@
//how many messages the consumer has get for the topic
bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.GROUP_GET_NUMS, statsKey);
metricsService.getCollector().addGroupGetNumsMetric(
+ bd.getCluster(),
+ bd.getBrokerName(),
topic,
group,
Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
@@ -457,6 +456,8 @@
//how many bytes the consumer has get for the topic
bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.GROUP_GET_SIZE, statsKey);
metricsService.getCollector().addGroupGetSizeMetric(
+ bd.getCluster(),
+ bd.getBrokerName(),
topic,
group,
Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
@@ -473,6 +474,8 @@
////how many re-send times the consumer did for the topic
bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.SNDBCK_PUT_NUMS, statsKey);
metricsService.getCollector().addSendBackNumsMetric(
+ bd.getCluster(),
+ bd.getBrokerName(),
topic,
group,
Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
@@ -520,7 +523,7 @@
metricsService.getCollector().addBrokerPutNumsMetric(
clusterEntry.getValue().getCluster(),
brokerIP,
- "",
+ clusterEntry.getValue().getBrokerName(),
Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
} catch (Exception ex) {
log.error(String.format("BROKER_PUT_NUMS-error, master broker=%s", masterAddr), ex);
@@ -531,7 +534,7 @@
metricsService.getCollector().addBrokerGetNumsMetric(
clusterEntry.getValue().getCluster(),
brokerIP,
- "",
+ clusterEntry.getValue().getBrokerName(),
Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
} catch (Exception ex) {
log.error(String.format("BROKER_GET_NUMS-error, master broker=%s", masterAddr), ex);