[ISSUE #99] ignored rocketmq_group_get_latency_by_storetime for broadcast consumer

[ISSUE #100] ignored rocketmq_group_get_latency_by_storetime for broadcast consumer
diff --git a/.asf.yaml b/.asf.yaml
new file mode 100644
index 0000000..99596cf
--- /dev/null
+++ b/.asf.yaml
@@ -0,0 +1,6 @@
+notifications:
+  commits:      commits@rocketmq.apache.org
+  issues:       commits@rocketmq.apache.org
+  pullrequests: commits@rocketmq.apache.org
+  jobs:         commits@rocketmq.apache.org
+  discussions:  dev@rocketmq.apache.org
diff --git a/pom.xml b/pom.xml
index 0bee298..45393f3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -16,7 +16,7 @@
     <description>Apache RocketMQ Prometheus Exporter</description>
 
     <properties>
-        <rocketmq.version>4.8.0</rocketmq.version>
+        <rocketmq.version>4.9.4</rocketmq.version>
         <docker.image.prefix>docker.io</docker.image.prefix>
     </properties>
 
@@ -41,6 +41,11 @@
             <version>${rocketmq.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-store</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.jooq</groupId>
             <artifactId>joor</artifactId>
             <version>0.9.6</version>
@@ -76,8 +81,12 @@
             <plugin>
                 <groupId>org.springframework.boot</groupId>
                 <artifactId>spring-boot-maven-plugin</artifactId>
+                <configuration>
+                    <classifier>exec</classifier>
+                </configuration>
             </plugin>
             <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-checkstyle-plugin</artifactId>
                 <version>2.17</version>
                 <executions>
diff --git a/src/main/java/org/apache/rocketmq/exporter/RocketMQExporterApplication.java b/src/main/java/org/apache/rocketmq/exporter/RocketMQExporterApplication.java
index 894c295..fc706c3 100644
--- a/src/main/java/org/apache/rocketmq/exporter/RocketMQExporterApplication.java
+++ b/src/main/java/org/apache/rocketmq/exporter/RocketMQExporterApplication.java
@@ -32,7 +32,8 @@
         SpringApplication.run(RocketMQExporterApplication.class, args);
     }
 
-    @Override public void run(String... args) throws Exception {
+    @Override
+    public void run(String... args) throws Exception {
         return;
     }
 }
diff --git a/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java b/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java
index ae87541..d6741e1 100644
--- a/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java
+++ b/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java
@@ -20,7 +20,6 @@
 import com.google.common.cache.CacheBuilder;
 import io.prometheus.client.Collector;
 import io.prometheus.client.GaugeMetricFamily;
-import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.exporter.model.BrokerRuntimeStats;
 import org.apache.rocketmq.exporter.model.metrics.BrokerMetric;
@@ -28,7 +27,6 @@
 import org.apache.rocketmq.exporter.model.metrics.ConsumerMetric;
 import org.apache.rocketmq.exporter.model.metrics.ConsumerTopicDiffMetric;
 import org.apache.rocketmq.exporter.model.metrics.DLQTopicOffsetMetric;
-import org.apache.rocketmq.exporter.model.metrics.ProducerMetric;
 import org.apache.rocketmq.exporter.model.metrics.TopicPutNumMetric;
 import org.apache.rocketmq.exporter.model.metrics.brokerruntime.BrokerRuntimeMetric;
 import org.apache.rocketmq.exporter.model.metrics.clientrunime.ConsumerRuntimeConsumeFailedMsgsMetric;
@@ -37,6 +35,8 @@
 import org.apache.rocketmq.exporter.model.metrics.clientrunime.ConsumerRuntimeConsumeRTMetric;
 import org.apache.rocketmq.exporter.model.metrics.clientrunime.ConsumerRuntimePullRTMetric;
 import org.apache.rocketmq.exporter.model.metrics.clientrunime.ConsumerRuntimePullTPSMetric;
+import org.apache.rocketmq.exporter.model.metrics.producer.ProducerCountMetric;
+import org.apache.rocketmq.exporter.model.metrics.producer.ProducerMetric;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,6 +44,7 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 public class RMQMetricsCollector extends Collector {
 
@@ -52,6 +53,8 @@
     private Cache<ProducerMetric, Double> topicRetryOffset;
     //max offset of dlq consume queue
     private Cache<DLQTopicOffsetMetric, Double> topicDLQOffset;
+    // producer instance count
+    private Cache<ProducerCountMetric, Integer> producerCounts;
 
     //total put numbers for topics
     private Cache<TopicPutNumMetric, Double> topicPutNums;
@@ -139,6 +142,8 @@
     private Cache<BrokerRuntimeMetric, Double> brokerRuntimePutTps600;
     private Cache<BrokerRuntimeMetric, Double> brokerRuntimePutTps60;
     private Cache<BrokerRuntimeMetric, Double> brokerRuntimePutTps10;
+    private Cache<BrokerRuntimeMetric, Double> brokerRuntimePutLatency99;
+    private Cache<BrokerRuntimeMetric, Double> brokerRuntimePutLatency999;
 
     private Cache<BrokerRuntimeMetric, Long> brokerRuntimeDispatchMaxBuffer;
 
@@ -175,8 +180,10 @@
         this.topicOffset = initCache(outOfTimeSeconds);
         this.topicRetryOffset = initCache(outOfTimeSeconds);
         this.topicDLQOffset = initCache(outOfTimeSeconds);
+        this.producerCounts = initCache(outOfTimeSeconds);
         this.topicPutNums = initCache(outOfTimeSeconds);
         this.topicPutSize = initCache(outOfTimeSeconds);
+
         this.consumerDiff = initCache(outOfTimeSeconds);
         this.consumerRetryDiff = initCache(outOfTimeSeconds);
         this.consumerDLQDiff = initCache(outOfTimeSeconds);
@@ -231,6 +238,8 @@
         this.brokerRuntimePutTps600 = initCache(outOfTimeSeconds);
         this.brokerRuntimePutTps60 = initCache(outOfTimeSeconds);
         this.brokerRuntimePutTps10 = initCache(outOfTimeSeconds);
+        this.brokerRuntimePutLatency99 = initCache(outOfTimeSeconds);
+        this.brokerRuntimePutLatency999 = initCache(outOfTimeSeconds);
         this.brokerRuntimeDispatchMaxBuffer = initCache(outOfTimeSeconds);
         this.brokerRuntimePutMessageDistributeTimeMap10toMore = initCache(outOfTimeSeconds);
         this.brokerRuntimePutMessageDistributeTimeMap5to10s = initCache(outOfTimeSeconds);
@@ -315,11 +324,11 @@
 
 
     private static final List<String> TOPIC_OFFSET_LABEL_NAMES = Arrays.asList(
-        "cluster", "broker", "topic", "lastUpdateTimestamp"
+        "cluster", "broker", "topic"
     );
 
     private static final List<String> DLQ_TOPIC_OFFSET_LABEL_NAMES = Arrays.asList(
-        "cluster", "broker", "group", "lastUpdateTimestamp"
+        "cluster", "broker", "group"
     );
 
     private void loadTopicOffsetMetric(GaugeMetricFamily family, Map.Entry<ProducerMetric, Double> entry) {
@@ -327,12 +336,29 @@
             Arrays.asList(
                 entry.getKey().getClusterName(),
                 entry.getKey().getBrokerName(),
-                entry.getKey().getTopicName(),
-                String.valueOf(entry.getKey().getLastUpdateTimestamp())
+                entry.getKey().getTopicName()
             ),
             entry.getValue());
     }
 
+    private static final List<String> PRODUCER_GROUP_CLIENT_METRIC_LABEL_NAMES = Arrays.asList(
+            "cluster", "broker", "group"
+    );
+
+    // Publishes the cached per-group producer instance counts as the rocketmq_producer_count gauge.
+    private void collectProducerMetric(List<MetricFamilySamples> mfs) {
+        GaugeMetricFamily family = new GaugeMetricFamily(
+                "rocketmq_producer_count", "producer instance counter", PRODUCER_GROUP_CLIENT_METRIC_LABEL_NAMES);
+        for (Map.Entry<ProducerCountMetric, Integer> cached : producerCounts.asMap().entrySet()) {
+            ProducerCountMetric key = cached.getKey();
+            // label values are listed as cluster, broker, group
+            List<String> labelValues = Arrays.asList(
+                    key.getClusterName(), key.getBrokerName(), key.getGroup());
+            family.addMetric(labelValues, cached.getValue().doubleValue());
+        }
+        mfs.add(family);
+    }
+
     private void collectTopicOffsetMetric(List<MetricFamilySamples> mfs) {
         GaugeMetricFamily topicOffsetF = new GaugeMetricFamily("rocketmq_producer_offset", "TopicOffset", TOPIC_OFFSET_LABEL_NAMES);
         for (Map.Entry<ProducerMetric, Double> entry : topicOffset.asMap().entrySet()) {
@@ -352,8 +378,7 @@
                 Arrays.asList(
                     entry.getKey().getClusterName(),
                     entry.getKey().getBrokerName(),
-                    entry.getKey().getGroup(),
-                    String.valueOf(entry.getKey().getLastUpdateTimestamp())
+                    entry.getKey().getGroup()
                 ),
                 entry.getValue());
         }
@@ -368,6 +393,8 @@
 
         collectConsumerMetric(mfs);
 
+        collectProducerMetric(mfs);
+
         collectTopicOffsetMetric(mfs);
 
         collectTopicNums(mfs);
@@ -645,6 +672,10 @@
         }
     }
 
+    public void addProducerCountMetric(String clusterName, String brokerName, String groupName, int value) {
+        producerCounts.put(new ProducerCountMetric(clusterName, brokerName, groupName), value);
+    }
+
     public void addGroupCountMetric(String group, String caddrs, String localaddrs, int count) {
         this.consumerCounts.put(new ConsumerCountMetric(group, caddrs, localaddrs), count);
     }
@@ -736,6 +767,18 @@
         addCommitLogDirCapacity(clusterName, brokerAddress, brokerHost, stats);
         addAllKindOfTps(clusterName, brokerAddress, brokerHost, stats);
 
+        brokerRuntimePutLatency99.put(new BrokerRuntimeMetric(
+                clusterName, brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getPutLatency99());
+
+        brokerRuntimePutLatency999.put(new BrokerRuntimeMetric(
+                clusterName, brokerAddress, brokerHost,
+                stats.getBrokerVersionDesc(),
+                stats.getBootTimestamp(),
+                stats.getBrokerVersion()), stats.getPutLatency999());
+
         brokerRuntimeMsgPutTotalTodayNow.put(new BrokerRuntimeMetric(
             clusterName, brokerAddress, brokerHost,
             stats.getBrokerVersionDesc(),
@@ -1016,6 +1059,10 @@
         String clusterName, String brokerAddress, String brokerHost,
         String brokerDes, long bootTimestamp, int brokerVersion,
         BrokerRuntimeStats stats) {
+        if (stats.getPutMessageDistributeTimeMap() == null || stats.getPutMessageDistributeTimeMap().isEmpty()) {
+            log.warn("WARN putMessageDistributeTime is null or empty");
+            return;
+        }
         brokerRuntimePutMessageDistributeTimeMap0ms.put(new BrokerRuntimeMetric(
             clusterName,
             brokerAddress, brokerHost,
@@ -1399,5 +1446,17 @@
             loadBrokerRuntimeStatsMetric(brokerRuntimeRemainHowManyDataToFlushF, entry);
         }
         mfs.add(brokerRuntimeRemainHowManyDataToFlushF);
+
+        GaugeMetricFamily brokerRuntimePutLatency99F = new GaugeMetricFamily("rocketmq_brokeruntime_put_latency_99", "brokerRuntimePutLatency99", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimePutLatency99.asMap().entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimePutLatency99F, entry);
+        }
+        mfs.add(brokerRuntimePutLatency99F);
+
+        GaugeMetricFamily brokerRuntimePutLatency999F = new GaugeMetricFamily("rocketmq_brokeruntime_put_latency_999", "brokerRuntimePutLatency999", BROKER_RUNTIME_METRIC_LABEL_NAMES);
+        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimePutLatency999.asMap().entrySet()) {
+            loadBrokerRuntimeStatsMetric(brokerRuntimePutLatency999F, entry);
+        }
+        mfs.add(brokerRuntimePutLatency999F);
     }
 }
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/BrokerRuntimeStats.java b/src/main/java/org/apache/rocketmq/exporter/model/BrokerRuntimeStats.java
index 95ffa04..238aa96 100644
--- a/src/main/java/org/apache/rocketmq/exporter/model/BrokerRuntimeStats.java
+++ b/src/main/java/org/apache/rocketmq/exporter/model/BrokerRuntimeStats.java
@@ -18,6 +18,8 @@
 
 import org.apache.rocketmq.common.protocol.body.KVTable;
 import org.apache.rocketmq.exporter.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -76,8 +78,11 @@
     private double putMessageAverageSize;
     private long putMessageSizeTotal;
     private long dispatchBehindBytes;
+    private double putLatency99;
+    private double putLatency999;
 
 
+    private final static Logger log = LoggerFactory.getLogger(BrokerRuntimeStats.class);
     public BrokerRuntimeStats(KVTable kvTable) {
         this.msgPutTotalTodayNow = Long.parseLong(kvTable.getTable().get("msgPutTotalTodayNow"));
 
@@ -86,7 +91,7 @@
 
         loadTps(this.putTps, kvTable.getTable().get("putTps"));
         loadTps(this.getMissTps, kvTable.getTable().get("getMissTps"));
-        loadTps(this.getTransferedTps, kvTable.getTable().get("getTransferedTps"));
+        loadTps(this.getTransferedTps, kvTable.getTable().get("getTransferredTps"));
         loadTps(this.getTotalTps, kvTable.getTable().get("getTotalTps"));
         loadTps(this.getFoundTps, kvTable.getTable().get("getFoundTps"));
 
@@ -127,6 +132,8 @@
         this.putMessageSizeTotal = Long.parseLong(kvTable.getTable().get("putMessageSizeTotal"));
         this.sendThreadPoolQueueCapacity = Long.parseLong(kvTable.getTable().get("sendThreadPoolQueueCapacity"));
         this.pullThreadPoolQueueCapacity = Long.parseLong(kvTable.getTable().get("pullThreadPoolQueueCapacity"));
+        this.putLatency99 = Double.parseDouble(kvTable.getTable().getOrDefault("putLatency99", "-1"));
+        this.putLatency999 = Double.parseDouble(kvTable.getTable().getOrDefault("putLatency999", "-1"));
 
     }
 
@@ -153,10 +160,18 @@
     }
 
     private void loadPutMessageDistributeTime(String str) {
+        if ("null".equalsIgnoreCase(str)) {
+            log.warn("loadPutMessageDistributeTime WARN, value is null");
+            return;
+        }
         String[] arr = str.split(" ");
         String key = "", value = "";
         for (String ar : arr) {
             String[] tarr = ar.split(":");
+            if (tarr.length < 2) {
+                log.warn("loadPutMessageDistributeTime WARN, wrong value is {}, {}", ar, str);
+                continue;
+            }
             key = tarr[0].replace("[", "").replace("]", "");
             value = tarr[1];
             this.putMessageDistributeTimeMap.put(key, Integer.parseInt(value));
@@ -595,4 +610,20 @@
     public void setDispatchBehindBytes(long dispatchBehindBytes) {
         this.dispatchBehindBytes = dispatchBehindBytes;
     }
+
+    public double getPutLatency99() {
+        return putLatency99;
+    }
+
+    public void setPutLatency99(double putLatency99) {
+        this.putLatency99 = putLatency99;
+    }
+
+    public double getPutLatency999() {
+        return putLatency999;
+    }
+
+    public void setPutLatency999(double putLatency999) {
+        this.putLatency999 = putLatency999;
+    }
 }
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/metrics/producer/ProducerCountMetric.java b/src/main/java/org/apache/rocketmq/exporter/model/metrics/producer/ProducerCountMetric.java
new file mode 100644
index 0000000..fe0539b
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/exporter/model/metrics/producer/ProducerCountMetric.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.exporter.model.metrics.producer;
+
+/** Cache key for the producer instance count of one producer group on one broker of one cluster. */
+public class ProducerCountMetric {
+    private final String clusterName;
+    private final String brokerName;
+    private final String group;
+
+    public ProducerCountMetric(String clusterName, String brokerName, String group) {
+        this.clusterName = clusterName;
+        this.brokerName = brokerName;
+        this.group = group;
+    }
+
+    /** Null-safe comparison of group, cluster name and broker name. */
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof ProducerCountMetric)) {
+            return false;
+        }
+        ProducerCountMetric other = (ProducerCountMetric) obj;
+        return java.util.Objects.equals(other.group, this.group) && java.util.Objects.equals(other.clusterName, this.clusterName)
+                && java.util.Objects.equals(other.brokerName, this.brokerName);
+    }
+
+    /** Hashes every field that equals compares, so keys sharing a group name do not all collide. */
+    @Override
+    public int hashCode() {
+        return java.util.Objects.hash(clusterName, brokerName, group);
+    }
+
+    @Override
+    public String toString() {
+        return "group: " + group + " brokerName: " + brokerName + " clusterName: " + clusterName;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public String getGroup() {
+        return group;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/exporter/model/metrics/ProducerMetric.java b/src/main/java/org/apache/rocketmq/exporter/model/metrics/producer/ProducerMetric.java
similarity index 97%
rename from src/main/java/org/apache/rocketmq/exporter/model/metrics/ProducerMetric.java
rename to src/main/java/org/apache/rocketmq/exporter/model/metrics/producer/ProducerMetric.java
index 05e2e97..8d5629b 100644
--- a/src/main/java/org/apache/rocketmq/exporter/model/metrics/ProducerMetric.java
+++ b/src/main/java/org/apache/rocketmq/exporter/model/metrics/producer/ProducerMetric.java
@@ -14,7 +14,7 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-package org.apache.rocketmq.exporter.model.metrics;
+package org.apache.rocketmq.exporter.model.metrics.producer;
 
 //max offset of topic
 public class ProducerMetric {
diff --git a/src/main/java/org/apache/rocketmq/exporter/service/client/MQAdminExtImpl.java b/src/main/java/org/apache/rocketmq/exporter/service/client/MQAdminExtImpl.java
index 8c85e35..784c17a 100644
--- a/src/main/java/org/apache/rocketmq/exporter/service/client/MQAdminExtImpl.java
+++ b/src/main/java/org/apache/rocketmq/exporter/service/client/MQAdminExtImpl.java
@@ -45,6 +45,7 @@
 import org.apache.rocketmq.common.protocol.body.GroupList;
 import org.apache.rocketmq.common.protocol.body.KVTable;
 import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
 import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
 import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
 import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
@@ -116,29 +117,40 @@
         defaultMQAdminExt.createAndUpdateTopicConfig(addr, config);
     }
 
-    @Override public void createAndUpdatePlainAccessConfig(String addr,
-        PlainAccessConfig plainAccessConfig) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+    @Override
+    public void createAndUpdatePlainAccessConfig(String addr,
+                                                 PlainAccessConfig plainAccessConfig) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         //ignore
     }
 
-    @Override public void deletePlainAccessConfig(String addr,
-        String accessKey) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+    @Override
+    public void deletePlainAccessConfig(String addr,
+                                        String accessKey) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         //ignore
     }
 
-    @Override public void updateGlobalWhiteAddrConfig(String addr,
-        String globalWhiteAddrs) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+    @Override
+    public void updateGlobalWhiteAddrConfig(String addr,
+                                            String globalWhiteAddrs) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         //ignore
     }
 
-    @Override public ClusterAclVersionInfo examineBrokerClusterAclVersionInfo(
-        String addr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+    @Override
+    public void updateGlobalWhiteAddrConfig(final String addr, final String globalWhiteAddrs, String aclFileFullPath)
+            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        // ignore: empty body, the call is not forwarded to defaultMQAdminExt
+    }
+
+    @Override
+    public ClusterAclVersionInfo examineBrokerClusterAclVersionInfo(
+            String addr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         //ignore
         return null;
     }
 
-    @Override public AclConfig examineBrokerClusterAclConfig(
-        String addr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+    @Override
+    public AclConfig examineBrokerClusterAclConfig(
+            String addr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         //ignore
         return null;
     }
@@ -241,11 +253,22 @@
     }
 
     @Override
+    public ConsumerConnection examineConsumerConnectionInfo(String consumerGroup, String brokerAddr) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
+        return defaultMQAdminExt.examineConsumerConnectionInfo(consumerGroup, brokerAddr);
+    }
+
+    @Override
     public ProducerConnection examineProducerConnectionInfo(String producerGroup, String topic)
             throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
         return defaultMQAdminExt.examineProducerConnectionInfo(producerGroup, topic);
     }
 
