[ISSUE #99] ignored rocketmq_group_get_latency_by_storetime for broadcast consumer
[ISSUE #100] ignored rocketmq_group_get_latency_by_storetime for broadcast consumer
diff --git a/.asf.yaml b/.asf.yaml
new file mode 100644
index 0000000..99596cf
--- /dev/null
+++ b/.asf.yaml
@@ -0,0 +1,6 @@
+notifications:
+ commits: commits@rocketmq.apache.org
+ issues: commits@rocketmq.apache.org
+ pullrequests: commits@rocketmq.apache.org
+ jobs: commits@rocketmq.apache.org
+ discussions: dev@rocketmq.apache.org
diff --git a/pom.xml b/pom.xml
index 0bee298..45393f3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -16,7 +16,7 @@
<description>Apache RocketMQ Prometheus Exporter</description>
<properties>
- <rocketmq.version>4.8.0</rocketmq.version>
+ <rocketmq.version>4.9.4</rocketmq.version>
<docker.image.prefix>docker.io</docker.image.prefix>
</properties>
@@ -41,6 +41,11 @@
<version>${rocketmq.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-store</artifactId>
+ <version>${rocketmq.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.jooq</groupId>
<artifactId>joor</artifactId>
<version>0.9.6</version>
@@ -76,8 +81,12 @@
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
+ <configuration>
+ <classifier>exec</classifier>
+ </configuration>
</plugin>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>2.17</version>
<executions>
diff --git a/src/main/java/org/apache/rocketmq/exporter/RocketMQExporterApplication.java b/src/main/java/org/apache/rocketmq/exporter/RocketMQExporterApplication.java
index 894c295..fc706c3 100644
--- a/src/main/java/org/apache/rocketmq/exporter/RocketMQExporterApplication.java
+++ b/src/main/java/org/apache/rocketmq/exporter/RocketMQExporterApplication.java
@@ -32,7 +32,8 @@
SpringApplication.run(RocketMQExporterApplication.class, args);
}
- @Override public void run(String... args) throws Exception {
+ @Override
+ public void run(String... args) throws Exception {
return;
}
}
diff --git a/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java b/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java
index ae87541..d6741e1 100644
--- a/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java
+++ b/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java
@@ -20,7 +20,6 @@
import com.google.common.cache.CacheBuilder;
import io.prometheus.client.Collector;
import io.prometheus.client.GaugeMetricFamily;
-import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.exporter.model.BrokerRuntimeStats;
import org.apache.rocketmq.exporter.model.metrics.BrokerMetric;
@@ -28,7 +27,6 @@
import org.apache.rocketmq.exporter.model.metrics.ConsumerMetric;
import org.apache.rocketmq.exporter.model.metrics.ConsumerTopicDiffMetric;
import org.apache.rocketmq.exporter.model.metrics.DLQTopicOffsetMetric;
-import org.apache.rocketmq.exporter.model.metrics.ProducerMetric;
import org.apache.rocketmq.exporter.model.metrics.TopicPutNumMetric;
import org.apache.rocketmq.exporter.model.metrics.brokerruntime.BrokerRuntimeMetric;
import org.apache.rocketmq.exporter.model.metrics.clientrunime.ConsumerRuntimeConsumeFailedMsgsMetric;
@@ -37,6 +35,8 @@
import org.apache.rocketmq.exporter.model.metrics.clientrunime.ConsumerRuntimeConsumeRTMetric;
import org.apache.rocketmq.exporter.model.metrics.clientrunime.ConsumerRuntimePullRTMetric;
import org.apache.rocketmq.exporter.model.metrics.clientrunime.ConsumerRuntimePullTPSMetric;
+import org.apache.rocketmq.exporter.model.metrics.producer.ProducerCountMetric;
+import org.apache.rocketmq.exporter.model.metrics.producer.ProducerMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,6 +44,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
public class RMQMetricsCollector extends Collector {
@@ -52,6 +53,8 @@
private Cache<ProducerMetric, Double> topicRetryOffset;
//max offset of dlq consume queue
private Cache<DLQTopicOffsetMetric, Double> topicDLQOffset;
+ // producer instance count
+ private Cache<ProducerCountMetric, Integer> producerCounts;
//total put numbers for topics
private Cache<TopicPutNumMetric, Double> topicPutNums;
@@ -139,6 +142,8 @@
private Cache<BrokerRuntimeMetric, Double> brokerRuntimePutTps600;
private Cache<BrokerRuntimeMetric, Double> brokerRuntimePutTps60;
private Cache<BrokerRuntimeMetric, Double> brokerRuntimePutTps10;
+ private Cache<BrokerRuntimeMetric, Double> brokerRuntimePutLatency99;
+ private Cache<BrokerRuntimeMetric, Double> brokerRuntimePutLatency999;
private Cache<BrokerRuntimeMetric, Long> brokerRuntimeDispatchMaxBuffer;
@@ -175,8 +180,10 @@
this.topicOffset = initCache(outOfTimeSeconds);
this.topicRetryOffset = initCache(outOfTimeSeconds);
this.topicDLQOffset = initCache(outOfTimeSeconds);
+ this.producerCounts = initCache(outOfTimeSeconds);
this.topicPutNums = initCache(outOfTimeSeconds);
this.topicPutSize = initCache(outOfTimeSeconds);
+
this.consumerDiff = initCache(outOfTimeSeconds);
this.consumerRetryDiff = initCache(outOfTimeSeconds);
this.consumerDLQDiff = initCache(outOfTimeSeconds);
@@ -231,6 +238,8 @@
this.brokerRuntimePutTps600 = initCache(outOfTimeSeconds);
this.brokerRuntimePutTps60 = initCache(outOfTimeSeconds);
this.brokerRuntimePutTps10 = initCache(outOfTimeSeconds);
+ this.brokerRuntimePutLatency99 = initCache(outOfTimeSeconds);
+ this.brokerRuntimePutLatency999 = initCache(outOfTimeSeconds);
this.brokerRuntimeDispatchMaxBuffer = initCache(outOfTimeSeconds);
this.brokerRuntimePutMessageDistributeTimeMap10toMore = initCache(outOfTimeSeconds);
this.brokerRuntimePutMessageDistributeTimeMap5to10s = initCache(outOfTimeSeconds);
@@ -315,11 +324,11 @@
private static final List<String> TOPIC_OFFSET_LABEL_NAMES = Arrays.asList(
- "cluster", "broker", "topic", "lastUpdateTimestamp"
+ "cluster", "broker", "topic"
);
private static final List<String> DLQ_TOPIC_OFFSET_LABEL_NAMES = Arrays.asList(
- "cluster", "broker", "group", "lastUpdateTimestamp"
+ "cluster", "broker", "group"
);
private void loadTopicOffsetMetric(GaugeMetricFamily family, Map.Entry<ProducerMetric, Double> entry) {
@@ -327,12 +336,29 @@
Arrays.asList(
entry.getKey().getClusterName(),
entry.getKey().getBrokerName(),
- entry.getKey().getTopicName(),
- String.valueOf(entry.getKey().getLastUpdateTimestamp())
+ entry.getKey().getTopicName()
),
entry.getValue());
}
+ private static final List<String> PRODUCER_GROUP_CLIENT_METRIC_LABEL_NAMES = Arrays.asList(
+ "cluster", "broker", "group"
+ );
+
+    /**
+     * Publishes the cached producer instance counts as the {@code rocketmq_producer_count}
+     * gauge family: one sample per cached key, labelled with that key's cluster name,
+     * broker name and group.
+     *
+     * @param mfs the list the populated metric family is appended to
+     */
+    private void collectProducerMetric(List<MetricFamilySamples> mfs) {
+        GaugeMetricFamily family = new GaugeMetricFamily("rocketmq_producer_count", "producer instance counter", PRODUCER_GROUP_CLIENT_METRIC_LABEL_NAMES);
+        for (Map.Entry<ProducerCountMetric, Integer> cached : producerCounts.asMap().entrySet()) {
+            ProducerCountMetric key = cached.getKey();
+            List<String> labelValues = Arrays.asList(key.getClusterName(), key.getBrokerName(), key.getGroup());
+            family.addMetric(labelValues, cached.getValue().doubleValue());
+        }
+        mfs.add(family);
+    }
+
private void collectTopicOffsetMetric(List<MetricFamilySamples> mfs) {
GaugeMetricFamily topicOffsetF = new GaugeMetricFamily("rocketmq_producer_offset", "TopicOffset", TOPIC_OFFSET_LABEL_NAMES);
for (Map.Entry<ProducerMetric, Double> entry : topicOffset.asMap().entrySet()) {
@@ -352,8 +378,7 @@
Arrays.asList(
entry.getKey().getClusterName(),
entry.getKey().getBrokerName(),
- entry.getKey().getGroup(),
- String.valueOf(entry.getKey().getLastUpdateTimestamp())
+ entry.getKey().getGroup()
),
entry.getValue());
}
@@ -368,6 +393,8 @@
collectConsumerMetric(mfs);
+ collectProducerMetric(mfs);
+
collectTopicOffsetMetric(mfs);
collectTopicNums(mfs);
@@ -645,6 +672,10 @@
}
}
+    /**
+     * Stores the producer instance count under the key built from the given
+     * cluster name, broker name and producer group name.
+     *
+     * @param clusterName cluster part of the cache key
+     * @param brokerName  broker part of the cache key
+     * @param groupName   producer group part of the cache key
+     * @param value       the count to cache
+     */
+    public void addProducerCountMetric(String clusterName, String brokerName, String groupName, int value) {
+        ProducerCountMetric key = new ProducerCountMetric(clusterName, brokerName, groupName);
+        this.producerCounts.put(key, value);
+    }
+
public void addGroupCountMetric(String group, String caddrs, String localaddrs, int count) {
this.consumerCounts.put(new ConsumerCountMetric(group, caddrs, localaddrs), count);
}
@@ -736,6 +767,18 @@
addCommitLogDirCapacity(clusterName, brokerAddress, brokerHost, stats);
addAllKindOfTps(clusterName, brokerAddress, brokerHost, stats);
+ brokerRuntimePutLatency99.put(new BrokerRuntimeMetric(
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getPutLatency99());
+
+ brokerRuntimePutLatency999.put(new BrokerRuntimeMetric(
+ clusterName, brokerAddress, brokerHost,
+ stats.getBrokerVersionDesc(),
+ stats.getBootTimestamp(),
+ stats.getBrokerVersion()), stats.getPutLatency999());
+
brokerRuntimeMsgPutTotalTodayNow.put(new BrokerRuntimeMetric(
clusterName, brokerAddress, brokerHost,
stats.getBrokerVersionDesc(),
@@ -1016,6 +1059,10 @@
String clusterName, String brokerAddress, String brokerHost,
String brokerDes, long bootTimestamp, int brokerVersion,
BrokerRuntimeStats stats) {
+ if (stats.getPutMessageDistributeTimeMap() == null || stats.getPutMessageDistributeTimeMap().isEmpty()) {
+ log.warn("WARN putMessageDistributeTime is null or empty");
+ return;
+ }
brokerRuntimePutMessageDistributeTimeMap0ms.put(new BrokerRuntimeMetric(
clusterName,
brokerAddress, brokerHost,
@@ -1399,5 +1446,17 @@
loadBrokerRuntimeStatsMetric(brokerRuntimeRemainHowManyDataToFlushF, entry);
}
mfs.add(brokerRuntimeRemainHowManyDataToFlushF);
+
+ GaugeMetricFamily brokerRuntimePutLatency99F = new GaugeMetricFamily("rocketmq_brokeruntime_put_latency_99", "brokerRuntimePutLatency99", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimePutLatency99.asMap().entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimePutLatency99F, entry);
+ }
+ mfs.add(brokerRuntimePutLatency99F);
+
+ GaugeMetricFamily brokerRuntimePutLatency999F = new GaugeMetricFamily("rocketmq_brokeruntime_put_latency_999", "brokerRuntimePutLatency999", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+ for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimePutLatency999.asMap().entrySet()) {
+ loadBrokerRuntimeStatsMetric(brokerRuntimePutLatency999F, entry);
+ }
+ mfs.add(brokerRuntimePutLatency999F);
}
}
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/BrokerRuntimeStats.java b/src/main/java/org/apache/rocketmq/exporter/model/BrokerRuntimeStats.java
index 95ffa04..238aa96 100644
--- a/src/main/java/org/apache/rocketmq/exporter/model/BrokerRuntimeStats.java
+++ b/src/main/java/org/apache/rocketmq/exporter/model/BrokerRuntimeStats.java
@@ -18,6 +18,8 @@
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.exporter.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
@@ -76,8 +78,11 @@
private double putMessageAverageSize;
private long putMessageSizeTotal;
private long dispatchBehindBytes;
+ private double putLatency99;
+ private double putLatency999;
+ private final static Logger log = LoggerFactory.getLogger(BrokerRuntimeStats.class);
public BrokerRuntimeStats(KVTable kvTable) {
this.msgPutTotalTodayNow = Long.parseLong(kvTable.getTable().get("msgPutTotalTodayNow"));
@@ -86,7 +91,7 @@
loadTps(this.putTps, kvTable.getTable().get("putTps"));
loadTps(this.getMissTps, kvTable.getTable().get("getMissTps"));
- loadTps(this.getTransferedTps, kvTable.getTable().get("getTransferedTps"));
+ loadTps(this.getTransferedTps, kvTable.getTable().get("getTransferredTps"));
loadTps(this.getTotalTps, kvTable.getTable().get("getTotalTps"));
loadTps(this.getFoundTps, kvTable.getTable().get("getFoundTps"));
@@ -127,6 +132,8 @@
this.putMessageSizeTotal = Long.parseLong(kvTable.getTable().get("putMessageSizeTotal"));
this.sendThreadPoolQueueCapacity = Long.parseLong(kvTable.getTable().get("sendThreadPoolQueueCapacity"));
this.pullThreadPoolQueueCapacity = Long.parseLong(kvTable.getTable().get("pullThreadPoolQueueCapacity"));
+ this.putLatency99 = Double.parseDouble(kvTable.getTable().getOrDefault("putLatency99", "-1"));
+ this.putLatency999 = Double.parseDouble(kvTable.getTable().getOrDefault("putLatency999", "-1"));
}
@@ -153,10 +160,18 @@
}
private void loadPutMessageDistributeTime(String str) {
+ if ("null".equalsIgnoreCase(str)) {
+ log.warn("loadPutMessageDistributeTime WARN, value is null");
+ return;
+ }
String[] arr = str.split(" ");
String key = "", value = "";
for (String ar : arr) {
String[] tarr = ar.split(":");
+ if (tarr.length < 2) {
+ log.warn("loadPutMessageDistributeTime WARN, wrong value is {}, {}", ar, str);
+ continue;
+ }
key = tarr[0].replace("[", "").replace("]", "");
value = tarr[1];
this.putMessageDistributeTimeMap.put(key, Integer.parseInt(value));
@@ -595,4 +610,20 @@
public void setDispatchBehindBytes(long dispatchBehindBytes) {
this.dispatchBehindBytes = dispatchBehindBytes;
}
+
+    /**
+     * @return the stored {@code putLatency99} value; it is read from the {@code putLatency99}
+     *         entry of the broker KVTable, with {@code -1} used when that entry is missing
+     */
+    public double getPutLatency99() {
+        return this.putLatency99;
+    }
+
+    /**
+     * Replaces the stored {@code putLatency99} value.
+     *
+     * @param putLatency99 the new value
+     */
+    public void setPutLatency99(double putLatency99) {
+        this.putLatency99 = putLatency99;
+    }
+
+    /**
+     * @return the stored {@code putLatency999} value; it is read from the {@code putLatency999}
+     *         entry of the broker KVTable, with {@code -1} used when that entry is missing
+     */
+    public double getPutLatency999() {
+        return this.putLatency999;
+    }
+
+    /**
+     * Replaces the stored {@code putLatency999} value.
+     *
+     * @param putLatency999 the new value
+     */
+    public void setPutLatency999(double putLatency999) {
+        this.putLatency999 = putLatency999;
+    }
}
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/metrics/producer/ProducerCountMetric.java b/src/main/java/org/apache/rocketmq/exporter/model/metrics/producer/ProducerCountMetric.java
new file mode 100644
index 0000000..fe0539b
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/exporter/model/metrics/producer/ProducerCountMetric.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.exporter.model.metrics.producer;
+
+/**
+ * Immutable cache key made of a cluster name, a broker name and a producer group.
+ * <p>
+ * Equality and hash code cover all three fields, so keys that differ only in broker or
+ * cluster are distinct and spread across hash buckets. {@code null} field values are
+ * tolerated by {@link #equals(Object)} and {@link #hashCode()}.
+ */
+public class ProducerCountMetric {
+    private final String clusterName;
+    private final String brokerName;
+    private final String group;
+
+    /**
+     * @param clusterName name of the cluster
+     * @param brokerName  name of the broker
+     * @param group       producer group name
+     */
+    public ProducerCountMetric(String clusterName, String brokerName, String group) {
+        this.clusterName = clusterName;
+        this.brokerName = brokerName;
+        this.group = group;
+    }
+
+    /**
+     * Two keys are equal when group, cluster name and broker name are all equal
+     * (null-safe comparison).
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof ProducerCountMetric)) {
+            return false;
+        }
+        ProducerCountMetric other = (ProducerCountMetric) obj;
+        return java.util.Objects.equals(group, other.group)
+            && java.util.Objects.equals(clusterName, other.clusterName)
+            && java.util.Objects.equals(brokerName, other.brokerName);
+    }
+
+    /**
+     * Hashes the same three fields that {@link #equals(Object)} compares, so that the
+     * many broker/cluster entries of one producer group do not share a single hash value.
+     */
+    @Override
+    public int hashCode() {
+        return java.util.Objects.hash(clusterName, brokerName, group);
+    }
+
+    @Override
+    public String toString() {
+        return "group: " + group +
+            " brokerName: " + brokerName +
+            " clusterName: " + clusterName;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public String getGroup() {
+        return group;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/metrics/ProducerMetric.java b/src/main/java/org/apache/rocketmq/exporter/model/metrics/producer/ProducerMetric.java
similarity index 97%
rename from src/main/java/org/apache/rocketmq/exporter/model/metrics/ProducerMetric.java
rename to src/main/java/org/apache/rocketmq/exporter/model/metrics/producer/ProducerMetric.java
index 05e2e97..8d5629b 100644
--- a/src/main/java/org/apache/rocketmq/exporter/model/metrics/ProducerMetric.java
+++ b/src/main/java/org/apache/rocketmq/exporter/model/metrics/producer/ProducerMetric.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.exporter.model.metrics;
+package org.apache.rocketmq.exporter.model.metrics.producer;
//max offset of topic
public class ProducerMetric {
diff --git a/src/main/java/org/apache/rocketmq/exporter/service/client/MQAdminExtImpl.java b/src/main/java/org/apache/rocketmq/exporter/service/client/MQAdminExtImpl.java
index 8c85e35..784c17a 100644
--- a/src/main/java/org/apache/rocketmq/exporter/service/client/MQAdminExtImpl.java
+++ b/src/main/java/org/apache/rocketmq/exporter/service/client/MQAdminExtImpl.java
@@ -45,6 +45,7 @@
import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
@@ -116,29 +117,40 @@
defaultMQAdminExt.createAndUpdateTopicConfig(addr, config);
}
- @Override public void createAndUpdatePlainAccessConfig(String addr,
- PlainAccessConfig plainAccessConfig) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ @Override
+ public void createAndUpdatePlainAccessConfig(String addr,
+ PlainAccessConfig plainAccessConfig) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
//ignore
}
- @Override public void deletePlainAccessConfig(String addr,
- String accessKey) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ @Override
+ public void deletePlainAccessConfig(String addr,
+ String accessKey) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
//ignore
}
- @Override public void updateGlobalWhiteAddrConfig(String addr,
- String globalWhiteAddrs) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ @Override
+ public void updateGlobalWhiteAddrConfig(String addr,
+ String globalWhiteAddrs) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
//ignore
}
- @Override public ClusterAclVersionInfo examineBrokerClusterAclVersionInfo(
- String addr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+    /**
+     * No-op implementation: the arguments are accepted and discarded, and nothing is
+     * forwarded to the wrapped admin client.
+     */
+    @Override
+    public void updateGlobalWhiteAddrConfig(final String addr, final String globalWhiteAddrs, String aclFileFullPath)
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        //ignore
+    }
+
+ @Override
+ public ClusterAclVersionInfo examineBrokerClusterAclVersionInfo(
+ String addr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
//ignore
return null;
}
- @Override public AclConfig examineBrokerClusterAclConfig(
- String addr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ @Override
+ public AclConfig examineBrokerClusterAclConfig(
+ String addr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
//ignore
return null;
}
@@ -241,11 +253,22 @@
}
@Override
+    public ConsumerConnection examineConsumerConnectionInfo(String consumerGroup, String brokerAddr)
+        throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
+        // Pure pass-through: the wrapped admin client does the lookup and its result is returned untouched.
+        return this.defaultMQAdminExt.examineConsumerConnectionInfo(consumerGroup, brokerAddr);
+    }
+
+ @Override
public ProducerConnection examineProducerConnectionInfo(String producerGroup, String topic)
throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
return defaultMQAdminExt.examineProducerConnectionInfo(producerGroup, topic);
}
+    /**
+     * Delegates to the wrapped {@code defaultMQAdminExt} and returns its result unchanged.
+     * Added for rocketmq 4.9.4.
+     */
+    @Override
+    public ProducerTableInfo getAllProducerInfo(String brokerAddr)
+        throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        return defaultMQAdminExt.getAllProducerInfo(brokerAddr);
+    }
+
@Override
public List<String> getNameServerAddressList() {
return defaultMQAdminExt.getNameServerAddressList();
@@ -259,6 +282,12 @@
}
@Override
+    public int addWritePermOfBroker(String namesrvAddr, String brokerName)
+        throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException,
+        RemotingTimeoutException, InterruptedException, MQClientException {
+        // Stub: nothing is forwarded to the wrapped admin client; the result is always 0.
+        return 0;
+    }
+
+ @Override
public void putKVConfig(String namespace, String key, String value) {
defaultMQAdminExt.putKVConfig(namespace, key, value);
}
@@ -283,15 +312,20 @@
}
@Override
- public void deleteTopicInNameServer(Set<String> addrs, String topic)
+ public void deleteTopicInNameServer(Set<String> addrs, String topic, String clusterName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
- defaultMQAdminExt.deleteTopicInNameServer(addrs, topic);
+ defaultMQAdminExt.deleteTopicInNameServer(addrs, topic, clusterName);
}
@Override
public void deleteSubscriptionGroup(String addr, String groupName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
- defaultMQAdminExt.deleteSubscriptionGroup(addr, groupName);
+ //ignore
+ }
+
+    /**
+     * No-op implementation: the request is dropped and nothing is forwarded to the
+     * wrapped admin client.
+     */
+    @Override
+    public void deleteSubscriptionGroup(String addr, String groupName, boolean removeOffset)
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        //ignore
+    }
@Override
@@ -358,6 +392,16 @@
}
@Override
+    public boolean deleteExpiredCommitLog(String cluster)
+        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+        // Stub: nothing is forwarded to the wrapped admin client; always reports false.
+        return false;
+    }
+
+    /**
+     * Stub implementation: nothing is forwarded to the wrapped admin client.
+     *
+     * @return always {@code false}
+     */
+    @Override
+    public boolean deleteExpiredCommitLogByAddr(String addr)
+        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+        return false;
+    }
+
+ @Override
public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack)
throws RemotingException, MQClientException, InterruptedException {
return defaultMQAdminExt.getConsumerRunningInfo(consumerGroup, clientId, jstack);
@@ -527,9 +571,18 @@
}
@Override
- public TopicConfigSerializeWrapper getAllTopicGroup(String brokerAddr,
- long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
- return defaultMQAdminExt.getAllTopicGroup(brokerAddr, timeoutMillis);
+    public SubscriptionGroupWrapper getUserSubscriptionGroup(String brokerAddr, long timeoutMillis)
+        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        // Pass-through to the wrapped admin client; whatever it returns is handed back as-is.
+        return defaultMQAdminExt.getUserSubscriptionGroup(brokerAddr, timeoutMillis);
+    }
+
+    /**
+     * Delegates to the wrapped {@code defaultMQAdminExt} and returns its result unchanged.
+     */
+    @Override
+    public TopicConfigSerializeWrapper getAllTopicConfig(String brokerAddr, long timeoutMillis)
+        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        return defaultMQAdminExt.getAllTopicConfig(brokerAddr, timeoutMillis);
+    }
+
+ @Override
+ public TopicConfigSerializeWrapper getUserTopicConfig(String brokerAddr, boolean specialTopic, long timeoutMillis) throws InterruptedException, RemotingException, MQBrokerException, MQClientException {
+ return this.defaultMQAdminExt.getUserTopicConfig(brokerAddr, specialTopic, timeoutMillis);
}
@Override
@@ -542,29 +595,31 @@
@Override
public void updateNameServerConfig(Properties properties,
List<String> list) throws InterruptedException, RemotingConnectException, UnsupportedEncodingException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, MQBrokerException {
-
+ this.defaultMQAdminExt.updateNameServerConfig(properties, list);
}
@Override
public Map<String, Properties> getNameServerConfig(
List<String> list) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException, UnsupportedEncodingException {
- return null;
+ return this.defaultMQAdminExt.getNameServerConfig(list);
}
@Override
public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String topic,
int queueId, long index, int count,
String consumerGroup) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
- return null;
+ return this.defaultMQAdminExt.queryConsumeQueue(brokerAddr, topic, queueId, index, count, consumerGroup);
}
- @Override public boolean resumeCheckHalfMessage(
- String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+ @Override
+ public boolean resumeCheckHalfMessage(
+ String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
return false;
}
- @Override public boolean resumeCheckHalfMessage(String topic,
- String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+ @Override
+ public boolean resumeCheckHalfMessage(String topic,
+ String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
return false;
}
}
diff --git a/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java b/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
index a880807..7f4b940 100644
--- a/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
+++ b/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
@@ -35,6 +35,8 @@
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.KVTable;
+import org.apache.rocketmq.common.protocol.body.ProducerInfo;
+import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.route.BrokerData;
@@ -62,8 +64,8 @@
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -201,6 +203,54 @@
log.info("topic offset collection task finished...." + (System.currentTimeMillis() - start));
}
+    /**
+     * Scheduled task that walks every broker of every cluster, asks the broker's master
+     * address for its producer table and caches the number of producer entries per
+     * producer group in the metrics collector.
+     * <p>
+     * Returns immediately when collection is disabled or the cluster info cannot be read.
+     * A broker without a master address, or one whose query fails, is logged and skipped
+     * so the remaining brokers are still collected.
+     */
+    @Scheduled(cron = "${task.collectProducer.cron}")
+    public void collectProducer() {
+        if (!rmqConfigure.isEnableCollect()) {
+            return;
+        }
+        log.info("producer metric collection task starting....");
+        long start = System.currentTimeMillis();
+        ClusterInfo clusterInfo = null;
+        try {
+            clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+        } catch (Exception ex) {
+            log.error(String.format("collectProducer exception namesrv is %s",
+                JSON.toJSONString(mqAdminExt.getNameServerAddressList())), ex);
+            return;
+        }
+
+        if (clusterInfo == null || clusterInfo.getClusterAddrTable() == null || clusterInfo.getBrokerAddrTable() == null) {
+            log.warn(String.format("collectProducer get empty cluster, namesrv is: %s", JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
+            return;
+        }
+        for (Map.Entry<String, Set<String>> clusterEntry : clusterInfo.getClusterAddrTable().entrySet()) {
+            String clusterName = clusterEntry.getKey();
+            Set<String> brokerNames = clusterEntry.getValue();
+            if (brokerNames == null || brokerNames.isEmpty()) {
+                log.warn(String.format("collectProducer cluster's brokers are empty, cluster=%s, name srv= %s", clusterName, JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
+                continue;
+            }
+            for (String brokerName : brokerNames) {
+                BrokerData bd = clusterInfo.getBrokerAddrTable().get(brokerName);
+                // Resolve the master address explicitly instead of relying on an NPE being caught below.
+                String masterAddr = bd == null || bd.getBrokerAddrs() == null ? null : bd.getBrokerAddrs().get(MixAll.MASTER_ID);
+                if (masterAddr == null) {
+                    log.warn(String.format("collectProducer. no master address found for cluster=%s, brokerName=%s, name srv= %s", clusterName, brokerName, JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
+                    continue;
+                }
+                ProducerTableInfo pt;
+                try {
+                    pt = mqAdminExt.getAllProducerInfo(masterAddr);
+                } catch (Exception e) {
+                    // Keep the cause in the log and skip this broker; falling through would
+                    // additionally report a misleading "no producers" warning.
+                    log.error(String.format("collectProducer. should not be here. cluster=%s, brokerName=%s, name srv= %s", clusterName, brokerName, JSON.toJSONString(mqAdminExt.getNameServerAddressList())), e);
+                    continue;
+                }
+                if (pt == null || pt.getData() == null || pt.getData().isEmpty()) {
+                    log.warn(String.format("collectProducer. there are no producers in cluster=%s, brokerName=%s, name srv= %s", clusterName, brokerName, JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
+                    continue;
+                }
+                for (Map.Entry<String, List<ProducerInfo>> groupEntry : pt.getData().entrySet()) {
+                    List<ProducerInfo> list = groupEntry.getValue();
+                    // -1 marks a group that is present in the table but has no producer list.
+                    metricsService.getCollector().addProducerCountMetric(clusterName, brokerName, groupEntry.getKey(), list == null ? -1 : list.size());
+                }
+            }
+        }
+
+        log.info("producer metric collection task ended...." + (System.currentTimeMillis() - start));
+    }
+
@Scheduled(cron = "${task.collectConsumerOffset.cron}")
public void collectConsumerOffset() {
if (!rmqConfigure.isEnableCollect()) {
@@ -420,7 +470,7 @@
bd.getBrokerName(),
brokerIP,
topic,
- Utils.getFixedDouble(bsd.getStatsMinute().getTps())
+ Utils.getFixedDouble(bsd.getStatsMinute().getSum())
);
} catch (MQClientException ex) {
if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
@@ -440,7 +490,7 @@
bd.getBrokerName(),
brokerIP,
topic,
- Utils.getFixedDouble(bsd.getStatsMinute().getTps())
+ Utils.getFixedDouble(bsd.getStatsMinute().getSum())
);
} catch (MQClientException ex) {
if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
@@ -479,7 +529,7 @@
bd.getBrokerName(),
topic,
group,
- Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
+ Utils.getFixedDouble(bsd.getStatsMinute().getSum()));
} catch (MQClientException ex) {
if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
//log.error(String.format("GROUP_GET_NUMS-error, topic=%s, group=%s,master broker=%s, %s", topic, group, masterAddr, ex.getErrorMessage()));
@@ -497,7 +547,7 @@
bd.getBrokerName(),
topic,
group,
- Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
+ Utils.getFixedDouble(bsd.getStatsMinute().getSum()));
} catch (MQClientException ex) {
if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
// log.error(String.format("GROUP_GET_SIZE-error, topic=%s, group=%s, master broker=%s, %s", topic, group, masterAddr, ex.getErrorMessage()));
@@ -563,7 +613,7 @@
clusterName,
brokerIP,
brokerName,
- Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
+ Utils.getFixedDouble(bsd.getStatsMinute().getSum()));
} catch (MQClientException ex) {
if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
// log.error(String.format("GROUP_GET_SIZE-error, topic=%s, group=%s, master broker=%s, %s", topic, group, masterAddr, ex.getErrorMessage()));
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 219bb2f..747b747 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -14,8 +14,8 @@
rocketmq:
config:
webTelemetryPath: /metrics
- rocketmqVersion: 4_8_0
namesrvAddr: 127.0.0.1:9876
+ rocketmqVersion: 4_9_4
enableCollect: true
enableACL: false # if >=4.4.0
accessKey: # if >=4.4.0
@@ -32,6 +32,8 @@
count: 5 # num of scheduled-tasks
collectTopicOffset:
cron: 15 0/1 * * * ?
+ collectProducer:
+ cron: 15 0/1 * * * ?
collectConsumerOffset:
cron: 15 0/1 * * * ?
collectBrokerStatsTopic: