RocketMQ Exporter (#241)

* First commit
diff --git a/rocketmq-prometheus-exporter/.gitignore b/rocketmq-prometheus-exporter/.gitignore
new file mode 100644
index 0000000..06bb326
--- /dev/null
+++ b/rocketmq-prometheus-exporter/.gitignore
@@ -0,0 +1,13 @@
+.idea
+.classpath
+.project
+.settings/
+target/
+*.log*
+*.iml
+.idea/
+*.versionsBackup
+!NOTICE-BIN
+!LICENSE-BIN
+.DS_Store
+.vscode
diff --git a/rocketmq-prometheus-exporter/LICENSE b/rocketmq-prometheus-exporter/LICENSE
new file mode 100644
index 0000000..261eeb9
--- /dev/null
+++ b/rocketmq-prometheus-exporter/LICENSE
@@ -0,0 +1,201 @@
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
diff --git a/rocketmq-prometheus-exporter/README.md b/rocketmq-prometheus-exporter/README.md
index a9dc2f8..268e445 100644
--- a/rocketmq-prometheus-exporter/README.md
+++ b/rocketmq-prometheus-exporter/README.md
@@ -1,5 +1,212 @@
-# RocketMQ-prometheus-exporter
+RocketMQ_exporter
+==============
 
-## Overview
+RocketMQ exporter for Prometheus.
+
+Table of Contents
+-----------------
+-	[Compatibility](#compatibility)
+-   [Dependency](#dependency)
+-   [Download](#download)
+-   [Compile](#compile)
+	-   [Build Binary](#build-binary)
+	-   [Build Docker Image](#build-docker-image)
+-   [Run](#run)
+	-   [Run Binary](#run-binary)
+	-   [Run Docker Image](#run-docker-image)
+-   [Flags](#flags)
+-   [Metrics](#metrics)
+	-   [Brokers](#brokers)
+	-   [Topics](#topics)
+	-   [Consumer Groups](#consumer-groups)
+-   [Contribute](#contribute)
+
+Compatibility
+-------------
+
+Support [Apache RocketMQ](https://rocketmq.apache.org) version 4.3.2 (and later).
+
+Dependency
+----------
+
+-	[Prometheus](https://prometheus.io)
+
+Download
+--------
+
+source code  can be downloaded from [github](https://github.com/hdchen/rocketmq-exporter ) page.
+
+Compile
+-------
+
+### Build Binary
+
+```shell
+mvn clean install
+```
+
+### Build Docker Image
+
+```shell
+mvn package -Dmaven.test.skip=true docker:build
+```
 
 
+It can be used directly instead of having to build the image yourself. ([Docker Hub breezecoolyang/rocketmq-exporter](https://cloud.docker.com/repository/docker/breezecoolyang/rocketmq-exporter)\)
+
+Run
+---
+
+### Run Binary
+
+```shell
+java -jar rocketmq-exporter-0.0.1-SNAPSHOT.jar [--rocketmq.config.namesrvAddr="127.0.0.1:9876" ...]
+```
+
+### Run Docker Image
+
+```
+docker container run -itd --rm  -p 5557:5557  breezecoolyang/rocketmq-exporter [--rocketmq.config.namesrvAddr="127.0.0.1:9876" ...]
+```
+
+Flags
+---
+
+This image is configurable using different flags
+
+|Flag name                           | Default            | Description                                        |
+| -----------------------------------|--------------------|----------------------------------------------------|
+| `rocketmq.config.namesrvAddr`      |  127.0.0.1:9876 |name server address  for  broker cluster            |
+| `rocketmq.config.webTelemetryPath` | /metrics           |Path under which to expose metrics                  |
+| `server.port`                      | 5557               |Address to listen on for web interface and telemetry|
+| `rocketmq.config.rocketmqVersion`  | V4_3_2             |rocketmq broker version                             |
+
+Metrics
+-------
+
+Documents about exposed Prometheus metrics.
+
+### Broker 
+
+**Metrics details**
+
+| Name         | Exposed information                                  |
+| ------------ | ---------------------------------------------------- |
+| `rocketmq_broker_tps` | total put message numbers per second for this broker |
+| `rocketmq_broker_qps` | total get message numbers per second for this broker |
+
+**Metrics output example**
+
+```txt
+# HELP rocketmq_broker_tps BrokerPutNums
+# TYPE rocketmq_broker_tps gauge
+rocketmq_broker_tps{cluster="MQCluster",broker="broker-a",} 7.933333333333334
+rocketmq_broker_tps{cluster="MQCluster",broker="broker-b",} 7.916666666666667
+# HELP rocketmq_broker_qps BrokerGetNums
+# TYPE rocketmq_broker_qps gauge
+rocketmq_broker_qps{cluster="MQCluster",broker="broker-a",} 8.2
+rocketmq_broker_qps{cluster="MQCluster",broker="broker-b",} 8.15
+```
+
+### Topics
+
+**Metrics details**
+
+| Name                | Exposed information                                |
+| ------------------- | -------------------------------------------------- |
+| `rocketmq_producer_tps`      | sending messages number per second  for this topic |
+| `rocketmq_producer_put_size` | sending messages size per second  for this topic   |
+| `rocketmq_producer_offset`   | Current Offset of a Broker for this topic          |
+
+**Metrics output example**
+
+```txt
+# HELP rocketmq_producer_tps TopicPutNums
+# TYPE rocketmq_producer_tps gauge
+rocketmq_producer_tps{cluster="MQCluster",broker="broker-a",topic="DEV_TID_topic_tfq",} 7.933333333333334
+rocketmq_producer_tps{cluster="MQCluster",broker="broker-b",topic="DEV_TID_topic_tfq",} 7.916666666666667
+# HELP rocketmq_producer_put_size TopicPutSize
+# TYPE rocketmq_producer_put_size gauge
+rocketmq_producer_put_size{cluster="MQCluster",broker="broker-a",topic="DEV_TID_topic_tfq",} 1642.2
+rocketmq_producer_put_size{cluster="MQCluster",broker="broker-b",topic="DEV_TID_topic_tfq",} 1638.75
+# HELP rocketmq_producer_offset TopicOffset
+# TYPE rocketmq_producer_offset counter
+rocketmq_producer_offset{cluster="MQCluster",broker="broker-a",topic="TBW102",} 0.0
+rocketmq_producer_offset{cluster="MQCluster",broker="broker-b",topic="DEV_TID_tfq",} 1878633.0
+rocketmq_producer_offset{cluster="MQCluster",broker="broker-a",topic="DEV_TID_tfq",} 3843787.0
+rocketmq_producer_offset{cluster="MQCluster",broker="broker-b",topic="DEV_TID_20190304",} 0.0
+rocketmq_producer_offset{cluster="MQCluster",broker="broker-a",topic="BenchmarkTest",} 0.0
+rocketmq_producer_offset{cluster="MQCluster",broker="broker-b",topic="DEV_TID_20190305",} 0.0
+rocketmq_producer_offset{cluster="MQCluster",broker="broker-b",topic="MQCluster",} 0.0
+rocketmq_producer_offset{cluster="MQCluster",broker="broker-a",topic="DEV_TID_topic_tfq",} 2798195.0
+rocketmq_producer_offset{cluster="MQCluster",broker="broker-b",topic="BenchmarkTest",} 0.0
+rocketmq_producer_offset{cluster="MQCluster",broker="broker-b",topic="DEV_TID_topic_tfq",} 1459666.0
+rocketmq_producer_offset{cluster="MQCluster",broker="broker-a",topic="MQCluster",} 0.0
+rocketmq_producer_offset{cluster="MQCluster",broker="broker-a",topic="SELF_TEST_TOPIC",} 0.0
+rocketmq_producer_offset{cluster="MQCluster",broker="broker-a",topic="OFFSET_MOVED_EVENT",} 0.0
+rocketmq_producer_offset{cluster="MQCluster",broker="broker-b",topic="broker-b",} 0.0
+rocketmq_producer_offset{cluster="MQCluster",broker="broker-a",topic="broker-a",} 0.0
+rocketmq_producer_offset{cluster="MQCluster",broker="broker-b",topic="SELF_TEST_TOPIC",} 0.0
+rocketmq_producer_offset{cluster="MQCluster",broker="broker-b",topic="RMQ_SYS_TRANS_HALF_TOPIC",} 0.0
+rocketmq_producer_offset{cluster="MQCluster",broker="broker-a",topic="DEV_TID_20190305",} 0.0
+rocketmq_producer_offset{cluster="MQCluster",broker="broker-b",topic="OFFSET_MOVED_EVENT",} 0.0
+rocketmq_producer_offset{cluster="MQCluster",broker="broker-a",topic="RMQ_SYS_TRANS_HALF_TOPIC",} 0.0
+rocketmq_producer_offset{cluster="MQCluster",broker="broker-b",topic="TBW102",} 0.0
+rocketmq_producer_offset{cluster="MQCluster",broker="broker-a",topic="DEV_TID_20190304",} 0.0
+
+```
+
+### Consumer Groups
+
+**Metrics details**
+
+| Name                              | Exposed information                                          |
+| --------------------------------- | ------------------------------------------------------------ |
+| `rocketmq_consumer_tps`                    | consumer message numbers per second for this Topic           |
+| `rocketmq_consumer_get_size`               | consumer message size per second for this Topic              |
+| `rocketmq_consumer_offset`                 | consumer offset for this topic                               |
+| `rocketmq_group_get_latency`               | consumer latency on some topic for one queue                 |
+| `rocketmq_group_get_latency_by_storetime ` | consumer latency between message consume time and message store time on some topic |
+
+**Metrics output example**
+
+```txt
+# HELP rocketmq_consumer_tps GroupGetNums
+# TYPE rocketmq_consumer_tps gauge
+rocketmq_consumer_tps{cluster="MQCluster",broker="broker-b",topic="DEV_TID_topic_tfq",group="DEV_CID_consumer_cfq",} 7.916666666666667
+rocketmq_consumer_tps{cluster="MQCluster",broker="broker-a",topic="DEV_TID_topic_tfq",group="DEV_CID_consumer_cfq",} 7.933333333333334
+# HELP rocketmq_consumer_get_size GroupGetSize
+# TYPE rocketmq_consumer_get_size gauge
+rocketmq_consumer_get_size{cluster="MQCluster",broker="broker-b",topic="DEV_TID_topic_tfq",group="DEV_CID_consumer_cfq",} 1638.75
+rocketmq_consumer_get_size{cluster="MQCluster",broker="broker-a",topic="DEV_TID_topic_tfq",group="DEV_CID_consumer_cfq",} 1642.2
+# HELP rocketmq_consumer_offset GroupOffset
+# TYPE rocketmq_consumer_offset counter
+rocketmq_consumer_offset{cluster="MQCluster",broker="broker-b",topic="DEV_TID_topic_tfq",group="DEV_CID_consumer_cfq",} 1462030.0
+rocketmq_consumer_offset{cluster="MQCluster",broker="broker-a",topic="DEV_TID_tfq",group="DEV_CID_cfq",} 3843787.0
+rocketmq_consumer_offset{cluster="MQCluster",broker="broker-a",topic="DEV_TID_topic_tfq",group="DEV_CID_consumer_cfq",} 2800569.0
+rocketmq_consumer_offset{cluster="MQCluster",broker="broker-b",topic="DEV_TID_tfq",group="DEV_CID_cfq",} 1878633.0
+# HELP rocketmq_group_get_latency GroupGetLatency
+# TYPE rocketmq_group_get_latency gauge
+rocketmq_group_get_latency{cluster="MQCluster",broker="broker-b",topic="DEV_TID_topic_tfq",group="DEV_CID_consumer_cfq",queueid="0",} 0.05
+rocketmq_group_get_latency{cluster="MQCluster",broker="broker-b",topic="DEV_TID_topic_tfq",group="DEV_CID_consumer_cfq",queueid="1",} 0.0
+rocketmq_group_get_latency{cluster="MQCluster",broker="broker-a",topic="DEV_TID_topic_tfq",group="DEV_CID_consumer_cfq",queueid="7",} 0.05
+rocketmq_group_get_latency{cluster="MQCluster",broker="broker-b",topic="DEV_TID_topic_tfq",group="DEV_CID_consumer_cfq",queueid="6",} 0.016666666666666666
+rocketmq_group_get_latency{cluster="MQCluster",broker="broker-a",topic="DEV_TID_topic_tfq",group="DEV_CID_consumer_cfq",queueid="3",} 0.0
+rocketmq_group_get_latency{cluster="MQCluster",broker="broker-b",topic="DEV_TID_topic_tfq",group="DEV_CID_consumer_cfq",queueid="7",} 0.03333333333333333
+rocketmq_group_get_latency{cluster="MQCluster",broker="broker-a",topic="DEV_TID_topic_tfq",group="DEV_CID_consumer_cfq",queueid="4",} 0.0
+rocketmq_group_get_latency{cluster="MQCluster",broker="broker-a",topic="DEV_TID_topic_tfq",group="DEV_CID_consumer_cfq",queueid="5",} 0.03333333333333333
+rocketmq_group_get_latency{cluster="MQCluster",broker="broker-a",topic="DEV_TID_topic_tfq",group="DEV_CID_consumer_cfq",queueid="6",} 0.016666666666666666
+rocketmq_group_get_latency{cluster="MQCluster",broker="broker-b",topic="DEV_TID_topic_tfq",group="DEV_CID_consumer_cfq",queueid="2",} 0.0
+rocketmq_group_get_latency{cluster="MQCluster",broker="broker-b",topic="DEV_TID_topic_tfq",group="DEV_CID_consumer_cfq",queueid="3",} 0.0
+rocketmq_group_get_latency{cluster="MQCluster",broker="broker-a",topic="DEV_TID_topic_tfq",group="DEV_CID_consumer_cfq",queueid="0",} 0.0
+rocketmq_group_get_latency{cluster="MQCluster",broker="broker-b",topic="DEV_TID_topic_tfq",group="DEV_CID_consumer_cfq",queueid="4",} 0.0
+rocketmq_group_get_latency{cluster="MQCluster",broker="broker-a",topic="DEV_TID_topic_tfq",group="DEV_CID_consumer_cfq",queueid="1",} 0.03333333333333333
+rocketmq_group_get_latency{cluster="MQCluster",broker="broker-b",topic="DEV_TID_topic_tfq",group="DEV_CID_consumer_cfq",queueid="5",} 0.0
+rocketmq_group_get_latency{cluster="MQCluster",broker="broker-a",topic="DEV_TID_topic_tfq",group="DEV_CID_consumer_cfq",queueid="2",} 0.0
+# HELP rocketmq_group_get_latency_by_storetime GroupGetLatencyByStoreTime
+# TYPE rocketmq_group_get_latency_by_storetime gauge
+rocketmq_group_get_latency_by_storetime{cluster="MQCluster",broker="broker-b",topic="DEV_TID_topic_tfq",group="DEV_CID_consumer_cfq",} 3215.0
+rocketmq_group_get_latency_by_storetime{cluster="MQCluster",broker="broker-a",topic="DEV_TID_tfq",group="DEV_CID_cfq",} 0.0
+rocketmq_group_get_latency_by_storetime{cluster="MQCluster",broker="broker-a",topic="DEV_TID_topic_tfq",group="DEV_CID_consumer_cfq",} 3232.0
+rocketmq_group_get_latency_by_storetime{cluster="MQCluster",broker="broker-b",topic="DEV_TID_tfq",group="DEV_CID_cfq",} 0.0
+```
diff --git a/rocketmq-prometheus-exporter/example.rules b/rocketmq-prometheus-exporter/example.rules
new file mode 100644
index 0000000..e30c909
--- /dev/null
+++ b/rocketmq-prometheus-exporter/example.rules
@@ -0,0 +1,67 @@
+###
+# Sample prometheus rules/alerts for rocketmq.
+#
+# NOTE: Please review these carefully as thresholds and behavior may not meet
+#       your SLOs or labels.
+#
+
+###
+# Recording Rules
+
+
+###
+# Galera Alerts
+
+groups:
+- name: GaleraAlerts
+  rules:
+  - alert: RocketMQClusterProduceHigh
+    expr: sum(rocketmq_producer_tps) by (cluster) >= 10
+    for: 3m
+    labels:
+      severity: warning
+    annotations:
+      description: '{{$labels.cluster}} Sending tps too high.'
+      summary: cluster send tps too high
+  - alert: RocketMQClusterProduceLow
+    expr: sum(rocketmq_producer_tps) by (cluster) < 1
+    for: 3m
+    labels:
+      severity: warning
+    annotations:
+      description: '{{$labels.cluster}} Sending tps too low.'
+      summary: cluster send tps too low
+  - alert: RocketMQClusterConsumeHigh
+    expr: sum(rocketmq_consumer_tps) by (cluster) >= 10
+    for: 3m
+    labels:
+      severity: warning
+    annotations:
+      description: '{{$labels.cluster}} consuming tps too high.'
+      summary: cluster consume tps too high
+  - alert: RocketMQClusterConsumeLow
+    expr: sum(rocketmq_consumer_tps) by (cluster) < 1
+    for: 3m
+    labels:
+      severity: warning
+    annotations:
+      description: '{{$labels.cluster}} consuming tps too low.'
+      summary: cluster consume tps too low
+  - alert: ConsumerFallingBehind
+    expr: (sum(rocketmq_producer_offset) by (topic) - on(topic)  group_right  sum(rocketmq_consumer_offset) by (group,topic)) - ignoring(group) group_left sum (avg_over_time(rocketmq_producer_tps[5m])) by (topic)*5*60 > 0
+    for: 3m
+    labels:
+      severity: warning
+    annotations:
+      description: 'consumer {{$labels.group}} on {{$labels.topic}} lag behind
+        and is falling behind (behind value {{$value}}).'
+      summary: consumer lag behind
+  - alert: GroupGetLatencyByStoretime
+    expr: rocketmq_group_get_latency_by_storetime > 1000
+    for: 3m
+    labels:
+      severity: warning
+    annotations:
+      description: 'consumer {{$labels.group}} on {{$labels.broker}}, {{$labels.topic}} consume time lag behind message store time
+        and (behind value is {{$value}}).'
+      summary: message consumes time lag behind message store time too much 
diff --git a/rocketmq-prometheus-exporter/pom.xml b/rocketmq-prometheus-exporter/pom.xml
new file mode 100644
index 0000000..cda549d
--- /dev/null
+++ b/rocketmq-prometheus-exporter/pom.xml
@@ -0,0 +1,137 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>org.springframework.boot</groupId>
+		<artifactId>spring-boot-starter-parent</artifactId>
+		<version>2.1.2.RELEASE</version>
+		<relativePath/> <!-- lookup parent from repository -->
+	</parent>
+	<groupId>org.apache</groupId>
+	<artifactId>rocketmq-exporter</artifactId>
+	<version>0.0.1-SNAPSHOT</version>
+	<name>rocketmq-exporter</name>
+
+	<description>Demo project for Spring Boot</description>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<java.version>1.7</java.version>
+		<guava.version>16.0.1</guava.version>
+		<commons-digester.version>2.1</commons-digester.version>
+		<commons-lang.version>2.6</commons-lang.version>
+		<commons-io.version>2.4</commons-io.version>
+		<commons-cli.version>1.2</commons-cli.version>
+		<rocketmq.version>4.4.0</rocketmq.version>
+		<surefire.version>2.19.1</surefire.version>
+		<aspectj.version>1.8.9</aspectj.version>
+		<main.basedir>${basedir}/../..</main.basedir>
+		<docker.image.prefix>docker.io</docker.image.prefix>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.springframework.boot</groupId>
+			<artifactId>spring-boot-starter-web</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>org.springframework.boot</groupId>
+			<artifactId>spring-boot-starter-test</artifactId>
+			<scope>test</scope>
+		</dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-tools</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-collections</groupId>
+            <artifactId>commons-collections</artifactId>
+            <version>3.2.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-namesrv</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-broker</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+		<dependency>
+			<groupId>org.aspectj</groupId>
+			<artifactId>aspectjrt</artifactId>
+			<version>${aspectj.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.aspectj</groupId>
+			<artifactId>aspectjweaver</artifactId>
+			<version>${aspectj.version}</version>
+		</dependency>
+        <dependency>
+            <groupId>org.jooq</groupId>
+            <artifactId>joor</artifactId>
+            <version>0.9.6</version>
+        </dependency>
+        <dependency>
+            <groupId>io.prometheus</groupId>
+            <artifactId>simpleclient</artifactId>
+            <version>0.6.0</version>
+        </dependency>
+        <dependency>
+            <groupId>io.prometheus</groupId>
+            <artifactId>simpleclient_common</artifactId>
+            <version>0.6.0</version>
+        </dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.springframework.boot</groupId>
+				<artifactId>spring-boot-maven-plugin</artifactId>
+			</plugin>
+			<plugin>
+				<artifactId>maven-checkstyle-plugin</artifactId>
+				<version>2.17</version>
+				<executions>
+					<execution>
+						<id>verify</id>
+						<phase>verify</phase>
+						<configuration>
+							<configLocation>style/rmq_checkstyle.xml</configLocation>
+							<encoding>UTF-8</encoding>
+							<consoleOutput>true</consoleOutput>
+							<failsOnError>true</failsOnError>
+							<includeTestSourceDirectory>false</includeTestSourceDirectory>
+						</configuration>
+						<goals>
+							<goal>check</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>com.spotify</groupId>
+				<artifactId>docker-maven-plugin</artifactId>
+				<version>0.4.11</version>
+				<configuration>
+					<imageName>${docker.image.prefix}/${project.artifactId}</imageName>
+					<dockerDirectory>src/main/docker</dockerDirectory>
+					<resources>
+						<resource>
+							<targetPath>/</targetPath>
+							<directory>${project.build.directory}</directory>
+							<include>${project.build.finalName}.jar</include>
+						</resource>
+					</resources>
+				</configuration>
+			</plugin>
+		</plugins>
+
+	</build>
+
+</project>
diff --git a/rocketmq-prometheus-exporter/src/main/docker/Dockerfile b/rocketmq-prometheus-exporter/src/main/docker/Dockerfile
new file mode 100644
index 0000000..27f209c
--- /dev/null
+++ b/rocketmq-prometheus-exporter/src/main/docker/Dockerfile
@@ -0,0 +1,5 @@
+FROM java:8
+MAINTAINER breeze
+ADD rocketmq-exporter-0.0.1-SNAPSHOT.jar demo.jar
+EXPOSE 5557
+ENTRYPOINT ["java","-jar","demo.jar"]
diff --git a/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/RocketMQExporterApplication.java b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/RocketMQExporterApplication.java
new file mode 100644
index 0000000..0866aef
--- /dev/null
+++ b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/RocketMQExporterApplication.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.exporter;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.web.servlet.ServletComponentScan;
+import org.springframework.scheduling.annotation.EnableScheduling;
+
+@EnableAutoConfiguration
+@SpringBootApplication
+@ServletComponentScan
+@EnableScheduling
+public class RocketMQExporterApplication {
+    public static void main(String[] args) {
+        SpringApplication.run(RocketMQExporterApplication.class, args);
+    }
+}
+
+
diff --git a/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/aspect/admin/MQAdminAspect.java b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/aspect/admin/MQAdminAspect.java
new file mode 100644
index 0000000..bc3c38e
--- /dev/null
+++ b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/aspect/admin/MQAdminAspect.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.exporter.aspect.admin;
+
+import java.lang.reflect.Method;
+import org.apache.rocketmq.exporter.aspect.admin.annotation.MultiMQAdminCmdMethod;
+import org.apache.rocketmq.exporter.service.client.MQAdminInstance;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.annotation.Around;
+import org.aspectj.lang.annotation.Aspect;
+import org.aspectj.lang.annotation.Pointcut;
+import org.aspectj.lang.reflect.MethodSignature;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+@Aspect
+@Service
+public class MQAdminAspect {
+    private Logger logger = LoggerFactory.getLogger(MQAdminAspect.class);
+
+    public MQAdminAspect() {
+    }
+
+    @Pointcut("execution(* org.apache.rocketmq.exporter.service.client.MQAdminExtImpl..*(..))")
+    public void mQAdminMethodPointCut() {
+
+    }
+
+    @Pointcut("@annotation(org.apache.rocketmq.exporter.aspect.admin.annotation.MultiMQAdminCmdMethod)")
+    public void multiMQAdminMethodPointCut() {
+
+    }
+
+    @Around(value = "mQAdminMethodPointCut() || multiMQAdminMethodPointCut()")
+    public Object aroundMQAdminMethod(ProceedingJoinPoint joinPoint) throws Throwable {
+        long start = System.currentTimeMillis();
+        Object obj = null;
+        try {
+            MethodSignature signature = (MethodSignature)joinPoint.getSignature();
+            Method method = signature.getMethod();
+            MultiMQAdminCmdMethod multiMQAdminCmdMethod = method.getAnnotation(MultiMQAdminCmdMethod.class);
+            if (multiMQAdminCmdMethod != null && multiMQAdminCmdMethod.timeoutMillis() > 0) {
+                MQAdminInstance.initMQAdminInstance(multiMQAdminCmdMethod.timeoutMillis());
+            }
+            else {
+                MQAdminInstance.initMQAdminInstance(0);
+            }
+            obj = joinPoint.proceed();
+        }
+        finally {
+            MQAdminInstance.destroyMQAdminInstance();
+            logger.debug("op=look method={} cost={}", joinPoint.getSignature().getName(), System.currentTimeMillis() - start);
+        }
+        return obj;
+    }
+}
diff --git a/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/aspect/admin/annotation/MultiMQAdminCmdMethod.java b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/aspect/admin/annotation/MultiMQAdminCmdMethod.java
new file mode 100644
index 0000000..7953fb3
--- /dev/null
+++ b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/aspect/admin/annotation/MultiMQAdminCmdMethod.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.exporter.aspect.admin.annotation;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Target({ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+public @interface MultiMQAdminCmdMethod {
+    long timeoutMillis() default 0;
+}
diff --git a/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java
new file mode 100644
index 0000000..dbaf4dd
--- /dev/null
+++ b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.exporter.collector;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.CounterMetricFamily;
+import io.prometheus.client.GaugeMetricFamily;
+import org.apache.rocketmq.exporter.model.metrics.BrokerMetric;
+import org.apache.rocketmq.exporter.model.metrics.ConsumerMetric;
+import org.apache.rocketmq.exporter.model.metrics.ConsumerQueueMetric;
+import org.apache.rocketmq.exporter.model.metrics.ProducerMetric;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class RMQMetricsCollector extends Collector {
+
+    private ConcurrentHashMap<ProducerMetric, Double>   topicPutNums            = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<ProducerMetric, Double>   topicPutSize            = new ConcurrentHashMap<>();
+
+    private ConcurrentHashMap<ProducerMetric, Double>   topicOffset              = new ConcurrentHashMap<>();
+
+    private ConcurrentHashMap<BrokerMetric, Double>     brokerPutNums           = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<BrokerMetric, Double>     brokerGetNums           = new ConcurrentHashMap<>();
+
+    private ConcurrentHashMap<ConsumerMetric, Double>   groupGetNums            = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<ConsumerMetric, Double>   groupGetSize            = new ConcurrentHashMap<>();
+
+    private ConcurrentHashMap<ConsumerMetric, Double>       sendBackNums        = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<ConsumerMetric, Double>       groupOffset         = new ConcurrentHashMap<>();
+
+    private ConcurrentHashMap<ConsumerQueueMetric, Double>  groupGetLatency     = new ConcurrentHashMap<>();
+
+    private ConcurrentHashMap<ConsumerMetric, Double>  groupGetLatencyByStoreTime     = new ConcurrentHashMap<>();
+
+    @Override
+    public List<MetricFamilySamples> collect() {
+
+        List<MetricFamilySamples> mfs = new ArrayList<MetricFamilySamples>();
+
+        GaugeMetricFamily topicPutNumsGauge = new GaugeMetricFamily("rocketmq_producer_tps", "TopicPutNums", Arrays.asList("cluster","broker","topic"));
+        for (Map.Entry<ProducerMetric,Double> entry:topicPutNums.entrySet()) {
+            topicPutNumsGauge.addMetric(Arrays.asList(entry.getKey().getClusterName(),entry.getKey().getBrokerName(),entry.getKey().getTopicName()), entry.getValue());
+        }
+        mfs.add(topicPutNumsGauge);
+
+
+        GaugeMetricFamily topicPutSizeGauge = new GaugeMetricFamily("rocketmq_producer_put_size", "TopicPutSize", Arrays.asList("cluster","broker","topic"));
+        for (Map.Entry<ProducerMetric, Double> entry: topicPutSize.entrySet()) {
+            topicPutSizeGauge.addMetric(Arrays.asList(entry.getKey().getClusterName(),entry.getKey().getBrokerName(),entry.getKey().getTopicName()), entry.getValue());
+        }
+        mfs.add(topicPutSizeGauge);
+
+
+        CounterMetricFamily topicOffsetGauge = new CounterMetricFamily("rocketmq_producer_offset", "TopicOffset", Arrays.asList("cluster","broker","topic"));
+        for (Map.Entry<ProducerMetric, Double> entry: topicOffset.entrySet()) {
+            topicOffsetGauge.addMetric(Arrays.asList(entry.getKey().getClusterName(),entry.getKey().getBrokerName(),entry.getKey().getTopicName()), entry.getValue());
+        }
+        mfs.add(topicOffsetGauge);
+
+
+        GaugeMetricFamily brokerPutNumsGauge = new GaugeMetricFamily("rocketmq_broker_tps", "BrokerPutNums", Arrays.asList("cluster","broker"));
+        for (Map.Entry<BrokerMetric, Double> entry: brokerPutNums.entrySet()) {
+            brokerPutNumsGauge.addMetric(Arrays.asList(entry.getKey().getClusterName(),entry.getKey().getBrokerName()), entry.getValue());
+        }
+        mfs.add(brokerPutNumsGauge);
+
+
+        GaugeMetricFamily brokerGetNumsGauge = new GaugeMetricFamily("rocketmq_broker_qps", "BrokerGetNums", Arrays.asList("cluster","broker"));
+        for (Map.Entry<BrokerMetric, Double> entry: brokerGetNums.entrySet()) {
+            brokerGetNumsGauge.addMetric(Arrays.asList(entry.getKey().getClusterName(),entry.getKey().getBrokerName()), entry.getValue());
+        }
+        mfs.add(brokerGetNumsGauge);
+
+
+        GaugeMetricFamily groupGetNumsGauge = new GaugeMetricFamily("rocketmq_consumer_tps", "GroupGetNums", Arrays.asList("cluster","broker","topic","group"));
+        for (Map.Entry<ConsumerMetric, Double> entry: groupGetNums.entrySet()) {
+            groupGetNumsGauge.addMetric(Arrays.asList(entry.getKey().getClusterName(),entry.getKey().getBrokerName(),entry.getKey().getTopicName(),entry.getKey().getConsumerGroupName()), entry.getValue());
+        }
+
+        mfs.add(groupGetNumsGauge);
+
+
+        GaugeMetricFamily groupGetSizeGauge = new GaugeMetricFamily("rocketmq_consumer_get_size", "GroupGetSize", Arrays.asList("cluster","broker","topic","group"));
+        for (Map.Entry<ConsumerMetric, Double> entry: groupGetSize.entrySet()) {
+            groupGetSizeGauge.addMetric(Arrays.asList(entry.getKey().getClusterName(),entry.getKey().getBrokerName(),entry.getKey().getTopicName(),entry.getKey().getConsumerGroupName()), entry.getValue());
+        }
+        mfs.add(groupGetSizeGauge);
+
+        CounterMetricFamily groupOffsetGauge = new CounterMetricFamily("rocketmq_consumer_offset", "GroupOffset", Arrays.asList("cluster","broker","topic","group"));
+        for (Map.Entry<ConsumerMetric, Double> entry: groupOffset.entrySet()) {
+            groupOffsetGauge.addMetric(Arrays.asList(entry.getKey().getClusterName(),entry.getKey().getBrokerName(),entry.getKey().getTopicName(),entry.getKey().getConsumerGroupName()), entry.getValue());
+        }
+        mfs.add(groupOffsetGauge);
+
+
+        GaugeMetricFamily sendBackNumsGauge = new GaugeMetricFamily("rocketmq_send_back_nums", "SendBackNums", Arrays.asList("cluster","broker","topic","group"));
+        for (Map.Entry<ConsumerMetric, Double> entry: sendBackNums.entrySet()) {
+            sendBackNumsGauge.addMetric(Arrays.asList(entry.getKey().getClusterName(),entry.getKey().getBrokerName(),entry.getKey().getTopicName(),entry.getKey().getConsumerGroupName()), entry.getValue());
+        }
+        mfs.add(sendBackNumsGauge);
+
+
+        GaugeMetricFamily groupGetLatencyGauge = new GaugeMetricFamily("rocketmq_group_get_latency", "GroupGetLatency", Arrays.asList("cluster","broker","topic","group","queueid"));
+        for (Map.Entry<ConsumerQueueMetric, Double> entry: groupGetLatency.entrySet()) {
+            groupGetLatencyGauge.addMetric(Arrays.asList(entry.getKey().getClusterName(),entry.getKey().getBrokerName(),entry.getKey().getTopicName(),entry.getKey().getConsumerGroupName(),entry.getKey().getQueueId()), entry.getValue());
+        }
+        mfs.add(groupGetLatencyGauge);
+
+        GaugeMetricFamily groupGetLatencyByStoretimeGauge = new GaugeMetricFamily("rocketmq_group_get_latency_by_storetime", "GroupGetLatencyByStoreTime", Arrays.asList("cluster","broker","topic","group"));
+        for (Map.Entry<ConsumerMetric, Double> entry: groupGetLatencyByStoreTime.entrySet()) {
+            groupGetLatencyByStoretimeGauge.addMetric(Arrays.asList(entry.getKey().getClusterName(),entry.getKey().getBrokerName(),entry.getKey().getTopicName(),entry.getKey().getConsumerGroupName()), entry.getValue());
+        }
+        mfs.add(groupGetLatencyByStoretimeGauge);
+
+        return mfs;
+    }
+    public void AddTopicPutNumsMetric(String clusterName, String brokerName, String topic,  double value)
+    {
+        topicPutNums.put(new ProducerMetric(clusterName,brokerName,topic),value);
+    }
+
+    public void AddTopicPutSizeMetric(String clusterName, String brokerName, String topic,  double value)
+    {
+        topicPutSize.put(new ProducerMetric(clusterName,brokerName,topic),value);
+    }
+
+    public void AddTopicOffsetMetric(String clusterName, String brokerName, String topic,  double value)
+    {
+        topicOffset.put(new ProducerMetric(clusterName,brokerName,topic),value);
+    }
+
+    public void AddBrokerPutNumsMetric(String clusterName, String brokerName,  double value)
+    {
+        brokerPutNums.put(new BrokerMetric(clusterName,brokerName),value);
+    }
+
+    public void AddBrokerGetNumsMetric(String clusterName, String brokerName,  double value)
+    {
+        brokerGetNums.put(new BrokerMetric(clusterName,brokerName),value);
+    }
+
+    public void AddGroupGetNumsMetric(String clusterName, String brokerName, String topic, String group,  double value)
+    {
+        groupGetNums.put(new ConsumerMetric(clusterName,brokerName,topic,group),value);
+    }
+
+    public void AddGroupGetSizeMetric(String clusterName, String brokerName, String topic, String group,  double value)
+    {
+        groupGetSize.put(new ConsumerMetric(clusterName,brokerName,topic,group),value);
+    }
+
+    public void AddGroupOffsetMetric(String clusterName, String brokerName, String topic, String group,  double value)
+    {
+        groupOffset.put(new ConsumerMetric(clusterName,brokerName,topic,group),value);
+    }
+
+
+    public void AddsendBackNumsMetric(String clusterName, String brokerName, String topic, String group,  double value)
+    {
+        sendBackNums.put(new ConsumerMetric(clusterName,brokerName,topic,group),value);
+    }
+
+    public void AddGroupGetLatencyMetric(String clusterName, String brokerName, String topic, String group, String queueId,double value) {
+
+        groupGetLatency.put(new ConsumerQueueMetric(clusterName,brokerName,topic,group,queueId),value);
+    }
+
+    public void AddGroupGetLatencyByStoreTimeMetric(String clusterName, String brokerName, String topic, String group,double value) {
+
+        groupGetLatencyByStoreTime.put(new ConsumerMetric(clusterName,brokerName,topic,group),value);
+    }
+}
\ No newline at end of file
diff --git a/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/config/RMQConfigure.java b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/config/RMQConfigure.java
new file mode 100644
index 0000000..2b3956b
--- /dev/null
+++ b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/config/RMQConfigure.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.exporter.config;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.MixAll;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+
+
+import static org.apache.rocketmq.client.ClientConfig.SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY;
+
+@Configuration
+@ConfigurationProperties(prefix = "rocketmq.config")
+public class RMQConfigure {
+
+    public static final String ROCKETMQ_CONFIG_WEB_TELEMETRY_PATH = "rocketmq.config.webTelemetryPath";
+    public static final String ROCKETMQ_CONFIG_ROCKETMQ_VERSION = "rocketmq.config.rocketmqVersion";
+
+    private Logger logger = LoggerFactory.getLogger(RMQConfigure.class);
+    //use rocketmq.namesrv.addr first,if it is empty,than use system proerty or system env
+    private volatile String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
+
+    private volatile String isVIPChannel = System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true");
+
+    private boolean enableCollect;
+
+    private volatile String webTelemetryPath = System.getProperty(ROCKETMQ_CONFIG_WEB_TELEMETRY_PATH, "/metrics");
+
+    private volatile String rocketmqVersion = System.getProperty(ROCKETMQ_CONFIG_ROCKETMQ_VERSION, "V4_3_2");
+
+    public String getNamesrvAddr() {
+        return namesrvAddr;
+    }
+
+    public void setNamesrvAddr(String namesrvAddr) {
+        if (StringUtils.isNotBlank(namesrvAddr)) {
+            this.namesrvAddr = namesrvAddr;
+            System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);
+            logger.info("setNameSrvAddrByProperty nameSrvAddr={}", namesrvAddr);
+        }
+    }
+
+    public String getIsVIPChannel() {
+        return isVIPChannel;
+    }
+
+    public void setIsVIPChannel(String isVIPChannel) {
+        if (StringUtils.isNotBlank(isVIPChannel)) {
+            this.isVIPChannel = isVIPChannel;
+            System.setProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, isVIPChannel);
+            logger.info("setIsVIPChannel isVIPChannel={}", isVIPChannel);
+        }
+    }
+    public boolean isEnableCollect() {
+        return enableCollect;
+    }
+    public void setEnableCollect(boolean enableCollect) {
+        this.enableCollect = enableCollect;
+    }
+
+    public void setWebTelemetryPath(String webTelemetryPath) {
+        if (StringUtils.isNotBlank(webTelemetryPath)) {
+            this.webTelemetryPath = webTelemetryPath;
+            System.setProperty(ROCKETMQ_CONFIG_WEB_TELEMETRY_PATH, webTelemetryPath);
+            logger.info("setWebTelemetryPath webTelemetryPath={}", webTelemetryPath);
+        }
+    }
+
+    public String getWebTelemetryPath() {
+        return webTelemetryPath;
+    }
+
+    public void setRocketmqVersion(String rocketmqVersion) {
+        if (StringUtils.isNotBlank(rocketmqVersion)) {
+            this.rocketmqVersion = rocketmqVersion;
+            System.setProperty(ROCKETMQ_CONFIG_ROCKETMQ_VERSION, rocketmqVersion);
+            logger.info("setRocketmqVersion rocketmqVersion={}", rocketmqVersion);
+        }
+    }
+
+    public String getRocketmqVersion() {
+        return rocketmqVersion;
+    }
+}
diff --git a/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/controller/RMQMetricsController.java b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/controller/RMQMetricsController.java
new file mode 100644
index 0000000..4df69d1
--- /dev/null
+++ b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/controller/RMQMetricsController.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.exporter.controller;
+
+
+import org.apache.rocketmq.exporter.service.RMQMetricsService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.ResponseBody;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.annotation.Resource;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.io.StringWriter;
+
+@RestController
+@EnableAutoConfiguration
+public class RMQMetricsController {
+
+    private final static Logger log = LoggerFactory.getLogger(RMQMetricsController.class);
+
+    @Resource
+    RMQMetricsService metricsService;
+
+    @RequestMapping(value = "${rocketmq.config.webTelemetryPath}")
+    @ResponseBody
+    private void metrics(HttpServletResponse response) throws IOException {
+
+        StringWriter writer = new StringWriter();
+        metricsService.Metrics(writer);
+
+        response.setHeader("Content-Type", "text/plain; version=0.0.4; charset=utf-8");
+        response.getOutputStream().print(writer.toString());
+    }
+}
diff --git a/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/exception/ServiceException.java b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/exception/ServiceException.java
new file mode 100644
index 0000000..ccf3fdc
--- /dev/null
+++ b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/exception/ServiceException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.exporter.exception;
+
+public class ServiceException extends RuntimeException {
+    private static final long serialVersionUID = 9213584003139969215L;
+    private int code;
+
+    public ServiceException(int code, String message) {
+        super(message);
+        this.code = code;
+    }
+
+    public int getCode() {
+        return code;
+    }
+}
diff --git a/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/model/metrics/BrokerMetric.java b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/model/metrics/BrokerMetric.java
new file mode 100644
index 0000000..c7a0727
--- /dev/null
+++ b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/model/metrics/BrokerMetric.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.exporter.model.metrics;
+
+public class BrokerMetric {
+
+    private  String   clusterName;
+    private  String   brokerName;
+
+
+    public void setClusterName(String cluster) {
+
+        clusterName = cluster;
+    }
+    public  String getClusterName() {
+
+        return clusterName;
+    }
+    void setBrokerName(String broker) {
+
+        brokerName = broker;
+    }
+
+    public String getBrokerName() {
+
+        return brokerName;
+    }
+
+    public BrokerMetric(String cluster, String broker) {
+        clusterName = cluster;
+        brokerName  =   broker;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof BrokerMetric)) {
+            return false;
+        }
+        BrokerMetric other = (BrokerMetric) obj;
+
+        return  other.clusterName.equals(clusterName) && other.brokerName.equals(brokerName);
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 1;
+        hash = 37 * hash + clusterName.hashCode();
+        hash = 37 * hash + brokerName.hashCode();
+        return hash;
+    }
+
+    @Override
+    public String toString() {
+        return "ClusterName: " + clusterName + " BrokerName: " + brokerName;
+    }
+}
diff --git a/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerMetric.java b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerMetric.java
new file mode 100644
index 0000000..9530fff
--- /dev/null
+++ b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerMetric.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.exporter.model.metrics;
+
+public class ConsumerMetric {
+
+    private  String   clusterName;
+    private  String   brokerName;
+    private  String   topicName;
+    private  String   consumerGroupName;
+
+    public void setClusterName(String cluster) {
+        clusterName = cluster;
+    }
+    public  String getClusterName() {
+        return clusterName;
+    }
+    void setBrokerName(String broker) {
+        brokerName = broker;
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public void setTopicName(String topic) {
+        topicName = topic;
+    }
+    public String getTopicName() {
+        return topicName;
+    }
+    public String getConsumerGroupName() {
+        return consumerGroupName;
+    }
+
+    public void setConsumerGroupName(String consumerGroupName) {
+        this.consumerGroupName = consumerGroupName;
+    }
+
+    public ConsumerMetric(String cluster, String broker, String topic,String consumerGroup) {
+        clusterName = cluster;
+        brokerName  =   broker;
+        topicName   =   topic;
+        consumerGroupName   =   consumerGroup;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof ConsumerMetric)) {
+            return false;
+        }
+        ConsumerMetric other = (ConsumerMetric) obj;
+
+        return  other.clusterName.equals(clusterName) && other.brokerName.equals(brokerName)
+                && other.topicName.equals(topicName)  && other.consumerGroupName.equals(consumerGroupName);
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 1;
+        hash = 37 * hash + clusterName.hashCode();
+        hash = 37 * hash + brokerName.hashCode();
+        hash = 37 * hash + topicName.hashCode();
+        hash = 37 * hash + consumerGroupName.hashCode();
+        return hash;
+    }
+
+    @Override
+    public String toString() {
+        return "ClusterName: " + clusterName + " BrokerName: " + brokerName + " topicName: " + topicName + " ConsumeGroupName: " + consumerGroupName;
+    }
+}
diff --git a/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerQueueMetric.java b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerQueueMetric.java
new file mode 100644
index 0000000..a6453fc
--- /dev/null
+++ b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/model/metrics/ConsumerQueueMetric.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.exporter.model.metrics;
+
+public class ConsumerQueueMetric {
+
+    private  String   clusterName;
+    private  String   brokerName;
+    private  String   topicName;
+    private  String   consumerGroupName;
+    private  String   queueId;
+
+    public void setClusterName(String cluster) {
+        clusterName = cluster;
+    }
+    public  String getClusterName() {
+        return clusterName;
+    }
+    void setBrokerName(String broker) {
+        brokerName = broker;
+    }
+    public String getBrokerName() {
+        return brokerName;
+    }
+    public void setTopicName(String topic) {
+        topicName = topic;
+    }
+    public String  getTopicName() {
+        return topicName;
+    }
+    public String getConsumerGroupName() {
+        return consumerGroupName;
+    }
+
+    public void setConsumerGroupName(String consumerGroupName) {
+        this.consumerGroupName = consumerGroupName;
+    }
+    public String getQueueId() {
+        return queueId;
+    }
+    public void setQueueId(String queueId) {
+        this.queueId = queueId;
+    }
+    public ConsumerQueueMetric(String cluster, String broker, String topic, String consumerGroup,String queue) {
+        clusterName = cluster;
+        brokerName  =   broker;
+        topicName   =   topic;
+        consumerGroupName   =   consumerGroup;
+        queueId             =   queue;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof ConsumerQueueMetric)) {
+            return false;
+        }
+        ConsumerQueueMetric other = (ConsumerQueueMetric) obj;
+
+        return  other.clusterName.equals(clusterName) && other.brokerName.equals(brokerName)
+                && other.topicName.equals(topicName)  && other.consumerGroupName.equals(consumerGroupName)
+                && other.queueId.equals(queueId);
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 1;
+        hash = 37 * hash + clusterName.hashCode();
+        hash = 37 * hash + brokerName.hashCode();
+        hash = 37 * hash + topicName.hashCode();
+        hash = 37 * hash + consumerGroupName.hashCode();
+        hash = 37 * hash + queueId.hashCode();
+        return hash;
+    }
+
+    @Override
+    public String toString() {
+        return "ClusterName: " + clusterName + " BrokerName: " + brokerName + " topicName: " + topicName + " ConsumeGroupName: " + consumerGroupName  +  "queueId: " + queueId;
+    }
+}
diff --git a/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/model/metrics/ProducerMetric.java b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/model/metrics/ProducerMetric.java
new file mode 100644
index 0000000..72baa73
--- /dev/null
+++ b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/model/metrics/ProducerMetric.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.exporter.model.metrics;
+
+public class ProducerMetric {
+
+    private  String   clusterName;
+    private  String   brokerName;
+    private  String   topicName;
+
+    public void setClusterName(String cluster) {
+        clusterName = cluster;
+    }
+    public  String getClusterName() {
+        return clusterName;
+    }
+    void setBrokerName(String broker) {
+        brokerName = broker;
+    }
+    public String getBrokerName() {
+        return brokerName;
+    }
+    public void setTopicName(String topic) {
+        topicName = topic;
+    }
+    public String  getTopicName() {
+        return topicName;
+    }
+    public ProducerMetric(String cluster,String broker,String topic) {
+        clusterName = cluster;
+        brokerName  =   broker;
+        topicName   =   topic;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof ProducerMetric)) {
+            return false;
+        }
+        ProducerMetric other = (ProducerMetric) obj;
+
+        return  other.clusterName.equals(clusterName) && other.brokerName.equals(brokerName)
+                && other.topicName.equals(topicName);
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 1;
+        hash = 37 * hash + clusterName.hashCode();
+        hash = 37 * hash + brokerName.hashCode();
+        hash = 37 * hash + topicName.hashCode();
+        return hash;
+    }
+
+    @Override
+    public String toString() {
+        return "ClusterName: " + clusterName + " BrokerName: " + brokerName + " topicName: " + topicName;
+    }
+}
diff --git a/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/service/AbstractCommonService.java b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/service/AbstractCommonService.java
new file mode 100644
index 0000000..50f5b0a
--- /dev/null
+++ b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/service/AbstractCommonService.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.exporter.service;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Sets;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.rocketmq.tools.admin.MQAdminExt;
+
+import javax.annotation.Resource;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+
+public abstract class AbstractCommonService {
+    @Resource
+    protected MQAdminExt mqAdminExt;
+    protected final Set<String> changeToBrokerNameSet(HashMap<String, Set<String>> clusterAddrTable,
+        List<String> clusterNameList, List<String> brokerNameList) {
+        Set<String> finalBrokerNameList = Sets.newHashSet();
+        if (CollectionUtils.isNotEmpty(clusterNameList)) {
+            try {
+                for (String clusterName : clusterNameList) {
+                    finalBrokerNameList.addAll(clusterAddrTable.get(clusterName));
+                }
+            }
+            catch (Exception e) {
+                throw Throwables.propagate(e);
+            }
+        }
+        if (CollectionUtils.isNotEmpty(brokerNameList)) {
+            finalBrokerNameList.addAll(brokerNameList);
+        }
+        return finalBrokerNameList;
+    }
+}
diff --git a/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/service/RMQMetricsService.java b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/service/RMQMetricsService.java
new file mode 100644
index 0000000..3b2e403
--- /dev/null
+++ b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/service/RMQMetricsService.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.exporter.service;
+
+import org.apache.rocketmq.exporter.collector.RMQMetricsCollector;
+
+
+import java.io.IOException;
+import java.io.StringWriter;
+
+
+public interface RMQMetricsService  {
+    public RMQMetricsCollector getCollector();
+    public void Metrics(StringWriter writer) throws IOException;
+}
diff --git a/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/service/client/MQAdminExtImpl.java b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/service/client/MQAdminExtImpl.java
new file mode 100644
index 0000000..17ec7d5
--- /dev/null
+++ b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/service/client/MQAdminExtImpl.java
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.exporter.service.client;
+
+import com.google.common.base.Throwables;
+import org.apache.rocketmq.client.QueryResult;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQAdminImpl;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.admin.RollbackStats;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
+import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.common.protocol.body.GroupList;
+import org.apache.rocketmq.common.protocol.body.KVTable;
+import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
+import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
+import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
+import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.exporter.util.JsonUtil;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.tools.admin.MQAdminExt;
+import org.apache.rocketmq.tools.admin.api.MessageTrack;
+import org.joor.Reflect;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.rocketmq.remoting.protocol.RemotingSerializable.decode;
+
+@Service
+public class MQAdminExtImpl implements MQAdminExt {
+    private Logger logger = LoggerFactory.getLogger(MQAdminExtImpl.class);
+
+    public MQAdminExtImpl() {
+    }
+
+
+    public PullResult queryMsgByOffset(MessageQueue mq, long offset) throws Exception {
+        return MQAdminInstance.threadLocalMQPullConsumer().pull(mq, "*", offset, 1);
+    }
+
+    @Override
+    public void updateBrokerConfig(String brokerAddr, Properties properties)
+        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+        UnsupportedEncodingException, InterruptedException, MQBrokerException {
+        MQAdminInstance.threadLocalMQAdminExt().updateBrokerConfig(brokerAddr, properties);
+    }
+
+    @Override
+    public void createAndUpdateTopicConfig(String addr, TopicConfig config)
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        MQAdminInstance.threadLocalMQAdminExt().createAndUpdateTopicConfig(addr, config);
+    }
+
+    @Override
+    public void createAndUpdateSubscriptionGroupConfig(String addr, SubscriptionGroupConfig config)
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        MQAdminInstance.threadLocalMQAdminExt().createAndUpdateSubscriptionGroupConfig(addr, config);
+    }
+
+    @Override
+    public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group) {
+        RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient();
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
+        RemotingCommand response = null;
+        try {
+            response = remotingClient.invokeSync(addr, request, 3000);
+        }
+        catch (Exception err) {
+            throw Throwables.propagate(err);
+        }
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                SubscriptionGroupWrapper subscriptionGroupWrapper = decode(response.getBody(), SubscriptionGroupWrapper.class);
+                return subscriptionGroupWrapper.getSubscriptionGroupTable().get(group);
+            }
+            default:
+                throw Throwables.propagate(new MQBrokerException(response.getCode(), response.getRemark()));
+        }
+    }
+
+    @Override
+    public TopicConfig examineTopicConfig(String addr, String topic) {
+        RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient();
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
+        RemotingCommand response = null;
+        try {
+            response = remotingClient.invokeSync(addr, request, 3000);
+        }
+        catch (Exception err) {
+            throw Throwables.propagate(err);
+        }
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                TopicConfigSerializeWrapper topicConfigSerializeWrapper = decode(response.getBody(), TopicConfigSerializeWrapper.class);
+                return topicConfigSerializeWrapper.getTopicConfigTable().get(topic);
+            }
+            default:
+                throw Throwables.propagate(new MQBrokerException(response.getCode(), response.getRemark()));
+        }
+    }
+
+    @Override
+    public TopicStatsTable examineTopicStats(String topic)
+        throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        return MQAdminInstance.threadLocalMQAdminExt().examineTopicStats(topic);
+    }
+
+    @Override
+    public TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException {
+        TopicList topicList = MQAdminInstance.threadLocalMQAdminExt().fetchAllTopicList();
+        logger.debug("op=look={}", JsonUtil.obj2String(topicList.getTopicList()));
+        return topicList;
+    }
+
+    @Override
+    public KVTable fetchBrokerRuntimeStats(String brokerAddr)
+        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+        InterruptedException, MQBrokerException {
+        return MQAdminInstance.threadLocalMQAdminExt().fetchBrokerRuntimeStats(brokerAddr);
+    }
+
+    @Override
+    public ConsumeStats examineConsumeStats(String consumerGroup)
+        throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        return MQAdminInstance.threadLocalMQAdminExt().examineConsumeStats(consumerGroup);
+    }
+
+    @Override
+    public ConsumeStats examineConsumeStats(String consumerGroup, String topic)
+        throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        return MQAdminInstance.threadLocalMQAdminExt().examineConsumeStats(consumerGroup, topic);
+    }
+
+    @Override
+    public ClusterInfo examineBrokerClusterInfo()
+        throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException,
+        RemotingConnectException {
+        return MQAdminInstance.threadLocalMQAdminExt().examineBrokerClusterInfo();
+    }
+
+    @Override
+    public TopicRouteData examineTopicRouteInfo(String topic)
+        throws RemotingException, MQClientException, InterruptedException {
+        return MQAdminInstance.threadLocalMQAdminExt().examineTopicRouteInfo(topic);
+    }
+
+    @Override
+    public ConsumerConnection examineConsumerConnectionInfo(String consumerGroup)
+        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+        InterruptedException, MQBrokerException, RemotingException, MQClientException {
+        return MQAdminInstance.threadLocalMQAdminExt().examineConsumerConnectionInfo(consumerGroup);
+    }
+
+    @Override
+    public ProducerConnection examineProducerConnectionInfo(String producerGroup, String topic)
+        throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        return MQAdminInstance.threadLocalMQAdminExt().examineProducerConnectionInfo(producerGroup, topic);
+    }
+
+    @Override
+    public List<String> getNameServerAddressList() {
+        return MQAdminInstance.threadLocalMQAdminExt().getNameServerAddressList();
+    }
+
+    @Override
+    public int wipeWritePermOfBroker(String namesrvAddr, String brokerName)
+        throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException,
+        RemotingTimeoutException, InterruptedException, MQClientException {
+        return MQAdminInstance.threadLocalMQAdminExt().wipeWritePermOfBroker(namesrvAddr, brokerName);
+    }
+
+    @Override
+    public void putKVConfig(String namespace, String key, String value) {
+        MQAdminInstance.threadLocalMQAdminExt().putKVConfig(namespace, key, value);
+    }
+
+    @Override
+    public String getKVConfig(String namespace, String key)
+        throws RemotingException, MQClientException, InterruptedException {
+        return MQAdminInstance.threadLocalMQAdminExt().getKVConfig(namespace, key);
+    }
+
+    @Override
+    public KVTable getKVListByNamespace(String namespace)
+        throws RemotingException, MQClientException, InterruptedException {
+        return MQAdminInstance.threadLocalMQAdminExt().getKVListByNamespace(namespace);
+    }
+
+    @Override
+    public void deleteTopicInBroker(Set<String> addrs, String topic)
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        logger.info("addrs={} topic={}", JsonUtil.obj2String(addrs), topic);
+        MQAdminInstance.threadLocalMQAdminExt().deleteTopicInBroker(addrs, topic);
+    }
+
+    @Override
+    public void deleteTopicInNameServer(Set<String> addrs, String topic)
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        MQAdminInstance.threadLocalMQAdminExt().deleteTopicInNameServer(addrs, topic);
+    }
+
+    @Override
+    public void deleteSubscriptionGroup(String addr, String groupName)
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        MQAdminInstance.threadLocalMQAdminExt().deleteSubscriptionGroup(addr, groupName);
+    }
+
+    @Override
+    public void createAndUpdateKvConfig(String namespace, String key, String value)
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        MQAdminInstance.threadLocalMQAdminExt().createAndUpdateKvConfig(namespace, key, value);
+    }
+
+    @Override
+    public void deleteKvConfig(String namespace, String key)
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        MQAdminInstance.threadLocalMQAdminExt().deleteKvConfig(namespace, key);
+    }
+
+    @Override
+    public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp,
+        boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        return MQAdminInstance.threadLocalMQAdminExt().resetOffsetByTimestampOld(consumerGroup, topic, timestamp, force);
+    }
+
+    @Override
+    public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp,
+        boolean isForce) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        return MQAdminInstance.threadLocalMQAdminExt().resetOffsetByTimestamp(topic, group, timestamp, isForce);
+    }
+
+    @Override
+    public void resetOffsetNew(String consumerGroup, String topic, long timestamp)
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        MQAdminInstance.threadLocalMQAdminExt().resetOffsetNew(consumerGroup, topic, timestamp);
+    }
+
+    @Override
+    public Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String group,
+        String clientAddr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        return MQAdminInstance.threadLocalMQAdminExt().getConsumeStatus(topic, group, clientAddr);
+    }
+
+    @Override
+    public void createOrUpdateOrderConf(String key, String value, boolean isCluster)
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        MQAdminInstance.threadLocalMQAdminExt().createOrUpdateOrderConf(key, value, isCluster);
+    }
+
+    @Override
+    public GroupList queryTopicConsumeByWho(String topic)
+        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+        InterruptedException, MQBrokerException, RemotingException, MQClientException {
+        return MQAdminInstance.threadLocalMQAdminExt().queryTopicConsumeByWho(topic);
+    }
+
+    @Override
+    public boolean cleanExpiredConsumerQueue(String cluster)
+        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException,
+        InterruptedException {
+        return MQAdminInstance.threadLocalMQAdminExt().cleanExpiredConsumerQueue(cluster);
+    }
+
+    @Override
+    public boolean cleanExpiredConsumerQueueByAddr(String addr)
+        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException,
+        InterruptedException {
+        return MQAdminInstance.threadLocalMQAdminExt().cleanExpiredConsumerQueueByAddr(addr);
+    }
+
+    @Override
+    public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack)
+        throws RemotingException, MQClientException, InterruptedException {
+        return MQAdminInstance.threadLocalMQAdminExt().getConsumerRunningInfo(consumerGroup, clientId, jstack);
+    }
+
+    @Override
+    public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, String clientId,
+        String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        return MQAdminInstance.threadLocalMQAdminExt().consumeMessageDirectly(consumerGroup, clientId, msgId);
+    }
+
+    @Override
+    public List<MessageTrack> messageTrackDetail(MessageExt msg)
+        throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        return MQAdminInstance.threadLocalMQAdminExt().messageTrackDetail(msg);
+    }
+
+    @Override
+    public void cloneGroupOffset(String srcGroup, String destGroup, String topic, boolean isOffline)
+        throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        MQAdminInstance.threadLocalMQAdminExt().cloneGroupOffset(srcGroup, destGroup, topic, isOffline);
+    }
+
+    @Override
+    public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
+        MQAdminInstance.threadLocalMQAdminExt().createTopic(key, newTopic, queueNum);
+    }
+
+    @Override
+    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)
+        throws MQClientException {
+        MQAdminInstance.threadLocalMQAdminExt().createTopic(key, newTopic, queueNum, topicSysFlag);
+    }
+
+    @Override
+    public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
+        return MQAdminInstance.threadLocalMQAdminExt().searchOffset(mq, timestamp);
+    }
+
+    @Override
+    public long maxOffset(MessageQueue mq) throws MQClientException {
+        return MQAdminInstance.threadLocalMQAdminExt().maxOffset(mq);
+    }
+
+    @Override
+    public long minOffset(MessageQueue mq) throws MQClientException {
+        return MQAdminInstance.threadLocalMQAdminExt().minOffset(mq);
+    }
+
+    @Override
+    public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
+        return MQAdminInstance.threadLocalMQAdminExt().earliestMsgStoreTime(mq);
+    }
+
+    @Override
+    public MessageExt viewMessage(String msgId)
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        return MQAdminInstance.threadLocalMQAdminExt().viewMessage(msgId);
+    }
+
+    @Override
+    public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
+        throws MQClientException, InterruptedException {
+        return MQAdminInstance.threadLocalMQAdminExt().queryMessage(topic, key, maxNum, begin, end);
+    }
+
+    @Override
+    @Deprecated
+    public void start() throws MQClientException {
+        throw new IllegalStateException("thisMethod is deprecated.use org.apache.rocketmq.console.aspect.admin.MQAdminAspect instead of this");
+    }
+
+    @Override
+    @Deprecated
+    public void shutdown() {
+        throw new IllegalStateException("thisMethod is deprecated.use org.apache.rocketmq.console.aspect.admin.MQAdminAspect instead of this");
+    }
+
+    // below is 3.2.6->3.5.8 updated
+
+    @Override
+    public List<QueueTimeSpan> queryConsumeTimeSpan(String topic,
+        String group) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
+        return MQAdminInstance.threadLocalMQAdminExt().queryConsumeTimeSpan(topic, group);
+    }
+
+    //MessageClientIDSetter.getNearlyTimeFromID has bug,so we subtract half a day
+    //next version we will remove it
+    //https://issues.apache.org/jira/browse/ROCKETMQ-111
+    //https://github.com/apache/incubator-rocketmq/pull/69
+    @Override
+    public MessageExt viewMessage(String topic,
+        String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        logger.info("MessageClientIDSetter.getNearlyTimeFromID(msgId)={} msgId={}", MessageClientIDSetter.getNearlyTimeFromID(msgId), msgId);
+        try {
+            return viewMessage(msgId);
+        }
+        catch (Exception e) {
+        }
+        MQAdminImpl mqAdminImpl = MQAdminInstance.threadLocalMqClientInstance().getMQAdminImpl();
+        QueryResult qr = Reflect.on(mqAdminImpl).call("queryMessage", topic, msgId, 32,
+            MessageClientIDSetter.getNearlyTimeFromID(msgId).getTime() - 1000 * 60 * 60 * 13L, Long.MAX_VALUE, true).get();
+        if (qr != null && qr.getMessageList() != null && qr.getMessageList().size() > 0) {
+            return qr.getMessageList().get(0);
+        }
+        else {
+            return null;
+        }
+    }
+
+    @Override
+    public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, String clientId, String topic,
+        String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        return MQAdminInstance.threadLocalMQAdminExt().consumeMessageDirectly(consumerGroup, clientId, topic, msgId);
+    }
+
+    @Override
+    public Properties getBrokerConfig(
+        String brokerAddr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException {
+        return MQAdminInstance.threadLocalMQAdminExt().getBrokerConfig(brokerAddr);
+    }
+
+    @Override
+    public TopicList fetchTopicsByCLuster(
+        String clusterName) throws RemotingException, MQClientException, InterruptedException {
+        return MQAdminInstance.threadLocalMQAdminExt().fetchTopicsByCLuster(clusterName);
+    }
+
+    @Override
+    public boolean cleanUnusedTopic(
+        String cluster) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+        return MQAdminInstance.threadLocalMQAdminExt().cleanUnusedTopic(cluster);
+    }
+
+    @Override
+    public boolean cleanUnusedTopicByAddr(
+        String addr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+        return MQAdminInstance.threadLocalMQAdminExt().cleanUnusedTopicByAddr(addr);
+    }
+
+    @Override
+    public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName,
+        String statsKey) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+        return MQAdminInstance.threadLocalMQAdminExt().viewBrokerStatsData(brokerAddr, statsName, statsKey);
+    }
+
+    @Override
+    public Set<String> getClusterList(
+        String topic) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+        return MQAdminInstance.threadLocalMQAdminExt().getClusterList(topic);
+    }
+
+    @Override
+    public ConsumeStatsList fetchConsumeStatsInBroker(String brokerAddr, boolean isOrder,
+        long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+        return MQAdminInstance.threadLocalMQAdminExt().fetchConsumeStatsInBroker(brokerAddr, isOrder, timeoutMillis);
+    }
+
+    @Override
+    public Set<String> getTopicClusterList(
+        String topic) throws InterruptedException, MQBrokerException, MQClientException, RemotingException {
+        return MQAdminInstance.threadLocalMQAdminExt().getTopicClusterList(topic);
+    }
+
+    @Override
+    public SubscriptionGroupWrapper getAllSubscriptionGroup(String brokerAddr,
+        long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        return MQAdminInstance.threadLocalMQAdminExt().getAllSubscriptionGroup(brokerAddr, timeoutMillis);
+    }
+
+    @Override
+    public TopicConfigSerializeWrapper getAllTopicGroup(String brokerAddr,
+        long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        return MQAdminInstance.threadLocalMQAdminExt().getAllTopicGroup(brokerAddr, timeoutMillis);
+    }
+
+    @Override
+    public void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue mq,
+        long offset) throws RemotingException, InterruptedException, MQBrokerException {
+        MQAdminInstance.threadLocalMQAdminExt().updateConsumeOffset(brokerAddr, consumeGroup, mq, offset);
+    }
+
+    // 4.0.0 added
+    @Override public void updateNameServerConfig(Properties properties,
+        List<String> list) throws InterruptedException, RemotingConnectException, UnsupportedEncodingException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, MQBrokerException {
+
+    }
+
+    @Override public Map<String, Properties> getNameServerConfig(
+        List<String> list) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException, UnsupportedEncodingException {
+        return null;
+    }
+
+    @Override public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String topic,
+        int queueId, long index, int count,
+        String consumerGroup) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
+        return null;
+    }
+}
diff --git a/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/service/client/MQAdminInstance.java b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/service/client/MQAdminInstance.java
new file mode 100644
index 0000000..6994c23
--- /dev/null
+++ b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/service/client/MQAdminInstance.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.exporter.service.client;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
+import org.apache.rocketmq.tools.admin.MQAdminExt;
+import org.joor.Reflect;
+
+import static org.apache.rocketmq.common.MixAll.TOOLS_CONSUMER_GROUP;
+
+
+public class MQAdminInstance {
+
+    private static final ThreadLocal<DefaultMQAdminExt> MQ_ADMIN_EXT_THREAD_LOCAL = new ThreadLocal<DefaultMQAdminExt>();
+
+    private static final ThreadLocal<DefaultMQPullConsumer> MQ_PULL_CONSUMER_THREAD_LOCAL = new ThreadLocal<DefaultMQPullConsumer>();
+
+    private static final ThreadLocal<Integer> INIT_COUNTER = new ThreadLocal<Integer>();
+
+    public static MQAdminExt threadLocalMQAdminExt() {
+        DefaultMQAdminExt defaultMQAdminExt = MQ_ADMIN_EXT_THREAD_LOCAL.get();
+        if (defaultMQAdminExt == null) {
+            throw new IllegalStateException("defaultMQAdminExt should be init before you get this");
+        }
+        return defaultMQAdminExt;
+    }
+
+
+    public static DefaultMQPullConsumer threadLocalMQPullConsumer() {
+        DefaultMQPullConsumer pullConsumer = MQ_PULL_CONSUMER_THREAD_LOCAL.get();
+        if (pullConsumer == null) {
+            throw new IllegalStateException("pullConsumer should be init before you get this");
+        }
+        return pullConsumer;
+    }
+
+
+    public static RemotingClient threadLocalRemotingClient() {
+        MQClientInstance mqClientInstance = threadLocalMqClientInstance();
+        MQClientAPIImpl mQClientAPIImpl = Reflect.on(mqClientInstance).get("mQClientAPIImpl");
+        return Reflect.on(mQClientAPIImpl).get("remotingClient");
+    }
+
+    public static MQClientInstance threadLocalMqClientInstance() {
+        DefaultMQAdminExtImpl defaultMQAdminExtImpl = Reflect.on(MQAdminInstance.threadLocalMQAdminExt()).get("defaultMQAdminExtImpl");
+        return Reflect.on(defaultMQAdminExtImpl).get("mqClientInstance");
+    }
+
+    public static void initMQAdminInstance(long timeoutMillis) throws MQClientException {
+        Integer nowCount = INIT_COUNTER.get();
+        if (nowCount == null) {
+            DefaultMQAdminExt defaultMQAdminExt;
+            if (timeoutMillis > 0) {
+                defaultMQAdminExt = new DefaultMQAdminExt(timeoutMillis);
+            }
+            else {
+                defaultMQAdminExt = new DefaultMQAdminExt();
+            }
+            defaultMQAdminExt.setInstanceName("admin-" + Long.toString(System.currentTimeMillis()));
+            defaultMQAdminExt.start();
+            MQ_ADMIN_EXT_THREAD_LOCAL.set(defaultMQAdminExt);
+
+
+            DefaultMQPullConsumer   pullConsumer;
+            pullConsumer    =   new DefaultMQPullConsumer(TOOLS_CONSUMER_GROUP,null);
+            pullConsumer.setInstanceName("consumer-" + Long.toString(System.currentTimeMillis()));
+            pullConsumer.setNamesrvAddr(System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV)));
+            pullConsumer.start();
+            pullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper().setConnectBrokerByUser(true);
+
+            MQ_PULL_CONSUMER_THREAD_LOCAL.set(pullConsumer);
+            INIT_COUNTER.set(1);
+        }
+        else {
+            INIT_COUNTER.set(nowCount + 1);
+        }
+
+    }
+
+    public static void destroyMQAdminInstance() {
+        Integer nowCount = INIT_COUNTER.get() - 1;
+        if (nowCount > 0) {
+            INIT_COUNTER.set(nowCount);
+            return;
+        }
+        MQAdminExt mqAdminExt = MQ_ADMIN_EXT_THREAD_LOCAL.get();
+        if (mqAdminExt != null) {
+            DefaultMQPullConsumer consumer = MQ_PULL_CONSUMER_THREAD_LOCAL.get();
+            if (consumer != null) {
+                consumer.shutdown();
+                MQ_PULL_CONSUMER_THREAD_LOCAL.remove();
+            }
+            mqAdminExt.shutdown();
+            MQ_ADMIN_EXT_THREAD_LOCAL.remove();
+            INIT_COUNTER.remove();
+        }
+    }
+}
diff --git a/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/service/impl/RMQMetricsServiceImpl.java b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/service/impl/RMQMetricsServiceImpl.java
new file mode 100644
index 0000000..5dd008b
--- /dev/null
+++ b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/service/impl/RMQMetricsServiceImpl.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.exporter.service.impl;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.common.TextFormat;
+import org.apache.rocketmq.exporter.collector.RMQMetricsCollector;
+import org.apache.rocketmq.exporter.service.AbstractCommonService;
+import org.apache.rocketmq.exporter.service.RMQMetricsService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import java.io.IOException;
+import java.io.StringWriter;
+
+@Service
+public class RMQMetricsServiceImpl extends AbstractCommonService implements RMQMetricsService {
+
+    private Logger logger = LoggerFactory.getLogger(RMQMetricsServiceImpl.class);
+
+    private  CollectorRegistry registry = new CollectorRegistry();
+
+    private final RMQMetricsCollector rmqMetricsCollector;
+
+
+    public RMQMetricsCollector getCollector() {
+        return rmqMetricsCollector;
+    }
+
+    public RMQMetricsServiceImpl() {
+        rmqMetricsCollector = new RMQMetricsCollector();
+        rmqMetricsCollector.register(registry);
+    }
+    public void Metrics(StringWriter writer) throws IOException {
+        TextFormat.write004(writer, registry.metricFamilySamples());
+        logger.info(writer.toString());
+    }
+}
diff --git a/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
new file mode 100644
index 0000000..504ac88
--- /dev/null
+++ b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.exporter.task;
+
+import com.google.common.base.Throwables;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.admin.OffsetWrapper;
+import org.apache.rocketmq.common.admin.TopicOffset;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.GroupList;
+import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.exporter.aspect.admin.annotation.MultiMQAdminCmdMethod;
+import org.apache.rocketmq.exporter.config.RMQConfigure;
+import org.apache.rocketmq.exporter.service.RMQMetricsService;
+import org.apache.rocketmq.exporter.service.client.MQAdminExtImpl;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.apache.rocketmq.tools.admin.MQAdminExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+@Component
+public class MetricsCollectTask {
+
+    @Resource
+    private MQAdminExt mqAdminExt;
+    @Resource
+    private RMQConfigure rmqConfigure;
+
+    @Resource
+    private RMQMetricsService  metricsService;
+
+    private final static Logger log = LoggerFactory.getLogger(MetricsCollectTask.class);
+
+    @Scheduled(cron = "15 0/1 * * * ?")
+    @MultiMQAdminCmdMethod(timeoutMillis = 5000)
+    public void collectOffset() {
+        if (!rmqConfigure.isEnableCollect()) {
+            return;
+        }
+        Date date = new Date();
+        try {
+            TopicList topicList = mqAdminExt.fetchAllTopicList();
+            Set<String> topicSet = topicList.getTopicList();
+            for (String topic : topicSet) {
+                if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
+                    continue;
+                }
+                String clusterName = null;
+                ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+                Set<Map.Entry<String, BrokerData>> clusterEntries = clusterInfo.getBrokerAddrTable().entrySet();
+                for (Map.Entry<String, BrokerData> clusterEntry : clusterEntries) {
+                    clusterName  = clusterEntry.getValue().getCluster();
+                    break;
+                }
+                if (clusterName != null) {
+                    HashMap<String,Long>    brokerOffsetMap = new HashMap<>();
+                    TopicStatsTable topicStatus = mqAdminExt.examineTopicStats(topic);
+                    Set<Map.Entry<MessageQueue, TopicOffset>> topicStatusEntries = topicStatus.getOffsetTable().entrySet();
+                    for (Map.Entry<MessageQueue, TopicOffset> topicStatusEntry : topicStatusEntries) {
+                        MessageQueue q      =   topicStatusEntry.getKey();
+                        TopicOffset offset  =   topicStatusEntry.getValue();
+                        if  (brokerOffsetMap.containsKey(q.getBrokerName())) {
+                            brokerOffsetMap.put(q.getBrokerName(),brokerOffsetMap.get(q.getBrokerName()) + offset.getMaxOffset());
+                        }
+                        else {
+                            brokerOffsetMap.put(q.getBrokerName(),offset.getMaxOffset());
+                        }
+                    }
+                    Set<Map.Entry<String, Long>> brokerOffsetEntries = brokerOffsetMap.entrySet();
+                    for (Map.Entry<String, Long> brokerOffsetEntry : brokerOffsetEntries) {
+                        metricsService.getCollector().AddTopicOffsetMetric(clusterName,brokerOffsetEntry.getKey(), topic, brokerOffsetEntry.getValue());
+                    }
+                }
+
+                HashMap<String,Long>    consumeOffsetMap = new HashMap<>();
+                GroupList groupList = mqAdminExt.queryTopicConsumeByWho(topic);
+                if (groupList != null && !groupList.getGroupList().isEmpty()) {
+                    for (String group : groupList.getGroupList()) {
+                        ConsumeStats consumeStatus = mqAdminExt.examineConsumeStats(group,topic);
+                        Set<Map.Entry<MessageQueue, OffsetWrapper>> consumeStatusEntries = consumeStatus.getOffsetTable().entrySet();
+                        for (Map.Entry<MessageQueue, OffsetWrapper> consumeStatusEntry : consumeStatusEntries) {
+                            MessageQueue q          =   consumeStatusEntry.getKey();
+                            OffsetWrapper offset    =   consumeStatusEntry.getValue();
+                            if (consumeOffsetMap.containsKey(q.getBrokerName())) {
+                                consumeOffsetMap.put(q.getBrokerName(), consumeOffsetMap.get(q.getBrokerName()) + offset.getConsumerOffset());
+                            }
+                            else {
+                                consumeOffsetMap.put(q.getBrokerName(), offset.getConsumerOffset());
+                            }
+                        }
+                        Set<Map.Entry<String, Long>> consumeOffsetEntries = consumeOffsetMap.entrySet();
+                        for (Map.Entry<String, Long> consumeOffsetEntry : consumeOffsetEntries) {
+                            metricsService.getCollector().AddGroupOffsetMetric(clusterName,consumeOffsetEntry.getKey(), topic, group, consumeOffsetEntry.getValue());
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.info("error is " + e.getMessage());
+        }
+    }
+
+    @Scheduled(cron = "15 0/1 * * * ?")
+    @MultiMQAdminCmdMethod(timeoutMillis = 5000)
+    public void collectTopic() {
+        if (!rmqConfigure.isEnableCollect()) {
+            return;
+        }
+        Date date = new Date();
+        try {
+            TopicList topicList = mqAdminExt.fetchAllTopicList();
+            Set<String> topicSet = topicList.getTopicList();
+            for (String topic : topicSet) {
+                if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
+                    continue;
+                }
+                TopicRouteData topicRouteData = mqAdminExt.examineTopicRouteInfo(topic);
+                GroupList groupList = mqAdminExt.queryTopicConsumeByWho(topic);
+
+                for (BrokerData bd : topicRouteData.getBrokerDatas()) {
+                    String masterAddr = bd.getBrokerAddrs().get(MixAll.MASTER_ID);
+                    if (masterAddr != null) {
+                        try {
+                            BrokerStatsData bsd = null;
+                            try {
+                                bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.TOPIC_PUT_NUMS, topic);
+                                metricsService.getCollector().AddTopicPutNumsMetric(bd.getCluster(), bd.getBrokerName(), topic, bsd.getStatsMinute().getTps());
+                            }
+                            catch (Exception e) {
+                                log.info("error is " + e.getMessage());
+                            }
+                            try {
+                                bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.TOPIC_PUT_SIZE, topic);
+                                metricsService.getCollector().AddTopicPutSizeMetric(bd.getCluster(), bd.getBrokerName(), topic, bsd.getStatsMinute().getTps());
+                            }
+                            catch (Exception e) {
+                                log.info("error is " + e.getMessage());
+                            }
+                        } catch (Exception e) {
+                            log.info("error is " + e.getMessage());
+                        }
+                    }
+                }
+                if (groupList != null && !groupList.getGroupList().isEmpty()) {
+                    for (String group : groupList.getGroupList()) {
+                        for (BrokerData bd : topicRouteData.getBrokerDatas()) {
+                            String masterAddr = bd.getBrokerAddrs().get(MixAll.MASTER_ID);
+                            if (masterAddr != null) {
+                                try {
+                                    String statsKey = String.format("%s@%s", topic, group);
+                                    BrokerStatsData bsd = null;
+                                    try {
+                                        bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.GROUP_GET_NUMS, statsKey);
+                                        metricsService.getCollector().AddGroupGetNumsMetric(bd.getCluster(), bd.getBrokerName(), topic, group, bsd.getStatsMinute().getTps());
+                                    } catch (Exception e) {
+                                        log.info("error is " + e.getMessage());
+                                    }
+                                    try {
+                                        bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.GROUP_GET_SIZE, statsKey);
+                                        metricsService.getCollector().AddGroupGetSizeMetric(bd.getCluster(), bd.getBrokerName(), topic, group, bsd.getStatsMinute().getTps());
+                                    } catch (Exception e) {
+                                        log.info("error is " + e.getMessage());
+                                    }
+                                    try {
+
+                                        bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.SNDBCK_PUT_NUMS, statsKey);
+                                        metricsService.getCollector().AddsendBackNumsMetric(bd.getCluster(), bd.getBrokerName(), topic, group, bsd.getStatsMinute().getTps());
+                                    } catch (Exception e) {
+                                        log.info("error is " + e.getMessage());
+                                    }
+                                    try {
+                                        collectLatencyMetrcisInner(topic, group, masterAddr, bd);
+                                    } catch (Exception e) {
+                                        log.info("error is " + e.getMessage());
+                                    }
+                                } catch (Exception e) {
+                                    log.info("error is " + e.getMessage());
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        catch (Exception err) {
+            throw Throwables.propagate(err);
+        }
+    }
+
+    @Scheduled(cron = "15 0/1 * * * ?")
+    @MultiMQAdminCmdMethod(timeoutMillis = 5000)
+    public void collectBroker() {
+        if (!rmqConfigure.isEnableCollect()) {
+            return;
+        }
+        try {
+            Date date = new Date();
+            ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+            Set<Map.Entry<String, BrokerData>> clusterEntries = clusterInfo.getBrokerAddrTable().entrySet();
+            for (Map.Entry<String, BrokerData> clusterEntry : clusterEntries) {
+                String masterAddr = clusterEntry.getValue().getBrokerAddrs().get(MixAll.MASTER_ID);
+                if (masterAddr != null) {
+                    try {
+                        BrokerStatsData bsd = null;
+                        try {
+                            bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.BROKER_PUT_NUMS,clusterEntry.getValue().getCluster());
+                            metricsService.getCollector().AddBrokerPutNumsMetric(clusterEntry.getValue().getCluster(), clusterEntry.getValue().getBrokerName(), bsd.getStatsMinute().getTps());
+                        }
+                        catch (Exception e) {
+                            log.info("error is " + e.getMessage());
+                        }
+                        try {
+                            bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.BROKER_GET_NUMS, clusterEntry.getValue().getCluster());
+                            metricsService.getCollector().AddBrokerGetNumsMetric(clusterEntry.getValue().getCluster(), clusterEntry.getValue().getBrokerName(), bsd.getStatsMinute().getTps());
+                        }
+                        catch (Exception e) {
+                            log.info("error is " + e.getMessage());
+                        }
+                    } catch (Exception e) {
+                        log.info("error is " + e.getMessage());
+                    }
+                }
+            }
+        }
+        catch (Exception err) {
+            throw Throwables.propagate(err);
+        }
+    }
+    private void collectLatencyMetrcisInner(String topic,String group,String masterAddr, BrokerData bd) throws Exception {
+        long maxLagTime = 0;
+        String statsKey;
+        BrokerStatsData bsd = null;
+        ConsumeStats consumeStatus = mqAdminExt.examineConsumeStats(group, topic);
+        Set<Map.Entry<MessageQueue, OffsetWrapper>> consumeStatusEntries = consumeStatus.getOffsetTable().entrySet();
+        for (Map.Entry<MessageQueue, OffsetWrapper> consumeStatusEntry : consumeStatusEntries) {
+            MessageQueue q = consumeStatusEntry.getKey();
+            OffsetWrapper offset = consumeStatusEntry.getValue();
+            int queueId = q.getQueueId();
+            statsKey = String.format("%d@%s@%s", queueId, topic, group);
+            try {
+                bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.GROUP_GET_LATENCY, statsKey);
+                metricsService.getCollector().AddGroupGetLatencyMetric(bd.getCluster(), bd.getBrokerName(), topic, group, String.format("%d", queueId), bsd.getStatsMinute().getTps());
+            } catch (Exception e) {
+                log.info("error is " + e.getMessage());
+            }
+            MQAdminExtImpl mqAdminImpl = (MQAdminExtImpl) mqAdminExt;
+            PullResult consumePullResult = mqAdminImpl.queryMsgByOffset(q, offset.getConsumerOffset());
+            long lagTime = 0;
+            if (consumePullResult != null && consumePullResult.getPullStatus() == PullStatus.FOUND) {
+                lagTime = System.currentTimeMillis() - consumePullResult.getMsgFoundList().get(0).getStoreTimestamp();
+                if (offset.getBrokerOffset() == offset.getConsumerOffset()) {
+                    lagTime = 0;
+                }
+            } else if (consumePullResult.getPullStatus() == PullStatus.NO_MATCHED_MSG) {
+                lagTime = 0;
+            } else if (consumePullResult.getPullStatus() == PullStatus.OFFSET_ILLEGAL) {
+                PullResult pullResult = mqAdminImpl.queryMsgByOffset(q, consumePullResult.getMinOffset());
+                if (pullResult != null && pullResult.getPullStatus() == PullStatus.FOUND) {
+                    lagTime = System.currentTimeMillis() - consumePullResult.getMsgFoundList().get(0).getStoreTimestamp();
+                }
+            } else {
+                lagTime = 0;
+            }
+            if (lagTime > maxLagTime) {
+                maxLagTime = lagTime;
+            }
+        }
+        metricsService.getCollector().AddGroupGetLatencyByStoreTimeMetric(bd.getCluster(), bd.getBrokerName(), topic, group, maxLagTime);
+    }
+}
diff --git a/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/util/JsonUtil.java b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/util/JsonUtil.java
new file mode 100644
index 0000000..a80e998
--- /dev/null
+++ b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/util/JsonUtil.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.exporter.util;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
+import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Writer;
+import java.text.SimpleDateFormat;
+import java.util.Map;
+
+@SuppressWarnings("unchecked")
+public class JsonUtil {
+
+    private static Logger logger = LoggerFactory.getLogger(JsonUtil.class);
+    private static ObjectMapper objectMapper = new ObjectMapper();
+
+    private JsonUtil() {
+    }
+
+    static {
+        objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
+        objectMapper.configure(SerializationFeature.WRITE_ENUMS_USING_TO_STRING, true);
+        objectMapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true);
+        objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
+        objectMapper.setFilters(new SimpleFilterProvider().setFailOnUnknownId(false));
+        objectMapper.setSerializationInclusion(JsonInclude.Include.NON_EMPTY);
+        objectMapper.setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
+    }
+
+    public static void writeValue(Writer writer, Object obj) {
+        try {
+            objectMapper.writeValue(writer, obj);
+        }
+        catch (IOException e) {
+            Throwables.propagateIfPossible(e);
+        }
+    }
+
+    public static <T> String obj2String(T src) {
+        if (src == null) {
+            return null;
+        }
+
+        try {
+            return src instanceof String ? (String)src : objectMapper.writeValueAsString(src);
+        }
+        catch (Exception e) {
+            logger.error("Parse Object to String error src=" + src, e);
+            return null;
+        }
+    }
+
+    public static <T> byte[] obj2Byte(T src) {
+        if (src == null) {
+            return null;
+        }
+
+        try {
+            return src instanceof byte[] ? (byte[])src : objectMapper.writeValueAsBytes(src);
+        }
+        catch (Exception e) {
+            logger.error("Parse Object to byte[] error", e);
+            return null;
+        }
+    }
+
+    public static <T> T string2Obj(String str, Class<T> clazz) {
+        if (Strings.isNullOrEmpty(str) || clazz == null) {
+            return null;
+        }
+        str = escapesSpecialChar(str);
+        try {
+            return clazz.equals(String.class) ? (T)str : objectMapper.readValue(str, clazz);
+        }
+        catch (Exception e) {
+            logger.error("Parse String to Object error\nString: {}\nClass<T>: {}\nError: {}", str, clazz.getName(), e);
+            return null;
+        }
+    }
+
+    public static <T> T byte2Obj(byte[] bytes, Class<T> clazz) {
+        if (bytes == null || clazz == null) {
+            return null;
+        }
+        try {
+            return clazz.equals(byte[].class) ? (T)bytes : objectMapper.readValue(bytes, clazz);
+        }
+        catch (Exception e) {
+            logger.error("Parse byte[] to Object error\nbyte[]: {}\nClass<T>: {}\nError: {}", bytes, clazz.getName(), e);
+            return null;
+        }
+    }
+
+    public static <T> T string2Obj(String str, TypeReference<T> typeReference) {
+        if (Strings.isNullOrEmpty(str) || typeReference == null) {
+            return null;
+        }
+        str = escapesSpecialChar(str);
+        try {
+            return (T)(typeReference.getType().equals(String.class) ? str : objectMapper.readValue(str, typeReference));
+        }
+        catch (Exception e) {
+            logger.error("Parse String to Object error\nString: {}\nTypeReference<T>: {}\nError: {}", str,
+                typeReference.getType(), e);
+            return null;
+        }
+    }
+
+    public static <T> T byte2Obj(byte[] bytes, TypeReference<T> typeReference) {
+        if (bytes == null || typeReference == null) {
+            return null;
+        }
+        try {
+            return (T)(typeReference.getType().equals(byte[].class) ? bytes : objectMapper.readValue(bytes,
+                typeReference));
+        }
+        catch (Exception e) {
+            logger.error("Parse byte[] to Object error\nbyte[]: {}\nTypeReference<T>: {}\nError: {}", bytes,
+                typeReference.getType(), e);
+            return null;
+        }
+    }
+
+    public static <T> T map2Obj(Map<String, String> map, Class<T> clazz) {
+        String str = obj2String(map);
+        return string2Obj(str, clazz);
+    }
+
+    private static String escapesSpecialChar(String str) {
+        return str.replace("\n", "\\n").replace("\r", "\\r");
+    }
+}
diff --git a/rocketmq-prometheus-exporter/src/main/resources/application.properties b/rocketmq-prometheus-exporter/src/main/resources/application.properties
new file mode 100644
index 0000000..ad1d563
--- /dev/null
+++ b/rocketmq-prometheus-exporter/src/main/resources/application.properties
@@ -0,0 +1,14 @@
+server.port=5557
+
+spring.application.name=rocketmq-exporter
+spring.http.encoding.charset=UTF-8
+spring.http.encoding.enabled=true
+spring.http.encoding.force=true
+logging.config=classpath:logback.xml
+#if this value is empty,use env value rocketmq.config.namesrvAddr  NAMESRV_ADDR
+rocketmq.config.namesrvAddr=192.168.0.48:9876
+
+
+rocketmq.config.enableCollect=true
+rocketmq.config.webTelemetryPath=/metrics
+rocketmq.config.rocketmqVersion=4_3_2
\ No newline at end of file
diff --git a/rocketmq-prometheus-exporter/src/main/resources/logback.xml b/rocketmq-prometheus-exporter/src/main/resources/logback.xml
new file mode 100644
index 0000000..b5291d3
--- /dev/null
+++ b/rocketmq-prometheus-exporter/src/main/resources/logback.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+	<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+		<encoder charset="UTF-8">
+			<pattern>[%d{yyyy-MM-dd HH:mm:ss.SSS}] %5p %m%n</pattern>
+		</encoder>
+	</appender>
+
+	<appender name="FILE"
+		class="ch.qos.logback.core.rolling.RollingFileAppender">
+		<file>${user.home}/logs/exporterlogs/rocketmq-exporter.log</file>
+		<append>true</append>
+		<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+			<fileNamePattern>${user.home}/logs/exporterlogs/rocketmq-exporter-%d{yyyy-MM-dd}.%i.log
+			</fileNamePattern>
+			<timeBasedFileNamingAndTriggeringPolicy
+				class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+				<maxFileSize>104857600</maxFileSize>
+			</timeBasedFileNamingAndTriggeringPolicy>
+			<MaxHistory>10</MaxHistory>
+		</rollingPolicy>
+		<encoder>
+			<pattern>[%d{yyyy-MM-dd HH:mm:ss.SSS}] %5p %m%n</pattern>
+			<charset class="java.nio.charset.Charset">UTF-8</charset>
+		</encoder>
+	</appender>
+
+	<root level="INFO">
+		<appender-ref ref="STDOUT" />
+		<appender-ref ref="FILE" />
+	</root>
+
+</configuration> 
\ No newline at end of file
diff --git a/rocketmq-prometheus-exporter/style/copyright/Apache.xml b/rocketmq-prometheus-exporter/style/copyright/Apache.xml
new file mode 100644
index 0000000..e3e3dec
--- /dev/null
+++ b/rocketmq-prometheus-exporter/style/copyright/Apache.xml
@@ -0,0 +1,23 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<component name="CopyrightManager">
+    <copyright>
+        <option name="myName" value="Apache" />
+        <option name="notice" value="Licensed to the Apache Software Foundation (ASF) under one or more&#10;contributor license agreements.  See the NOTICE file distributed with&#10;this work for additional information regarding copyright ownership.&#10;The ASF licenses this file to You under the Apache License, Version 2.0&#10;(the &quot;License&quot;); you may not use this file except in compliance with&#10;the License.  You may obtain a copy of the License at&#10;&#10;    http://www.apache.org/licenses/LICENSE-2.0&#10;&#10;Unless required by applicable law or agreed to in writing, software&#10;distributed under the License is distributed on an &quot;AS IS&quot; BASIS,&#10;WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.&#10;See the License for the specific language governing permissions and&#10;limitations under the License." />
+    </copyright>
+</component>
\ No newline at end of file
diff --git a/rocketmq-prometheus-exporter/style/copyright/profiles_settings.xml b/rocketmq-prometheus-exporter/style/copyright/profiles_settings.xml
new file mode 100644
index 0000000..747c7e2
--- /dev/null
+++ b/rocketmq-prometheus-exporter/style/copyright/profiles_settings.xml
@@ -0,0 +1,64 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<component name="CopyrightManager">
+    <settings default="Apache">
+        <module2copyright>
+            <element module="All" copyright="Apache"/>
+        </module2copyright>
+        <LanguageOptions name="GSP">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="HTML">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="JAVA">
+            <option name="fileTypeOverride" value="3" />
+            <option name="addBlankAfter" value="false" />
+        </LanguageOptions>
+        <LanguageOptions name="JSP">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="JSPX">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="MXML">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="Properties">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="block" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="SPI">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="block" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="XML">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="__TEMPLATE__">
+            <option name="separateBefore" value="true"/>
+            <option name="lenBefore" value="1"/>
+        </LanguageOptions>
+    </settings>
+</component>
\ No newline at end of file
diff --git a/rocketmq-prometheus-exporter/style/rmq_checkstyle.xml b/rocketmq-prometheus-exporter/style/rmq_checkstyle.xml
new file mode 100644
index 0000000..2e9658f
--- /dev/null
+++ b/rocketmq-prometheus-exporter/style/rmq_checkstyle.xml
@@ -0,0 +1,135 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<!DOCTYPE module PUBLIC
+    "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
+    "http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
+<!--Refer http://checkstyle.sourceforge.net/reports/google-java-style.html#s2.2-file-encoding -->
+<module name="Checker">
+
+    <property name="localeLanguage" value="en"/>
+
+    <!--To configure the check to report on the first instance in each file-->
+    <module name="FileTabCharacter"/>
+
+    <!-- header -->
+    <module name="RegexpHeader">
+        <property name="header" value="/\*\nLicensed to the Apache Software Foundation*"/>
+        <property name="fileExtensions" value="java" />
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="System\.out\.println"/>
+        <property name="message" value="Prohibit invoking System.out.println in source code !"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="//FIXME"/>
+        <property name="message" value="Recommended fix FIXME task !"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="//TODO"/>
+        <property name="message" value="Recommended fix TODO task !"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="@alibaba"/>
+        <property name="message" value="Recommended remove @alibaba keyword!"/>
+    </module>
+    <module name="RegexpSingleline">
+        <property name="format" value="@taobao"/>
+        <property name="message" value="Recommended remove @taobao keyword!"/>
+    </module>
+    <module name="RegexpSingleline">
+        <property name="format" value="@author"/>
+        <property name="message" value="Recommended remove @author tag in javadoc!"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format"
+                  value=".*[\u3400-\u4DB5\u4E00-\u9FA5\u9FA6-\u9FBB\uF900-\uFA2D\uFA30-\uFA6A\uFA70-\uFAD9\uFF00-\uFFEF\u2E80-\u2EFF\u3000-\u303F\u31C0-\u31EF]+.*"/>
+        <property name="message" value="Not allow chinese character !"/>
+    </module>
+
+    <module name="FileLength">
+        <property name="max" value="3000"/>
+    </module>
+
+    <module name="TreeWalker">
+
+        <module name="UnusedImports">
+            <property name="processJavadoc" value="true"/>
+        </module>
+        <module name="RedundantImport"/>
+
+        <!--<module name="IllegalImport" />-->
+
+        <!--Checks that classes that override equals() also override hashCode()-->
+        <module name="EqualsHashCode"/>
+        <!--Checks for over-complicated boolean expressions. Currently finds code like if (topic == true), topic || true, !false, etc.-->
+        <module name="SimplifyBooleanExpression"/>
+        <module name="OneStatementPerLine"/>
+        <module name="UnnecessaryParentheses"/>
+        <!--Checks for over-complicated boolean return statements. For example the following code-->
+        <module name="SimplifyBooleanReturn"/>
+
+        <!--Check that the default is after all the cases in producerGroup switch statement-->
+        <module name="DefaultComesLast"/>
+        <!--Detects empty statements (standalone ";" semicolon)-->
+        <module name="EmptyStatement"/>
+        <!--Checks that long constants are defined with an upper ell-->
+        <module name="UpperEll"/>
+        <module name="ConstantName">
+            <property name="format" value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)"/>
+        </module>
+        <!--Checks that local, non-final variable names conform to producerGroup format specified by the format property-->
+        <module name="LocalVariableName"/>
+        <!--Validates identifiers for local, final variables, including catch parameters-->
+        <module name="LocalFinalVariableName"/>
+        <!--Validates identifiers for non-static fields-->
+        <module name="MemberName"/>
+        <!--Validates identifiers for class type parameters-->
+        <module name="ClassTypeParameterName">
+            <property name="format" value="^[A-Z0-9]*$"/>
+        </module>
+        <!--Validates identifiers for method type parameters-->
+        <module name="MethodTypeParameterName">
+            <property name="format" value="^[A-Z0-9]*$"/>
+        </module>
+        <module name="PackageName"/>
+        <module name="ParameterName"/>
+        <module name="StaticVariableName"/>
+        <module name="TypeName"/>
+        <!--Checks that there are no import statements that use the * notation-->
+        <module name="AvoidStarImport"/>
+
+        <!--whitespace-->
+        <module name="GenericWhitespace"/>
+        <module name="NoWhitespaceBefore"/>
+        <module name="NoWhitespaceAfter"/>
+        <module name="WhitespaceAround">
+            <property name="allowEmptyConstructors" value="true"/>
+            <property name="allowEmptyMethods" value="true"/>
+        </module>
+        <module name="Indentation"/>
+        <module name="MethodParamPad"/>
+        <module name="ParenPad"/>
+        <module name="TypecastParenPad"/>
+    </module>
+</module>
diff --git a/rocketmq-prometheus-exporter/style/rmq_codeStyle.xml b/rocketmq-prometheus-exporter/style/rmq_codeStyle.xml
new file mode 100644
index 0000000..9db075e
--- /dev/null
+++ b/rocketmq-prometheus-exporter/style/rmq_codeStyle.xml
@@ -0,0 +1,157 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<code_scheme name="rocketmq">
+    <option name="USE_SAME_INDENTS" value="true"/>
+    <option name="IGNORE_SAME_INDENTS_FOR_LANGUAGES" value="true"/>
+    <option name="OTHER_INDENT_OPTIONS">
+        <value>
+            <option name="INDENT_SIZE" value="4"/>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+            <option name="TAB_SIZE" value="4"/>
+            <option name="USE_TAB_CHARACTER" value="false"/>
+            <option name="SMART_TABS" value="false"/>
+            <option name="LABEL_INDENT_SIZE" value="0"/>
+            <option name="LABEL_INDENT_ABSOLUTE" value="false"/>
+            <option name="USE_RELATIVE_INDENTS" value="false"/>
+        </value>
+    </option>
+    <option name="PREFER_LONGER_NAMES" value="false"/>
+    <option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="1000"/>
+    <option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="1000"/>
+    <option name="PACKAGES_TO_USE_IMPORT_ON_DEMAND">
+        <value/>
+    </option>
+    <option name="IMPORT_LAYOUT_TABLE">
+        <value>
+            <package name="" withSubpackages="true" static="false"/>
+            <emptyLine/>
+            <package name="" withSubpackages="true" static="true"/>
+        </value>
+    </option>
+    <option name="JD_ALIGN_PARAM_COMMENTS" value="false"/>
+    <option name="JD_ALIGN_EXCEPTION_COMMENTS" value="false"/>
+    <option name="JD_P_AT_EMPTY_LINES" value="false"/>
+    <option name="JD_KEEP_INVALID_TAGS" value="false"/>
+    <option name="JD_DO_NOT_WRAP_ONE_LINE_COMMENTS" value="true"/>
+    <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/>
+    <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
+    <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+    <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+    <option name="ELSE_ON_NEW_LINE" value="true"/>
+    <option name="WHILE_ON_NEW_LINE" value="true"/>
+    <option name="CATCH_ON_NEW_LINE" value="true"/>
+    <option name="FINALLY_ON_NEW_LINE" value="true"/>
+    <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+    <option name="ALIGN_MULTILINE_FOR" value="false"/>
+    <option name="SPACE_AFTER_TYPE_CAST" value="false"/>
+    <option name="SPACE_BEFORE_ARRAY_INITIALIZER_LBRACE" value="true"/>
+    <option name="METHOD_PARAMETERS_WRAP" value="1"/>
+    <option name="ARRAY_INITIALIZER_LBRACE_ON_NEXT_LINE" value="true"/>
+    <option name="LABELED_STATEMENT_WRAP" value="1"/>
+    <option name="WRAP_COMMENTS" value="true"/>
+    <option name="METHOD_ANNOTATION_WRAP" value="1"/>
+    <option name="CLASS_ANNOTATION_WRAP" value="1"/>
+    <option name="FIELD_ANNOTATION_WRAP" value="1"/>
+    <JavaCodeStyleSettings>
+        <option name="CLASS_NAMES_IN_JAVADOC" value="3"/>
+    </JavaCodeStyleSettings>
+    <XML>
+        <option name="XML_LEGACY_SETTINGS_IMPORTED" value="true"/>
+    </XML>
+    <ADDITIONAL_INDENT_OPTIONS fileType="haml">
+        <option name="INDENT_SIZE" value="2"/>
+    </ADDITIONAL_INDENT_OPTIONS>
+    <codeStyleSettings language="Groovy">
+        <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/>
+        <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
+        <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+        <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+        <option name="ELSE_ON_NEW_LINE" value="true"/>
+        <option name="CATCH_ON_NEW_LINE" value="true"/>
+        <option name="FINALLY_ON_NEW_LINE" value="true"/>
+        <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+        <option name="ALIGN_MULTILINE_FOR" value="false"/>
+        <option name="SPACE_AFTER_TYPE_CAST" value="false"/>
+        <option name="METHOD_PARAMETERS_WRAP" value="1"/>
+        <option name="METHOD_ANNOTATION_WRAP" value="1"/>
+        <option name="CLASS_ANNOTATION_WRAP" value="1"/>
+        <option name="FIELD_ANNOTATION_WRAP" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+        <indentOptions>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+        </indentOptions>
+    </codeStyleSettings>
+    <codeStyleSettings language="HOCON">
+        <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+    </codeStyleSettings>
+    <codeStyleSettings language="JAVA">
+        <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/>
+        <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
+        <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+        <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+        <option name="ELSE_ON_NEW_LINE" value="true"/>
+        <option name="WHILE_ON_NEW_LINE" value="true"/>
+        <option name="CATCH_ON_NEW_LINE" value="true"/>
+        <option name="FINALLY_ON_NEW_LINE" value="true"/>
+        <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+        <option name="ALIGN_MULTILINE_FOR" value="false"/>
+        <option name="SPACE_AFTER_TYPE_CAST" value="false"/>
+        <option name="SPACE_BEFORE_ARRAY_INITIALIZER_LBRACE" value="true"/>
+        <option name="METHOD_PARAMETERS_WRAP" value="1"/>
+        <option name="ARRAY_INITIALIZER_LBRACE_ON_NEXT_LINE" value="true"/>
+        <option name="LABELED_STATEMENT_WRAP" value="1"/>
+        <option name="METHOD_ANNOTATION_WRAP" value="1"/>
+        <option name="CLASS_ANNOTATION_WRAP" value="1"/>
+        <option name="FIELD_ANNOTATION_WRAP" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+        <indentOptions>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+        </indentOptions>
+    </codeStyleSettings>
+    <codeStyleSettings language="JSON">
+        <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+    </codeStyleSettings>
+    <codeStyleSettings language="Scala">
+        <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
+        <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+        <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+        <option name="ELSE_ON_NEW_LINE" value="true"/>
+        <option name="WHILE_ON_NEW_LINE" value="true"/>
+        <option name="CATCH_ON_NEW_LINE" value="true"/>
+        <option name="FINALLY_ON_NEW_LINE" value="true"/>
+        <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+        <option name="ALIGN_MULTILINE_FOR" value="false"/>
+        <option name="METHOD_PARAMETERS_WRAP" value="1"/>
+        <option name="METHOD_ANNOTATION_WRAP" value="1"/>
+        <option name="CLASS_ANNOTATION_WRAP" value="1"/>
+        <option name="FIELD_ANNOTATION_WRAP" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+        <indentOptions>
+            <option name="INDENT_SIZE" value="4"/>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+            <option name="TAB_SIZE" value="4"/>
+        </indentOptions>
+    </codeStyleSettings>
+    <codeStyleSettings language="XML">
+        <indentOptions>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+        </indentOptions>
+    </codeStyleSettings>
+</code_scheme>
\ No newline at end of file