+    // Added for RocketMQ 4.9.4: forwards the query for the given broker address to defaultMQAdminExt.
+    @Override
+    public ProducerTableInfo getAllProducerInfo(String brokerAddr) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        return this.defaultMQAdminExt.getAllProducerInfo(brokerAddr);
+    }
+
     @Override
     public List<String> getNameServerAddressList() {
         return defaultMQAdminExt.getNameServerAddressList();
@@ -259,6 +282,12 @@
     }
 
     @Override
+    public int addWritePermOfBroker(String namesrvAddr, String brokerName) throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
+        // ignore
+        return 0;
+    }
+
+    @Override
     public void putKVConfig(String namespace, String key, String value) {
         defaultMQAdminExt.putKVConfig(namespace, key, value);
     }
@@ -283,15 +312,20 @@
     }
 
     @Override
-    public void deleteTopicInNameServer(Set<String> addrs, String topic)
+    public void deleteTopicInNameServer(Set<String> addrs, String topic, String clusterName)
             throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        defaultMQAdminExt.deleteTopicInNameServer(addrs, topic);
+        defaultMQAdminExt.deleteTopicInNameServer(addrs, topic, clusterName);
     }
 
     @Override
     public void deleteSubscriptionGroup(String addr, String groupName)
             throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        defaultMQAdminExt.deleteSubscriptionGroup(addr, groupName);
+        //ignore
+    }
+
+    @Override
+    public void deleteSubscriptionGroup(String addr, String groupName, boolean removeOffset) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        // ignore: empty body, the request is not forwarded to defaultMQAdminExt
+    }
 
     @Override
@@ -358,6 +392,16 @@
     }
 
     @Override
+    public boolean deleteExpiredCommitLog(String cluster) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+        return false;
+    }
+
+    @Override
+    public boolean deleteExpiredCommitLogByAddr(String addr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+        return false;
+    }
+
+    @Override
     public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack)
             throws RemotingException, MQClientException, InterruptedException {
         return defaultMQAdminExt.getConsumerRunningInfo(consumerGroup, clientId, jstack);
@@ -527,9 +571,18 @@
     }
 
     @Override
-    public TopicConfigSerializeWrapper getAllTopicGroup(String brokerAddr,
-                                                        long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
-        return defaultMQAdminExt.getAllTopicGroup(brokerAddr, timeoutMillis);
+    public SubscriptionGroupWrapper getUserSubscriptionGroup(String brokerAddr, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        return this.defaultMQAdminExt.getUserSubscriptionGroup(brokerAddr, timeoutMillis);
+    }
+
+    @Override
+    public TopicConfigSerializeWrapper getAllTopicConfig(String brokerAddr, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        return this.defaultMQAdminExt.getAllTopicConfig(brokerAddr, timeoutMillis);
+    }
+
+    @Override
+    public TopicConfigSerializeWrapper getUserTopicConfig(String brokerAddr, boolean specialTopic, long timeoutMillis) throws InterruptedException, RemotingException, MQBrokerException, MQClientException {
+        return this.defaultMQAdminExt.getUserTopicConfig(brokerAddr, specialTopic, timeoutMillis);
     }
 
     @Override
@@ -542,29 +595,31 @@
     @Override
     public void updateNameServerConfig(Properties properties,
                                        List<String> list) throws InterruptedException, RemotingConnectException, UnsupportedEncodingException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, MQBrokerException {
-
+        this.defaultMQAdminExt.updateNameServerConfig(properties, list);
     }
 
     @Override
     public Map<String, Properties> getNameServerConfig(
             List<String> list) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException, UnsupportedEncodingException {
-        return null;
+        return this.defaultMQAdminExt.getNameServerConfig(list);
     }
 
     @Override
     public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String topic,
                                                            int queueId, long index, int count,
                                                            String consumerGroup) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
-        return null;
+        return this.defaultMQAdminExt.queryConsumeQueue(brokerAddr, topic, queueId, index, count, consumerGroup);
     }
 
-    @Override public boolean resumeCheckHalfMessage(
-        String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+    @Override
+    public boolean resumeCheckHalfMessage(
+            String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
         return false;
     }
 
-    @Override public boolean resumeCheckHalfMessage(String topic,
-        String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+    @Override
+    public boolean resumeCheckHalfMessage(String topic,
+                                          String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
         return false;
     }
 }
diff --git a/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java b/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
index a880807..7f4b940 100644
--- a/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
+++ b/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
@@ -35,6 +35,8 @@
 import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
 import org.apache.rocketmq.common.protocol.body.GroupList;
 import org.apache.rocketmq.common.protocol.body.KVTable;
+import org.apache.rocketmq.common.protocol.body.ProducerInfo;
+import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
 import org.apache.rocketmq.common.protocol.body.TopicList;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
@@ -62,8 +64,8 @@
 import javax.annotation.PostConstruct;
 import javax.annotation.Resource;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -201,6 +203,54 @@
         log.info("topic offset collection task finished...." + (System.currentTimeMillis() - start));
     }
 
+    /** Scheduled task that records, for every broker of every cluster, the number of producer instances per producer group. */
+    @Scheduled(cron = "${task.collectProducer.cron}")
+    public void collectProducer() {
+        if (!rmqConfigure.isEnableCollect()) {
+            return;
+        }
+        log.info("producer metric collection task starting....");
+        long start = System.currentTimeMillis();
+        ClusterInfo clusterInfo = null;
+        try {
+            clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+        } catch (Exception ex) {
+            log.error(String.format("collectProducer exception namesrv is %s", JSON.toJSONString(mqAdminExt.getNameServerAddressList())), ex);
+            return;
+        }
+        if (clusterInfo == null || clusterInfo.getClusterAddrTable() == null || clusterInfo.getBrokerAddrTable() == null) {
+            log.warn(String.format("collectProducer get empty cluster, namesrv is: %s", JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
+            return;
+        }
+        for (String clusterName : clusterInfo.getClusterAddrTable().keySet()) {
+            Set<String> brokerNames = clusterInfo.getClusterAddrTable().get(clusterName);
+            if (brokerNames == null || brokerNames.isEmpty()) {
+                log.warn(String.format("collectProducer cluster's brokers are empty, cluster=%s, name srv= %s", clusterName, JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
+                continue;
+            }
+            for (String brokerName : brokerNames) {
+                BrokerData bd = clusterInfo.getBrokerAddrTable().get(brokerName);
+                ProducerTableInfo pt;
+                try {
+                    pt = mqAdminExt.getAllProducerInfo(bd.getBrokerAddrs().get(MixAll.MASTER_ID));
+                } catch (Exception e) {
+                    // keep the cause in the log and skip this broker, so a failed query is not also reported as "no producers"
+                    log.error(String.format("collectProducer. should not be here. cluster=%s, brokerName=%s, name srv= %s", clusterName, brokerName, JSON.toJSONString(mqAdminExt.getNameServerAddressList())), e);
+                    continue;
+                }
+                if (pt == null || pt.getData() == null || pt.getData().isEmpty()) {
+                    log.warn(String.format("collectProducer. there are no producers in cluster=%s, brokerName=%s, name srv= %s", clusterName, brokerName, JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
+                    continue;
+                }
+                for (String producerGroup : pt.getData().keySet()) {
+                    List<ProducerInfo> list = pt.getData().get(producerGroup);
+                    metricsService.getCollector().addProducerCountMetric(clusterName, brokerName, producerGroup, list == null ? -1 : list.size());
+                }
+            }
+        }
+        log.info("producer metric collection task ended...." + (System.currentTimeMillis() - start));
+    }
+
     @Scheduled(cron = "${task.collectConsumerOffset.cron}")
     public void collectConsumerOffset() {
         if (!rmqConfigure.isEnableCollect()) {
@@ -420,7 +470,7 @@
                             bd.getBrokerName(),
                             brokerIP,
                             topic,
-                            Utils.getFixedDouble(bsd.getStatsMinute().getTps())
+                            Utils.getFixedDouble(bsd.getStatsMinute().getSum())
                         );
                     } catch (MQClientException ex) {
                         if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
@@ -440,7 +490,7 @@
                             bd.getBrokerName(),
                             brokerIP,
                             topic,
-                            Utils.getFixedDouble(bsd.getStatsMinute().getTps())
+                            Utils.getFixedDouble(bsd.getStatsMinute().getSum())
                         );
                     } catch (MQClientException ex) {
                         if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
@@ -479,7 +529,7 @@
                                 bd.getBrokerName(),
                                 topic,
                                 group,
-                                Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
+                                Utils.getFixedDouble(bsd.getStatsMinute().getSum()));
                         } catch (MQClientException ex) {
                             if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
                                 //log.error(String.format("GROUP_GET_NUMS-error, topic=%s, group=%s,master broker=%s, %s", topic, group, masterAddr, ex.getErrorMessage()));
@@ -497,7 +547,7 @@
                                 bd.getBrokerName(),
                                 topic,
                                 group,
-                                Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
+                                Utils.getFixedDouble(bsd.getStatsMinute().getSum()));
                         } catch (MQClientException ex) {
                             if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
                                 // log.error(String.format("GROUP_GET_SIZE-error, topic=%s, group=%s, master broker=%s, %s", topic, group, masterAddr, ex.getErrorMessage()));
@@ -563,7 +613,7 @@
                     clusterName,
                     brokerIP,
                     brokerName,
-                    Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
+                    Utils.getFixedDouble(bsd.getStatsMinute().getSum()));
             } catch (MQClientException ex) {
                 if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
                     // log.error(String.format("GROUP_GET_SIZE-error, topic=%s, group=%s, master broker=%s, %s", topic, group, masterAddr, ex.getErrorMessage()));
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 219bb2f..747b747 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -14,8 +14,8 @@
 rocketmq:
   config:
     webTelemetryPath: /metrics
-    rocketmqVersion: 4_8_0
     namesrvAddr: 127.0.0.1:9876
+    rocketmqVersion: 4_9_4
     enableCollect: true
     enableACL: false # if >=4.4.0
     accessKey:  # if >=4.4.0
@@ -32,6 +32,8 @@
   count: 5 # num of scheduled-tasks
   collectTopicOffset:
     cron: 15 0/1 * * * ?
+  collectProducer:
+    cron: 15 0/1 * * * ?
   collectConsumerOffset:
     cron: 15 0/1 * * * ?
   collectBrokerStatsTopic: